NCache hat viele Funktionen: unter anderem ASP.NET-Cache-Anbieter und Volltextsuche. Aber dieses Mal sehen wir uns einen anderen an NCache Feature: Publisher/Subscriber (Pub/Sub) Messaging. Sehen wir uns an, wie Sie das Pub/Sub-Messaging-Muster mit implementieren NCache.
Was ist Pub/Sub-Messaging?
Pub/Sub ist ein Messaging-Muster, bei dem Sender (oder Herausgeber) Nachrichten über einen Kanal mit mehreren Empfängern (oder Abonnenten) teilen, wodurch eine Kopplung zwischen Sendern und Empfängern vermieden wird.
Mit Pub/Sub-Messaging können wir Nachrichten zwischen Anwendungen austauschen und zeitaufwändige Vorgänge an Hintergrundprozessoren auslagern. Beispielsweise können wir in einem Reservierungsmanagementsystem die Größe von Zimmerbildern in einem Hintergrundprozessor ändern, indem wir auf ein Ereignis „lauschen“, das von einer Webanwendung ausgelöst wird.
NCache bietet ein In-Memory-Pub/Sub-Messaging, um den Informationsaustausch in Echtzeit zwischen .NET-Anwendungen zu ermöglichen.
Dank an verteilte Architektur, NCache bietet einen skalierbaren, hochverfügbaren und speichereffizienten Pub/Sub-Messaging-Mechanismus.
So implementieren Sie Pub/Sub-Messaging mit NCache
Bevor wir zu einer Beispielanwendung übergehen, gehen wir zunächst einige Terminologien durch.
Mit der NCache, der Kanal zum Austausch von Nachrichten heißt a Betreff. Jeder Abonnent abonniert ein Thema, um die an ihn gesendeten Nachrichten zu erhalten.
NCache bietet mehrere Arten von Abonnements zwischen den Abonnenten und Herausgebern. Beim Pub/Sub-Messaging ist ein Abonnement das „Interesse“ eines oder mehrerer Abonnenten an einem Thema.
NCache hat zwei Arten von Abonnements:
- Nicht haltbar: Bei diesem Abonnementtyp erhalten Abonnenten keine Nachrichten, die gesendet werden, während sie getrennt sind. Dies ist der Standardabonnementtyp.
- Dauerhaft: Abonnenten verlieren keine Nachrichten, während sie getrennt sind. Bei dauerhaften Abonnements werden Nachrichten gespeichert, bis Abonnenten wieder beitreten oder die Nachrichten ablaufen.
Ebenfalls, NCache hat diese zwei Arten von Richtlinien:
- Geteilt: In dieser Richtlinie kann ein Abonnement mehr als einen aktiven Abonnenten gleichzeitig haben. Wenn ein Abonnent das Netzwerk verlässt, bleibt das Abonnement aktiv. Diese Richtlinie wird nur von Durable-Abonnements unterstützt.
- exklusiv: Anders als bei der Shared-Policy können Abonnements mit einer Exclusive-Policy jeweils nur einen Abonnenten haben. Diese Richtlinie ist sowohl für dauerhafte als auch für nicht dauerhafte Abonnements verfügbar.
Beim Pub/Sub-Messaging registrieren sich Abonnenten namentlich für ein Thema. Später in unserer Beispielanwendung werden wir diese Abonnementmethode verwenden. Aber, NCache unterstützt auch eine musterbasierte Abonnementmethode. Dies bedeutet einen Abonnenten listens
zu mehreren Themen in einem einzigen Anruf. Zum Beispiel ein Abonnent, der das Muster abhört new-*
erhält Nachrichten, die an die gesendet werden new-movies
und new-series
Themen.
Weitere Einzelheiten zu Themenprioritäten und Nachrichtenparametern finden Sie unter Pub/Sub-Messaging-Komponenten und -Nutzung.
Lassen Sie uns mit dieser Terminologie eine .NET-Beispielanwendung erstellen, um Filmfans über jede neue Filmveröffentlichung zu informieren. Lassen Sie uns zwei Konsolenanwendungen schreiben: eine als Herausgeber und eine als Abonnent.
1. Veröffentlichen Sie eine neue Filmnachricht
Lassen Sie uns zunächst eine Konsolen-App erstellen, um einige Filme zu veröffentlichen. Lassen Sie uns auch installieren Alachisoft.NCache.SDK
NuGet-Paket, Version 5.3.0.
Im Program.cs
Datei, veröffentlichen wir einige zufällige Filme von IMDb in a newReleases
Thema. Etwas wie das,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
Gehen wir es durch. Zuerst haben wir damit begonnen, einen Verweis auf an zu greifen NCache Cache mit GetCache()
. Wir verwenden die Standardeinstellung demoCache
erstellt während der NCache Installation.
Dann haben wir ein neues Thema mit erstellt CreateTopic()
mit Themennamen. Dies ist derselbe Name, den wir später in der Abonnentenanwendung verwenden werden. Außerdem haben wir einen Rückruf angehängt, um über Lieferausfälle benachrichtigt zu werden.
Um eine Nachricht für jeden Film zu veröffentlichen, haben wir verwendet PublishAsync()
mit drei Parametern: an NCache Message
, der DeliveryOption.All
alle Abonnenten zu benachrichtigen, und true
um über Störungen benachrichtigt zu werden.
Das NCache Message
ist ein Wrapper um das zu sendende Objekt. Wir sollten es als markieren [Serializable]
. Hier ist zum Beispiel die Movie
Aufzeichnung,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
Um die zu bauen Message
, wir haben das verwendet ToMessage()
Erweiterungsmethode mit einem Ablaufdatum.
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
Wenn eine unserer Nachrichten abläuft und niemand sie erhält, wird die OnFailureMessageReceived
Rückruf wird gerufen. Weitere Einzelheiten finden Sie unter Verhalten und Eigenschaften von Nachrichten.
Lassen Sie uns beispielsweise unsere Publisher-Anwendung (noch) ohne Abonnenten ausführen, damit die Nachrichten ablaufen.
In unserer Publisher-App haben wir verwendet PublishAsync()
. Auch NCache hat Unterstützung, um mehrere Nachrichten in großen Mengen in einem einzigen Anruf mit zu senden PublishBulk()
Methode.
Der Verlag ist bereit. Lassen Sie uns die Abonnentenanwendung schreiben.
2. Abonnieren Sie neue Filmnachrichten
Lassen Sie uns eine weitere Konsolenanwendung erstellen, um die zu abonnieren newReleases
Thema.
Das Program.cs
Datei sieht so aus,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
Dieses Mal haben wir auf ein Thema mit dem gleichen Themennamen verwiesen, den wir in der Publisher-Anwendung verwendet haben, und im Falle einer Themenlöschung einen Rückruf angehängt.
Als Nächstes erstellen wir ein nicht dauerhaftes Abonnement mit CreateSubscription()
mit MessageReceived
Rückruf und DeliveryMode.Async
. Bei diesem Liefermodus NCache garantiert nicht die Reihenfolge der Nachrichten. Um Nachrichten in der richtigen Reihenfolge zu erhalten, sollten wir verwenden DeliveryMode.Sync
.