NCache 具有许多功能:ASP.NET 缓存提供程序和全文搜索等。 但是,这一次,让我们看看另一个 NCache 功能:发布者/订阅者 (Pub/Sub) 消息传递。 让我们学习如何实现 Pub/Sub 消息传递模式 NCache.
什么是发布/订阅消息?
Pub/Sub 是一种消息传递模式,其中发送者(或发布者)通过通道与多个接收者(或订阅者)共享消息,避免发送者和接收者之间的耦合。
通过 Pub/Sub 消息传递,我们可以在应用程序之间共享消息并将耗时的操作卸载到后台处理器。 例如,在预订管理系统中,我们可以在后台处理器中通过“监听”由 Web 应用程序触发的事件来调整房间图像的大小。
NCache 提供内存中的 Pub/Sub 消息传递,以实现 .NET 应用程序之间的实时信息共享。
由于 分布式架构, NCache 提供可扩展、高可用性和存储高效的发布/订阅消息传递机制。
如何使用 Pub/Sub 消息传递 NCache
在转到示例应用程序之前,让我们先了解一些术语。
NCache,交换消息的通道称为 话题. 每个订阅者订阅一个主题以接收发送给它的消息。
NCache 在订阅者和发布者之间提供多种类型的订阅。 在 Pub/Sub 消息传递中,订阅是一个或多个订阅者对某个主题的“兴趣”。
NCache 有两种订阅类型:
- 不耐用: 使用此订阅类型,订阅者在断开连接时不会收到任何发送的消息。 这是默认订阅类型。
- 耐用: 订阅者在断开连接时不会丢失任何消息。 使用持久订阅,消息会一直存储到订阅者重新加入或消息过期。
也, NCache 有这两种类型的策略:
- 共享: 在此策略中,订阅一次可以有多个活动订阅者。 如果订阅者离开网络,订阅将保持活动状态。 此策略仅受持久订阅支持。
- 独家: 与共享策略不同,具有独占策略的订阅一次只能有一个订阅者。 此政策适用于持久和非持久订阅。
在 Pub/Sub 消息传递中,订阅者按名称注册主题。 稍后在我们的示例应用程序中,我们将使用此订阅方法。 但, NCache 还支持基于模式的订阅方法。 这意味着订阅者 listens
在一次通话中处理多个主题。 例如,订阅模式的订阅者 new-*
将收到发送到的消息 new-movies
和 new-series
话题。
有关主题优先级和消息参数的更多详细信息,请查看 Pub/Sub 消息传递组件和使用。
有了这个术语,让我们构建一个示例 .NET 应用程序来通知影迷有关每部新电影的发布。 让我们编写两个控制台应用程序:一个作为发布者,另一个作为订阅者。
1.发布新的电影消息
首先,让我们创建一个控制台应用程序来发布一些电影。 另外,让我们安装 Alachisoft.NCache.SDK
NuGet 包,版本 5.3.0。
在 Program.cs
文件,让我们从 IMDb 发布一些随机电影到 newReleases
话题。 像这样的东西,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; using Movies.Shared.Extensions; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.CreateTopic(topicName); // 2. Create a new topic newReleasesTopic.MessageDeliveryFailure += OnFailureMessageReceived; // Attach a callback in case of delivery failures var newReleases = new List { new Movie("Top Gun: Maverick", 2022, 8.60f, 130, new []{ Genre.Action, Genre.Drama }), // Some other new movies here... }; foreach (var movie in newReleases) { var message = movie.ToMessage(Config.Expiration); // 3. Create a Message await newReleasesTopic.PublishAsync(message, DeliveryOption.All, true); // 4. Publish it } Console.WriteLine("Press any key to continue"); Console.ReadKey(); static void OnFailureMessageReceived(object sender, MessageFailedEventArgs args) { Console.WriteLine($"[ERROR] Failed to delivered message '{args.Message.Payload}'. Topic: [{args.TopicName}], Reason: [{args.MessageFailureReason}]"); } |
让我们来看看它。 首先,我们首先获取一个对 NCache 缓存 GetCache()
. 我们使用默认值 demoCache
期间创建的 NCache 安装。
然后,我们使用创建了一个新主题 CreateTopic()
带有主题名称。 这与我们稍后将在订阅者应用程序中使用的名称相同。 此外,我们附加了一个回调以获取有关交付失败的通知。
为了为每部电影发布一条消息,我们使用了 PublishAsync()
带有三个参数:一个 NCache Message
是, DeliveryOption.All
通知所有订阅者,以及 true
获得失败通知。
NCache Message
是要发送的对象的包装器。 我们应该将其标记为 [Serializable]
. 例如,这里是 Movie
记录,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Entities; [Serializable] public record Movie(string Name, int ReleaseYear, float? Rating, int DurationInMinutes, Genre[] Genres) { public override string ToString() { return $"Movie: [{Name}] ({ReleaseYear})"; } } public enum Genre { Action, // Some other movie genres } |
建立 Message
,我们使用了 ToMessage()
到期的扩展方法。
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Movies.Shared.Extensions; public static class MovieExtensions { public static Message ToMessage(this object self, TimeSpan? expiration = null) => new Message(self, expiration); } |
如果我们的其中一条消息过期而没有人收到,则 OnFailureMessageReceived
回调被调用。 有关更多详细信息,请查看 消息行为和属性.
例如,让我们在没有任何订阅者(还)的情况下运行我们的 Publisher 应用程序,让消息过期,
在我们的 Publisher 应用程序中,我们使用了 PublishAsync()
。 另外, NCache 支持在一次调用中批量发送多条消息 PublishBulk()
方法。
发布者已准备就绪。 让我们编写订阅者应用程序。
2.订阅新电影消息
让我们创建另一个控制台应用程序来订阅 newReleases
主题。
Program.cs
文件看起来像这样,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
using Alachisoft.NCache.Client; using Alachisoft.NCache.Runtime.Caching; using Movies.Shared; using Movies.Shared.Entities; ICache cache = CacheManager.GetCache(Config.CacheName); // 1. Create an NCache cache instance string topicName = Config.Topics.NewReleases; ITopic newReleasesTopic = cache.MessagingService.GetTopic(topicName); // 2. Grab the same topic newReleasesTopic.OnTopicDeleted = OnTopicDeleted; // Attach a callback if the topic gets deleted if (newReleasesTopic == null) { Console.WriteLine($"Ooops...Topic [{topicName}] deleted."); } else { ITopicSubscription newReleasesSubscriber = newReleasesTopic.CreateSubscription(MessageReceived, DeliveryMode.Async); // 3. Attach a callback for new movies Console.WriteLine("Press any key to continue"); Console.ReadKey(); newReleasesSubscriber.UnSubscribe(); // 4. Unsubscribe... } void MessageReceived(object sender, MessageEventArgs args) { if (args.Message.Payload is Movie movie) { Console.WriteLine($"New Movie released: {movie}"); } } void OnTopicDeleted(object sender, TopicDeleteEventArgs args) { Console.WriteLine($"[ERROR] Ooops Topic deleted. Topic: [{args.TopicName}]"); } |
这一次,我们使用与 Publisher 应用程序中使用的相同主题名称引用了一个主题,并附加了一个回调以防主题删除。
接下来,我们使用创建一个非持久订阅 CreateSubscription()
用 MessageReceived
回调和 DeliveryMode.Async
. 通过这种交付方式, NCache 不保证消息的顺序。 要按顺序接收消息,我们应该使用 DeliveryMode.Sync
.