NCache possède de nombreuses fonctionnalités : fournisseur de cache ASP.NET et recherche de texte intégral, entre autres. Mais, cette fois, voyons un autre NCache fonctionnalité : messagerie Éditeur/Abonné (Pub/Sub). Apprenons à mettre en œuvre le modèle de messagerie Pub/Sub avec NCache.
Qu'est-ce que la messagerie Pub/Sub ?
Pub/Sub est un modèle de messagerie dans lequel les expéditeurs (ou éditeurs) partagent des messages avec plusieurs destinataires (ou abonnés) via un canal, évitant ainsi le couplage entre expéditeurs et destinataires.
Avec la messagerie Pub/Sub, nous pouvons partager des messages entre les applications et décharger les opérations chronophages sur des processeurs d'arrière-plan. Par exemple, dans un système de gestion des réservations, nous pouvons redimensionner les images des chambres dans un processeur d'arrière-plan en "écoutant" un événement déclenché par une application Web.
NCache fournit une messagerie Pub/Sub en mémoire pour permettre le partage d'informations en temps réel entre les applications .NET.
Grâce au architecture distribuée, NCache offre un mécanisme de messagerie pub/sub évolutif, hautement disponible et économe en stockage.
Comment mettre en œuvre la messagerie Pub/Sub avec NCache
Avant de passer à un exemple d'application, passons d'abord en revue la terminologie.
Avec NCache, le canal pour échanger des messages est appelé un Sujet. Chaque abonné s'abonne à un sujet pour recevoir les messages qui lui sont envoyés.
NCache fournit plusieurs types d'abonnements entre les abonnés et les éditeurs. Dans la messagerie Pub/Sub, un abonnement est "l'intérêt" d'un ou plusieurs abonnés pour un sujet.
NCache propose deux types d'abonnements :
- Non durable : Avec ce type d'abonnement, les abonnés ne recevront aucun message envoyé lorsqu'ils sont déconnectés. Il s'agit du type d'abonnement par défaut.
- Durable: Les abonnés ne perdront aucun message tant qu'ils seront déconnectés. Avec les abonnements durables, les messages sont stockés jusqu'à ce que les abonnés se rejoignent ou que les messages expirent.
Aussi, les NCache a ces deux types de politiques :
- Partagé: Dans cette politique, un abonnement peut avoir plus d'un abonné actif à la fois. Si un abonné quitte le réseau, l'abonnement reste actif. Cette stratégie est uniquement prise en charge par les abonnements Durable.
- Exclusif: Contrairement à la politique partagée, les abonnements avec une politique exclusive ne pouvaient avoir qu'un seul abonné à la fois. Cette politique est disponible pour les abonnements durables et non durables.
Dans la messagerie Pub/Sub, les abonnés s'inscrivent à un sujet par leur nom. Plus loin dans notre exemple d'application, nous utiliserons cette méthode d'abonnement. Mais, NCache prend également en charge une méthode d'abonnement basée sur un modèle. Cela signifie qu'un abonné listens
à plusieurs sujets en un seul appel. Par exemple, un abonné écoutant le motif new-*
recevra les messages envoyés au new-movies
et new-series
sujets.
Pour plus de détails sur les priorités des sujets et les paramètres des messages, consultez Composants et utilisation de la messagerie Pub/Sub.
Avec cette terminologie en place, construisons un exemple d'application .NET pour informer les cinéphiles de chaque nouvelle sortie de film. Écrivons deux applications console : une en tant qu'éditeur et une autre en tant qu'abonné.
1. Publier un nouveau message de film
Commençons par créer une application console pour publier des films. Aussi, installons le Alachisoft.NCache.SDK
Paquet NuGet, version 5.3.0.
Dans le Program.cs
fichier, publions des films aléatoires d'IMDb vers un newReleases
sujet. Quelque chose comme ça,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
Passons en revue. Tout d'abord, nous avons commencé par saisir une référence à un NCache cache avec GetCache()
. Nous utilisons la valeur par défaut demoCache
créé au cours de la NCache installation.
Ensuite, nous avons créé un nouveau sujet en utilisant CreateTopic()
avec un nom de sujet. C'est le même nom que nous utiliserons plus tard dans l'application d'abonné. De plus, nous avons joint un rappel pour être informé des échecs de livraison.
Pour publier un message pour chaque film, nous avons utilisé PublishAsync()
avec trois paramètres : un NCache Message
, DeliveryOption.All
informer tous les abonnés, et true
pour être informé des échecs.
La NCache Message
est un wrapper autour de l'objet à envoyer. Nous devrions le marquer comme [Serializable]
. Par exemple, voici le Movie
record,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
Pour construire le Message
, nous avons utilisé le ToMessage()
méthode d'extension avec une expiration.
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
Si l'un de nos messages expire et que personne ne le reçoit, le OnFailureMessageReceived
rappel est appelé. Pour plus de détails, consultez Comportement et propriétés des messages.
Par exemple, exécutons notre application Publisher sans (encore) abonné pour laisser les messages expirer,
Dans notre application Publisher, nous avons utilisé PublishAsync()
. En outre, NCache prend en charge l'envoi de plusieurs messages en masse en un seul appel avec le PublishBulk()
méthode.
L'éditeur est prêt. Écrivons l'application Abonné.
2. Abonnez-vous aux nouveaux messages de film
Créons une autre application console pour s'abonner au newReleases
sujet.
La Program.cs
le fichier ressemble à ceci,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
Cette fois, nous avons référencé un sujet en utilisant le même nom de sujet que nous avons utilisé dans l'application Publisher et avons joint un rappel en cas de suppression de sujet.
Ensuite, nous créons un abonnement non durable en utilisant CreateSubscription()
avec une MessageReceived
rappel et DeliveryMode.Async
. Avec ce mode de livraison, NCache ne garantit pas l'ordre des messages. Pour recevoir des messages dans l'ordre, nous devons utiliser DeliveryMode.Sync
.