NCache tiene muchas características: proveedor de caché ASP.NET y búsqueda de texto completo, entre otras. Pero, esta vez, veamos otra. NCache característica: Mensajería de publicador/suscriptor (Pub/Sub). Aprendamos a implementar el patrón de mensajería Pub/Sub con NCache.
¿Qué es la mensajería Pub/Sub?
Pub/Sub es un patrón de mensajería en el que los remitentes (o publicadores) comparten mensajes con múltiples receptores (o suscriptores) a través de un canal, evitando el acoplamiento entre remitentes y receptores.
Con la mensajería Pub/Sub, podemos compartir mensajes entre aplicaciones y descargar operaciones que consumen mucho tiempo a procesadores en segundo plano. Por ejemplo, en un sistema de gestión de reservas, podemos cambiar el tamaño de las imágenes de las habitaciones en un procesador de fondo al "escuchar" un evento desencadenado por una aplicación web.
NCache proporciona una mensajería Pub/Sub en memoria para permitir el intercambio de información en tiempo real entre aplicaciones .NET.
Gracias a arquitectura distribuida, NCache ofrece un mecanismo de mensajería pub/sub escalable, altamente disponible y eficiente en almacenamiento.
Cómo implementar la mensajería Pub/Sub con NCache
Antes de pasar a una aplicación de muestra, primero analicemos un poco la terminología.
Con NCache, el canal para intercambiar mensajes se llama Tema. Cada suscriptor se suscribe a un tema para recibir los mensajes que se le envían.
NCache proporciona múltiples tipos de suscripciones entre los suscriptores y los editores. En la mensajería Pub/Sub, una suscripción es el "interés" de uno o más suscriptores en un tema.
NCache tiene dos tipos de suscripciones:
- No duradero: Con este tipo de suscripción, los suscriptores no recibirán ningún mensaje enviado mientras estén desconectados. Este es el tipo de suscripción predeterminado.
- Durable: Los suscriptores no perderán ningún mensaje mientras estén desconectados. Con las suscripciones duraderas, los mensajes se almacenan hasta que los suscriptores se reincorporan o caducan.
También, NCache tiene estos dos tipos de pólizas:
- Compartido: En esta política, una suscripción podría tener más de un suscriptor activo a la vez. Si un suscriptor abandona la red, la suscripción se mantiene activa. Esta política solo es compatible con suscripciones duraderas.
- Exclusivo: A diferencia de la política Compartida, las suscripciones con una política Exclusiva solo pueden tener un suscriptor a la vez. Esta política está disponible para suscripciones duraderas y no duraderas.
En la mensajería Pub/Sub, los suscriptores se registran en un tema por su nombre. Más adelante en nuestra aplicación de muestra, usaremos este método de suscripción. Pero, NCache también admite un método de suscripción basado en patrones. Esto significa un suscriptor listens
a múltiples temas en una sola llamada. Por ejemplo, un suscriptor escuchando el patrón new-*
recibirá los mensajes enviados al new-movies
y new-series
temas.
Para obtener más detalles sobre las prioridades de los temas y los parámetros de los mensajes, consulte Uso y componentes de mensajería Pub/Sub.
Con esta terminología en su lugar, construyamos una aplicación .NET de muestra para notificar a los fanáticos del cine sobre cada nuevo lanzamiento de película. Escribamos dos aplicaciones de consola: una como editor y otra como suscriptor.
1. Publicar un nuevo mensaje de película
Primero, creemos una aplicación de consola para publicar algunas películas. Además, vamos a instalar el Alachisoft.NCache.SDK
Paquete NuGet, versión 5.3.0.
En Program.cs
archivo, publiquemos algunas películas al azar de IMDb a un newReleases
tema. Algo como esto,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
Vamos a repasarlo. Primero, empezamos tomando una referencia a un NCache caché con GetCache()
. Estamos usando el predeterminado demoCache
creado durante el NCache instalación.
Luego, creamos un nuevo tema usando CreateTopic()
con un nombre de tema. Este es el mismo nombre que usaremos más adelante en la aplicación de suscripción. Además, adjuntamos una devolución de llamada para recibir notificaciones de fallas en la entrega.
Para publicar un mensaje para cada película, usamos PublishAsync()
con tres parámetros: un NCache Message
, la DeliveryOption.All
para notificar a todos los suscriptores, y true
para recibir notificaciones de fallas.
La NCache Message
es un contenedor alrededor del objeto a enviar. Deberíamos marcarlo como [Serializable]
. Por ejemplo, aquí está el Movie
grabar,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
Para construir el Message
, usamos el ToMessage()
método de extensión con una caducidad.
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
Si uno de nuestros mensajes caduca y nadie lo recibe, el OnFailureMessageReceived
se llama la devolución de llamada. Para más detalles, consulte Comportamiento y propiedades de los mensajes.
Por ejemplo, ejecutemos nuestra aplicación Publisher sin ningún suscriptor (todavía) para dejar que los mensajes caduquen,
En nuestra aplicación Publisher, usamos PublishAsync()
. Además, NCache tiene soporte para enviar múltiples mensajes de forma masiva en una sola llamada con el PublishBulk()
método.
El editor está listo. Escribamos la aplicación Suscriptor.
2. Suscríbete a nuevos mensajes de películas
Vamos a crear otra aplicación de consola para suscribirnos a la newReleases
tema.
La Program.cs
el archivo se ve así,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
Esta vez, hicimos referencia a un tema con el mismo nombre de tema que usamos en la aplicación Publisher y adjuntamos una devolución de llamada en caso de eliminación del tema.
A continuación, creamos una suscripción no duradera usando CreateSubscription()
con un MessageReceived
devolución de llamada y DeliveryMode.Async
. Con este modo de entrega, NCache no garantiza el orden de los mensajes. Para recibir mensajes en orden, debemos usar DeliveryMode.Sync
.