NCache には、ASP.NET キャッシュ プロバイダーやフルテキスト検索など、多くの機能があります。 だけど、今度は別のものを見てみましょう NCache 機能: パブリッシャー/サブスクライバー (Pub/Sub) メッセージング。 Pub/Sub メッセージング パターンを実装する方法を学びましょう NCache.
Pub/Sub メッセージングとは
Pub/Sub は、送信者 (またはパブリッシャー) がチャネルを介して複数の受信者 (またはサブスクライバー) とメッセージを共有し、送信者と受信者の間の結合を回避するメッセージング パターンです。
Pub/Sub メッセージングを使用すると、アプリケーション間でメッセージを共有し、時間のかかる操作をバックグラウンド プロセッサにオフロードできます。 たとえば、予約管理システムでは、Web アプリケーションによってトリガーされるイベントを「リッスン」することで、バックグラウンド プロセッサで部屋の画像のサイズを変更できます。
NCache メモリ内の Pub/Sub メッセージングを提供して、.NET アプリケーション間でリアルタイムの情報共有を可能にします。
おかげ 分散アーキテクチャ, NCache スケーラブルで可用性が高く、ストレージ効率の高い pub/sub メッセージング メカニズムを提供します。
Pub/Sub メッセージングを実装する方法 NCache
サンプル アプリケーションに移る前に、まずいくつかの用語について説明しましょう。
NCache、メッセージを交換するチャネルは a と呼ばれます。 ご用件. 各サブスクライバーは、トピックにサブスクライブして、送信されたメッセージを受信します。
NCache サブスクライバーとパブリッシャーの間で複数のタイプのサブスクリプションを提供します。 Pub/Sub メッセージングでは、サブスクリプションはトピック内の XNUMX 人以上のサブスクライバーの「関心」です。
NCache には、次の XNUMX 種類のサブスクリプションがあります。
- 非耐久性: このサブスクリプション タイプでは、サブスクライバーは、切断されている間に送信されたメッセージを受信しません。 これはデフォルトのサブスクリプション タイプです。
- 耐久性: サブスクライバーは、切断されている間、メッセージを失うことはありません。 永続サブスクリプションでは、メッセージはサブスクライバーが再参加するかメッセージの有効期限が切れるまで保存されます。
だから、 NCache 次の XNUMX 種類のポリシーがあります。
- 共有: このポリシーでは、サブスクリプションは一度に複数のアクティブなサブスクライバーを持つことができます。 サブスクライバーがネットワークを離れても、サブスクリプションはアクティブなままです。 このポリシーは、Durable サブスクリプションでのみサポートされています。
- 排他的: 共有ポリシーとは異なり、排他ポリシーのサブスクリプションは一度に XNUMX つのサブスクライバーのみを持つことができます。 このポリシーは、永続サブスクリプションと非永続サブスクリプションの両方で使用できます。
Pub/Sub メッセージングでは、サブスクライバーは名前でトピックに登録します。 サンプル アプリケーションの後半で、このサブスクリプション メソッドを使用します。 しかし、 NCache パターンベースの購読方法もサポートしています。 これはサブスクライバーを意味します listens
XNUMX 回の呼び出しで複数のトピックに。 たとえば、パターンを聞いている加入者 new-*
に送信されたメッセージを受信します new-movies
& new-series
トピック。
トピックの優先度とメッセージ パラメータの詳細については、 Pub/Sub メッセージングのコンポーネントと使用法。
この用語を使用して、サンプル .NET アプリケーションを作成し、新しい映画がリリースされるたびに映画ファンに通知してみましょう。 XNUMX つのコンソール アプリケーションを作成してみましょう。XNUMX つはパブリッシャーとして、もう XNUMX つはサブスクライバーとしてです。
1. 新しいムービー メッセージを公開する
まず、映画を公開するためのコンソール アプリを作成しましょう。 また、インストールしましょう Alachisoft.NCache.SDK
NuGet パッケージ、バージョン 5.3.0。
Program.cs
ファイル、IMDb からランダムなムービーをいくつか公開してみましょう。 newReleases
トピック。 このようなもの、
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
それを見てみましょう。 まず、への参照を取得することから始めました。 NCache キャッシュ GetCache()
. デフォルトを使用しています demoCache
の間に作成された NCache インストール。
次に、を使用して新しいトピックを作成しました CreateTopic()
トピック名付き。 これは、後でサブスクライバー アプリケーションで使用する名前と同じです。 また、配信失敗の通知を受けるためのコールバックを添付しました。
すべての映画のメッセージを公開するために、 PublishAsync()
XNUMX つのパラメーターを使用: NCache Message
DeliveryOption.All
すべてのサブスクライバーに通知するため、および true
障害の通知を受け取る。
NCache Message
送信するオブジェクトのラッパーです。 としてマークする必要があります [Serializable]
. たとえば、ここに Movie
記録する
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
を構築するには Message
、私たちが使用したのは、 ToMessage()
有効期限付きの延長方法。
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
メッセージの XNUMX つが期限切れになり、誰もそれを受信しない場合、 OnFailureMessageReceived
コールバックが呼び出されます。 詳細については、チェックしてください メッセージの動作とプロパティ.
たとえば、(まだ) サブスクライバーなしでパブリッシャー アプリケーションを実行して、メッセージの有効期限が切れるようにしましょう。
パブリッシャー アプリでは、 PublishAsync()
よりも優先されます。また、 NCache を使用して、XNUMX 回の呼び出しで複数のメッセージをまとめて送信できます。 PublishBulk()
方法。
パブリッシャーの準備が整いました。 Subscriber アプリケーションを書きましょう。
2.新しい映画メッセージを購読する
にサブスクライブする別のコンソール アプリケーションを作成しましょう。 newReleases
のトピックを参照してください。
Program.cs
ファイルは次のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
今回は、Publisher アプリケーションで使用したのと同じトピック名を使用してトピック XNUMX を参照し、トピック削除の場合のコールバックを添付しました。
次に、非永続サブスクリプションを作成します CreateSubscription()
また、 MessageReceived
コールバックと DeliveryMode.Async
. この配信モードでは、 NCache メッセージの順序は保証されません。 メッセージを順番に受信するには、使用する必要があります DeliveryMode.Sync
.