NCache ha molte funzionalità: provider di cache ASP.NET e ricerca full-text, tra le altre. Ma, questa volta, vediamone un altro NCache caratteristica: messaggistica editore/abbonato (pub/sottotitoli). Impariamo come implementare il modello di messaggistica Pub/Sub con NCache.
Che cos'è la messaggistica Pub/Sub?
Pub/Sub è un modello di messaggistica in cui i mittenti (o gli editori) condividono i messaggi con più destinatari (o abbonati) attraverso un canale, evitando l'accoppiamento tra mittenti e destinatari.
Con la messaggistica Pub/Sub, possiamo condividere messaggi tra applicazioni e scaricare operazioni che richiedono tempo ai processori in background. Ad esempio, in un sistema di gestione delle prenotazioni, possiamo ridimensionare le immagini delle stanze in un processore in background "ascoltando" un evento attivato da un'applicazione Web.
NCache fornisce una messaggistica Pub/Sub in memoria per consentire la condivisione di informazioni in tempo reale tra le applicazioni .NET.
Grazie al architettura distribuita, NCache offre un meccanismo di messaggistica pub/sub scalabile, ad alta disponibilità ed efficiente in termini di archiviazione.
Come implementare la messaggistica Pub/Sub con NCache
Prima di passare a un'applicazione di esempio, esaminiamo prima un po' di terminologia.
Con NCache, il canale per lo scambio di messaggi è chiamato a Argomento. Ogni sottoscrittore si iscrive a un argomento per ricevere i messaggi che gli vengono inviati.
NCache fornisce più tipi di abbonamenti tra gli abbonati e gli editori. Nella messaggistica Pub/Sub, un abbonamento è l'"interesse" di uno o più iscritti a un argomento.
NCache ha due tipi di abbonamento:
- Non durevole: Con questo tipo di abbonamento, gli abbonati non riceveranno alcun messaggio inviato mentre sono disconnessi. Questo è il tipo di abbonamento predefinito.
- Durevole: Gli abbonati non perderanno alcun messaggio mentre sono disconnessi. Con gli abbonamenti durevoli, i messaggi vengono archiviati fino a quando gli abbonati non si uniscono nuovamente o i messaggi scadono.
Inoltre NCache ha questi due tipi di politiche:
- Condivisa: In questa politica, un abbonamento può avere più di un abbonato attivo alla volta. Se un abbonato lascia la rete, l'abbonamento rimane attivo. Questa politica è supportata solo dagli abbonamenti durevoli.
- Esclusivo: A differenza della politica Condivisa, gli abbonamenti con una politica Esclusiva possono avere un solo abbonato alla volta. Questa politica è disponibile sia per gli abbonamenti durevoli che non durevoli.
Nella messaggistica Pub/Sub, gli abbonati si registrano a un argomento per nome. Più avanti nella nostra applicazione di esempio, utilizzeremo questo metodo di abbonamento. Ma, NCache supporta anche un metodo di sottoscrizione basato su pattern. Questo significa un abbonato listens
a più argomenti in un'unica chiamata. Ad esempio, un abbonato che ascolta il modello new-*
riceverà i messaggi inviati al new-movies
ed new-series
argomenti.
Per maggiori dettagli sulle priorità degli argomenti e sui parametri dei messaggi, controlla Componenti e utilizzo della messaggistica Pub/Sub.
Con questa terminologia, creiamo un'applicazione .NET di esempio per informare gli appassionati di film su ogni nuova versione di film. Scriviamo due applicazioni Console: una come editore e un'altra come abbonato.
1. Pubblica un nuovo messaggio di film
Innanzitutto, creiamo un'app Console per pubblicare alcuni film. Inoltre, installiamo il Alachisoft.NCache.SDK
Pacchetto NuGet, versione 5.3.0.
Nel Program.cs
file, pubblichiamo alcuni filmati casuali da IMDb in a newReleases
argomento. Qualcosa come questo,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
Esaminiamolo. Innanzitutto, abbiamo iniziato afferrando un riferimento a un NCache cache con GetCache()
. Stiamo usando l'impostazione predefinita demoCache
creato durante il NCache installazione.
Quindi, abbiamo creato un nuovo argomento utilizzando CreateTopic()
con un nome di argomento. Questo è lo stesso nome che useremo più avanti nell'applicazione dell'abbonato. Inoltre, abbiamo allegato una richiamata per essere avvisati di errori di consegna.
Per pubblicare un messaggio per ogni film, abbiamo usato PublishAsync()
con tre parametri: an NCache Message
, le DeliveryOption.All
per avvisare tutti gli abbonati, e true
per essere avvisato dei guasti.
I NCache Message
è un wrapper attorno all'oggetto da inviare. Dovremmo contrassegnarlo come [Serializable]
. Ad esempio, ecco il Movie
disco,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
Per costruire il Message
, abbiamo usato il ToMessage()
metodo di estensione con una scadenza.
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
Se uno dei nostri messaggi scade e nessuno lo riceve, il OnFailureMessageReceived
viene chiamata la richiamata. Per maggiori dettagli, controlla Comportamento e proprietà dei messaggi.
Ad esempio, eseguiamo la nostra applicazione Publisher senza (ancora) abbonato per far scadere i messaggi,
Nella nostra app Publisher, abbiamo utilizzato PublishAsync()
. Inoltre, NCache ha il supporto per inviare più messaggi in blocco in una singola chiamata con il PublishBulk()
metodo.
L'editore è pronto. Scriviamo la domanda di iscrizione.
2. Iscriviti ai nuovi messaggi di film
Creiamo un'altra applicazione Console a cui iscriversi newReleases
argomento.
I Program.cs
il file è simile a questo,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
Questa volta, abbiamo fatto riferimento a un argomento utilizzando lo stesso nome dell'argomento utilizzato nell'applicazione Publisher e abbiamo allegato un callback in caso di eliminazione dell'argomento.
Successivamente, creiamo un abbonamento non durevole utilizzando CreateSubscription()
con una MessageReceived
richiamata e DeliveryMode.Async
. Con questa modalità di consegna, NCache non garantisce l'ordine dei messaggi. Per ricevere i messaggi in ordine, dovremmo usare DeliveryMode.Sync
.