NCache tem muitos recursos: provedor de cache ASP.NET e pesquisa de texto completo, entre outros. Mas, desta vez, vamos ver outro NCache recurso: mensagens do editor/assinante (Pub/Sub). Vamos aprender a implementar o padrão de mensagens Pub/Sub com NCache.
O que são mensagens do Pub/Sub?
Pub/Sub é um padrão de mensagens em que os remetentes (ou editores) compartilham mensagens com vários destinatários (ou assinantes) por meio de um canal, evitando o acoplamento entre remetentes e destinatários.
Com o sistema de mensagens Pub/Sub, podemos compartilhar mensagens entre aplicativos e transferir operações demoradas para processadores em segundo plano. Por exemplo, em um Sistema de Gerenciamento de Reservas, podemos redimensionar imagens de salas em um processador em segundo plano “escutando” um evento acionado por um aplicativo da Web.
NCache fornece um sistema de mensagens Pub/Sub na memória para permitir o compartilhamento de informações em tempo real entre aplicativos .NET.
Graças a arquitetura distribuída, NCache oferece um mecanismo de mensagens pub/sub escalável, altamente disponível e eficiente em armazenamento.
Como implementar mensagens do Pub/Sub com NCache
Antes de passar para um aplicativo de exemplo, vamos primeiro passar por algumas terminologias.
Com o NCache, o canal para troca de mensagens é chamado de Tema. Cada assinante assina um tópico para receber as mensagens enviadas a ele.
NCache fornece vários tipos de assinaturas entre os assinantes e os editores. Nas mensagens do Pub/Sub, uma assinatura é o "interesse" de um ou mais assinantes em um tópico.
NCache tem dois tipos de assinaturas:
- Não durável: Com esse tipo de assinatura, os assinantes não receberão nenhuma mensagem enviada enquanto estiverem desconectados. Este é o tipo de assinatura padrão.
- Durável: Os assinantes não perderão nenhuma mensagem enquanto estiverem desconectados. Com assinaturas duráveis, as mensagens são armazenadas até que os assinantes voltem a participar ou as mensagens expirem.
Também, NCache tem estes dois tipos de políticas:
- Compartilhado: Nesta política, uma assinatura pode ter mais de um assinante ativo por vez. Se um assinante sair da rede, a assinatura permanece ativa. Esta política só é compatível com assinaturas Duráveis.
- Exclusivo: Ao contrário da política Compartilhada, as assinaturas com uma política Exclusiva podem ter apenas um assinante por vez. Esta política está disponível para assinaturas Duráveis e Não Duráveis.
No sistema de mensagens Pub/Sub, os assinantes se registram em um tópico pelo nome. Mais tarde em nosso aplicativo de exemplo, usaremos esse método de assinatura. Mas, NCache também suporta um método de assinatura baseado em padrão. Isso significa um assinante listens
para vários tópicos em uma única chamada. Por exemplo, um assinante ouvindo o padrão new-*
receberá as mensagens enviadas para o new-movies
e new-series
tópicos.
Para obter mais detalhes sobre prioridades de tópicos e parâmetros de mensagens, consulte Componentes e uso de mensagens do Pub/Sub.
Com essa terminologia em vigor, vamos criar um aplicativo .NET de amostra para notificar os fãs de cinema sobre cada novo lançamento de filme. Vamos escrever dois aplicativos de console: um como editor e outro como assinante.
1. Publique uma nova mensagem de filme
Primeiro, vamos criar um aplicativo de console para publicar alguns filmes. Além disso, vamos instalar o Alachisoft.NCache.SDK
Pacote NuGet, versão 5.3.0.
No Program.cs
arquivo, vamos publicar alguns filmes aleatórios do IMDb para um newReleases
tema. Algo assim,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
Vamos passar por isso. Primeiro, começamos pegando uma referência a um NCache cache com GetCache()
. Estamos usando o padrão demoCache
criado durante o NCache instalação.
Em seguida, criamos um novo tópico usando CreateTopic()
com um nome de tópico. Este é o mesmo nome que usaremos posteriormente no aplicativo do assinante. Além disso, anexamos um retorno de chamada para ser notificado sobre falhas de entrega.
Para publicar uma mensagem para cada filme, usamos PublishAsync()
com três parâmetros: um NCache Message
, DeliveryOption.All
notificar todos os assinantes, e true
para ser notificado de falhas.
A NCache Message
é um wrapper em torno do objeto a ser enviado. Devemos marcá-lo como [Serializable]
. Por exemplo, aqui está o Movie
registro,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
Para construir o Message
, usamos o ToMessage()
método de extensão com uma expiração.
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
Se uma de nossas mensagens expirar e ninguém a receber, o OnFailureMessageReceived
retorno de chamada é chamado. Para mais detalhes, verifique Comportamento e Propriedades das Mensagens.
Por exemplo, vamos executar nosso aplicativo Publisher sem nenhum assinante (ainda) para deixar as mensagens expirarem,
Em nosso aplicativo Publisher, usamos PublishAsync()
. Além disso, NCache tem suporte para enviar várias mensagens em massa em uma única chamada com o PublishBulk()
método.
A Editora está pronta. Vamos escrever o aplicativo Assinante.
2. Assine as Mensagens de Novos Filmes
Vamos criar outro aplicativo de console para assinar o newReleases
tópico.
A Program.cs
arquivo se parece com isso,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
Desta vez, referenciamos um tópico usando o mesmo nome de tópico que usamos no aplicativo Publicador e anexamos um retorno de chamada em caso de exclusão do tópico.
Em seguida, criamos uma assinatura não durável usando CreateSubscription()
com uma MessageReceived
retorno de chamada e DeliveryMode.Async
. Com este modo de entrega, NCache não garante a ordem das mensagens. Para receber as mensagens em ordem, devemos usar DeliveryMode.Sync
.