使用可扩展的 .NET Pub/Sub 消息传递 NCache

录制的网络研讨会
罗恩·侯赛因和爱德华·巴托

在本次网络研讨会中,您将了解如何将强大的 Pub/Sub 消息传递用于高流量 .NET 应用程序并克服性能瓶颈。 NCache 是一个可扩展的 .NET 分布式缓存,提供一组丰富的 Pub/Sub 消息传递功能。

我们的一位高级解决方案架构师将带您了解所有 Pub/Sub 功能及其用例,并向您展示如何将它们整合到您的 .NET 应用程序中。

此网络研讨会涵盖:

  • Pub/Sub 消息传递简介
  • 理解 NCache 发布/订阅功能
  • 在中设置 Pub/Sub NCache
  • 在您的 .NET 应用程序中使用 Pub/Sub 动手实践
  • 通过 Pub/Sub 消息传递的演示 NCache
  • NCache Pub/Sub 的高可用性和可扩展性

今天的主题将围绕 .NET 应用程序中的消息传递,我们将使用 NCache 作为示例产品。 主要重点将是 Pub/Sub 消息传递. 发布者订阅者模型。 为什么它很重要? 你到底在哪里使用它以及为什么 NCache 与您在 .NET 中看到的其他选项相比,它更适合 .NET core 应用程序? 所以,我们有很多东西要讲。 让我们快速开始吧,如果有 Eddie 建议的任何问题,请使用问题答案选项卡并根据需要提出尽可能多的问题。 所以,让我们快速开始吧。

什么是发布/订阅?

前几张幻灯片集中在介绍部分。 什么是发布/订阅? 我很确定每个人都知道,但为了完整起见,我将介绍 Pub/Sub 消息传递范式。

所以,它是一个发布者订阅者模型。 它主要包含三个组件。 我们有发布者,它发送消息,将这些消息广播到通信通道。 然后我们有订阅者,它可能是另一个应用程序,它正在接收这些消息,然后我们有一个通信、一个代理总线、一个服务、一个发布者和订阅者中间组件中的进程,它帮助将这些消息从发布者中继到订阅者。 在某些情况下,您可以存储消息,可以处理消息。 这可能是时间基准,这可能基于您的收件人。

发布/订阅如何运作?

因此,当您计划使用它时,您可以调整不同的选项。 但是,一般来说,有一些准则,一些规则对于典型的 Pub/Sub 消息传递平台是通用的。 它是松耦合的。 发布者和订阅者,他们可以独立工作,互不依赖。 他们也可以完全不知道彼此,并且系统应该可以正常工作。 整个想法是,发布者只是将消息发送到通信通道,然后通信通道中继这些消息,而订阅者接收这些消息。 发布者不需要知道关于订阅者应用程序的身份或信息,同样订阅者也不需要知道发布者信息是什么。 他们可以只处理消息,这是主要的兴趣点,整个想法是每个组件独立工作。 发布者对通信渠道或订阅者没有任何依赖关系,反之亦然。

大多数发布者订阅者模型,Pub/Sub 模型允许您分离关注点,您可以在其中拥有主题、频道或流。 这些用于分离消息。 因此,用于简单单一目的的类似性质的消息可以放在特定主题中,并且您可以在 Pub/Sub 消息传递范式中拥有多个主题、多个流、多个通道,并且您可以接收这些消息。 发布者连接到主题,然后将这些消息传输到某个主题,然后订阅者也可以连接到通信通道中的主题,然后接收这些消息,使用这种基于主题的方法,它工作得非常好。

这是一个图表,它突出显示了架构细节。

什么是发布订阅

我们有一个主题,这是通信渠道,这是我们拥有订阅者应用程序的发布者应用程序。 可能有多个发布者。 同样,可能有多个订阅者,然后在通信渠道中您也可以有多个主题。 例如,您可以有一个主题是处理订单处理信息。 可能还有另一个主题是处理客户信息等等。 因此,根据消息的目的,根据消息的类型,这是负载要求,您可以将消息分发到主题中,然后此订阅者连接到主题 A,其中订阅者 2 连接到主题 A 为以及 B,然后订阅者 3 仅连接到主题 B。因此,发布者发送这些消息,而那些连接到各自主题的订阅者,他们接收这些消息没有问题。

Pub/Sub 的常见用例

好吧,那么,继续下一步。 Pub/Sub 的常见用例有哪些? 它已经在市场上出现了相当长的一段时间,整个 Pub/Sub 消息传递范式也被用于许多高流量的应用程序中。 它有很大的吸引力。

因此,我们主要有后端系统,可以从处理异步数据处理的 Pub/Sub 消息传递中受益。 它可能是一些工作流程、一些后台任务、一些你需要的程序、一些你正在做的计算。 因此,后端系统主要使用这个消息传递平台。

聊天应用程序,这可能是游戏行业,它可能与数百万用户打交道,数百万条消息正在为用户传输。 例如,我们的一位客户在他们的游戏网站上有不同的锦标赛时,使用 Pub/Sub 来共享信息。 因此,他们在用户之间共享了大量与锦标赛相关的信息。

金融行业、电子商务、任何预订,任何面向公众的东西,它们需要做一些后端订单处理、一些预订处理,或者通常在应用程序的不同组件之间提供一些交互。

自动化和制造工厂。 但是,通常,它可能是最重要的用例,可能是技术用例,可能是 微服务 用例。 我们有不同的微服务,它们代表一个大型应用程序,但这些组件服务于单一独立的目的。 为这些微服务实现通信层是非常困难的。 您必须有单独的通信通道再次充当微服务,它也没有其他微服务的能力,这就是您希望拥有一个独立的微服务层的整个想法,微服务实例彼此独立。 一个实例做一个单一的目的,它不依赖于另一个,反之亦然。 因此,对于那些微服务应用程序,我们在应用程序中有微服务,实现通信通道是非常困难的,它很复杂,而且它有很多依赖,也会减慢速度。 因此,使用 Pub/Sub 消息传递平台非常有意义,其中通信通道由平台本身管理,然后微服务充当发布者和订阅者,然后它们在彼此之间共享信息。 因此,这应该有助于减轻负载以及您通常在 Pub/Sub 中看到的架构问题,微服务平台应用程序使用自己的通信通道。 他们应该使用 Pub/Sub 来帮助缓解这种情况。

问题:Pub/Sub 平台的可扩展性问题

有许多 Pub/Sub 平台,但通常存在瓶颈,这就是我们接下来要介绍的内容。 我们有典型的传统消息传递平台。 它们在事物的架构方面非常好。 您发送一条消息,该消息将被传输给所有订阅者,一切正常。 但通常,这在低负载下工作得很好。 当您拥有较少的用户时,负载以及较少数量的请求或较少数量的消息来回传输到您的应用程序。 就负载而言,当应用程序负载增长时,例如,有很多用户,发生大量处理,然后从发布者到订阅者有大量消息传递负载,通常,我们已经看到我们的传统消息传递平台,比如数据库,它们就成了瓶颈。 这是说明这一点的图表。

可扩展性瓶颈
可扩展性瓶颈

我们有发布者应用程序,订阅者。 这可能是 Web 应用程序、某些 .NET 或 .NET Core 后端。 一些工作流程。 它可以是任何类型的 .NET 或 .NET Core 应用程序甚至 Java 应用程序,连接到传统的海量平台,如数据库或消息队列。 首先,这些不如内存存储快,对吧。 所以,这是你需要解决的问题。 但主要问题是,当您的消息负载增加时,当您拥有大量用户时,事情开始变慢,因为您的消息负载可能会阻塞平台本身,因为它是单一来源。 它无法添加越来越多的服务器来帮助适应应用程序增加的负载。

那么,这是主要问题,最终用户会受到什么影响? 例如,一条消息没有得到及时中继,它花费了很多时间,这导致了应用程序端的延迟。 例如,最终用户正在等待处理完成,而该处理依赖于 Pub/Sub 模型平台,而平台速度很慢。 因此,这反过来也会影响最终用户的请求,如果存在毫秒或几秒延迟的因素,您将失去业务,那么这可能会影响最终用户,进而影响您的业务嗯,这很难管理,因为没有办法,你可以改变应用程序架构中的任何东西来帮助解决这个问题。 有一个问题,就是在数据库层的应用架构之外,对吧。 那么,如何应对这种特殊情况呢?

解决方案:内存中分布式缓存

解决方案很简单,你使用一个内存中的 Pub/Sub 消息传递平台,它本质上是分布式的,例如 NCache. 它超快,这就是它与传统消息传递平台不同的原因。 我承认它可能有更少的功能。 自从, NCache的主要重点是拥有对象缓存、数据缓存、会话缓存,用于处理数据和存储以及提高性能。 Pub/Sub 是其中一项功能。 有消息队列之类的产品,例如Java消息服务,MSM队列,这些产品更精细。 但是是什么让 NCache 除了这些产品之外 NCache 是内存中的。 因此,它的启动速度非常快。

可扩展的发布订阅消息

因此,假设您有 XNUMX 个用户并且您的消息负载数量较少,那么与传统消息平台相比,这些用户将获得超快的体验。 所以,它在内存中。 所以,这是一个很大的优势,然后我们有多个服务器托管该消息传递平台。 你可以看到一个团队服务器连接在一起形成一个逻辑能力,它制定了混乱平台,并且在它之上实现了一个 Pub/Sub 机制,对吧。 因此,您的发布者应用程序可以连接到此缓存,并且在缓存中您可以拥有单独的主题。 我将向您展示这是如何工作的,下线然后是订阅者,顺便说一下,他们也可以是发布者,他们也连接到由 NCache 如以分发缓存的形式,然后该通信通道将所有与通信相关的信息提供给这些发布者和订阅者。 这可以中继消息,并且可以进行所有这些不同类型的探索、某种不同类型的持久性和不同的特性,我将在下面重点介绍这些特性。

这个平台的好处是您可以根据需要添加任意数量的服务器,并且您不必担心您的平台会成为可扩展性瓶颈,如这里所示。 因此,它不再是可扩展性瓶颈。 您可以即时添加更多服务器,从两台服务器增加到三台服务器,从三台服务器增加到四台服务器,您可以处理的请求数量几乎翻倍,每秒可以处理多少条消息,而无需在性能上妥协,它将为您的消息传递提供线性可扩展性改进,这将在应用程序方面有很大帮助。 最终用户将受益匪浅,他们将看到更快的响应时间、更低的延迟和更高的应用程序和用户吞吐量。

到目前为止有任何问题吗? 这涵盖了为什么您应该考虑使用 NCache 与传统相比,传统 Pub/Sub 消息传递平台存在哪些问题。 到目前为止有任何问题吗? 接下来的几张幻灯片侧重于建筑细节 NCache Pub/Sub 消息传递,然后我们将讨论作为其中一部分提供的不同功能。 嗨,罗恩,我们有一个问题。 有哪些可用的监控选项? 好的,我认为这是关于 NCache,是的,这是另一个加分项,我在最后安排了一段专门为此。 但我认为,如果我等到那部分,给你产品的动手演示,作为其中的一部分,我也会向你展示监控功能,这将是非常好的。 所以,一旦我在那个部分,我会重新审视这个问题。 我希望没关系。

NCache 发布/订阅消息

好的, NCache 发布/订阅消息。 它与您在这里看到的非常相似。 这是一个传统的 Pub/Sub 消息传递平台。 这是架构分解。 我们有由提供的消息传递渠道 NCache 我们在这里有服务器团队,在该消息传递通道中,我们有主题 1、主题 2,然后我们有可以是任何 .NET 的发布者, .NET Core 应用程序,然后我们的订阅者又是任何 .NET 或 .NET Core,甚至是与之连接的 Java 应用程序。

pubsub 消息传递 with-ncache

可以为此制定专用缓存,然后消息是使用典型的 Pub/Sub 机制从通信通道中继的东西,其中消息从发布者发送到订阅者。 这是一个简单的代码片段。

Order payload = new Order();
payload.OrderID = 10248;
payload.OrderDate = new DateTime(2015, 07, 04);
payload.ShipName = "Vins et alcools Chevalier";
...
                
Cache cache = NCache.InitializeCache("PubSubCache");

ITopic orderTopic = cache.MessagingService.CreateTopic("orderTopic");

Message orderMessage = new Message(payload);

// Publish message to all registered subscribers
// Register delivery failure notification

orderTopic.Publish(orderMessage, DeliveryOption.All, true);

只是为了让您了解构建有效负载,这是感兴趣的对象,初始化缓存,这意味着连接到它,命名它,您可以 创建缓存集群 使用不同的服务器,然后您创建一个主题或连接到现有主题。 主题是一个通道,是类似消息的单独媒介,然后您构建一条消息,然后发布它,就是这样。 这就是基本用例的简单程度。

在订阅者方面,您需要再次连接到一个主题或创建一个新主题,然后使用创建订阅方法成为订阅者,然后每次收到消息时都会调用一个回调,这是一个异步调用-背部。

Cache cache = NCache.InitializeCache("PubSubCache");

ITopic productTopic = cache.MessagingService.GetTopic(topicName);

ITopicSubscription prodSubscriber = prodTopic.CreateSubscription(MessageReceived);

private void MessageReceived(object sender, MessageEventArgs args)
{
 // Perform operations
}

动手演示

我应该很快给你产品的演示,然后我想,一旦我们完成了演示,它会更容易理解。 我会继续,作为其中的一部分,我也会回答监控选项。 好的,所以,我会尽快向您展示 NCache 管理器工具,随它一起安装。

创建缓存

我将创建一个新的缓存。

创建缓存

我们将其命名为 PubSubCache,我将在最后附加 Windows Win。 所有缓存都需要命名。 您可以命名您的 Pub/Sub 缓存,这对您的应用程序有意义。 您可以创建缓存, NCache 允许您在 Windows 环境、Windows 服务器以及 Linux 服务器上创建缓存,我们也有 Windows 单声道版本,也可用。 因此,对于 Linux,您需要使用 .NET Core 在 Windows 上,您可以使用 .NET 以及 .NET Core 的发布 NCache. 所以,我将继续使用 Windows 服务器。 选择分区副本缓存,因为我希望有多个服务器为请求处理能力做出贡献。

分区副本

因此,Partitioned 和 Partitioned Replica 是非常好的选择。 分区缓存允许您在多个服务器上对数据进行分区,而分区副本类似于分区,但它也有备份。 所以,这完全取决于你。 如果你想让消息本质上是重要的,如果来自一台服务器出现故障,你不想丢失仍在通信通道中的消息,那么你可以使用 Partitioned Replica 否则,Partitioned Cache 更表现古怪.

还有另外两种拓扑,但它们适用于较小的配置和我们的 NCache 架构网络研讨会 非常有效地涵盖了这些拓扑的所有细节。 因此,如果您需要了解有关这些拓扑的更多信息,我建议您访问这些网络研讨会并查看该网络研讨会。 我会保持一切简单。 选择异步作为复制选项。

异步复制

那是在活动分区和另一台服务器上的备份之间,在这里我指定了将托管我的缓存集群的服务器团队。 我可以有两个开始,我可以在任何时间点添加更多服务器,如前所述。 顺便说一下,这是一个 TCP 端口。 之间的每一次交流 NCache 通过 TCP/IP 驱动,大小是每台服务器的大小。 因此,基于消息负载,基于数据负载,我们建议您为应用程序配置相关大小。 我会保持一切简单。 在自动启动中启动此缓存,以便每次服务器重新启动时缓存都会自动启动,仅此而已。

高级选项

这就是配置这个缓存是多么简单,接下来,我将向您展示如何运行该应用程序,但在我们继续之前我还需要一个步骤。 它也开始了,所以它需要一点点,好吧,你去。

监控统计

所以,有一个关于监控选项的问题,我想我现在应该解决这个问题。 我要将我的盒子添加为客户端,我将从那里运行发布者和订阅者应用程序。 这些也可以从可以访问此缓存的任何服务器上运行。 我已将我的盒子添加为客户端,所以这本质上是客户端盒子。 我可以添加另一个盒子,我也可以从中运行应用程序。 您注意到管理器工具中提供了一些统计窗口选项,但是对于我最初会执行的 Pub/Sub,我会启动 NCache 监控工具,然后我还会向您展示一些专门用于 Pub/Sub 的计数器。

监控集群

例如,这是我的 Pub/Sub dash。 我会继续,这是 NCache 安装的监控工具 NCache. 我想我需要,1乘3很好。 好吧,首先,如果您注意到我们有主题统计信息。 然后我们每秒发布一些消息,比如说消息计数,然后我们每秒有消息过期。 我认为,这是一个很好的。 顺便说一句,您也可以添加更多。 这里有很多,然后还有 PerfMon 计数器也可用。 让我也启用它们,然后我将运行一个应用程序来向您展示它是如何工作的。 我只需要向您展示计数器,我们的监控工具也在使用 PerfMon 计数器。 因此,您在此处看到的内容也可以在监视器中使用,反之亦然。 好的,所以,我们都准备好了。 所以,我们这里也有一些柜台。 例如,消息计数,每秒传递的过期时间。 因此,您可以看到数字视图、数字视图,然后我们在监视器中也有图形视图。

运行发布者

现在,我将运行一个随它一起安装的应用程序,但我对其进行了一些调整,以便我可以发布尽可能多的消息。 因此,这是发布者的应用程序配置,然后是消费者的应用程序配置,即订阅。 好的,所以,我会运行这个应用程序。 首先,它将启动发布者应用程序并向您展示它是如何工作的。 让我回顾一下,也许我没有保存。 好吧,给我一点时间,我需要在它周围放一个循环。 实际上,有一个循环。 好吧,让我调试一下。 所以,我没有信息可以连接到这个缓存。 所以,让我们弄清楚这一点。 我已经添加了我的盒子,所以请多多包涵。 让我尽快解决这个问题,这需要几分钟。 虽然它添加了,但也许有一个错字。 你去吧。 我指定了错误的缓存名称,我的错。 让我在这里回顾一下这个名字。 这是pubsubcachewin。

发布子缓存

我使用的是昨天的缓存名称,它是一个演示缓存。 它发生了。 好吧,我很高兴我能够快速修复它,否则将是一场彻底的灾难。 所以,我认为我们很好。 因此,这将启动程序,该程序将是 publisher.exe,它将发布十条消息。 我一次又一次地发布相同的消息。 此时发布者已启动,订阅者尚未启动。 那么,让我向您展示监控细节。

监控细节

有一些请求正在进来,当然是围绕这个。 但如果你注意到了,这里有我的话题,那就是名字 10。

我的话题

项目在那里,然后你应该看到这些消息在过期时间过去后过期,我想,我在上面放了五分钟或更长时间。 我现在将继续发布更多内容。 你去那里,你应该有 10 个项目,额外添加。 因此,总共有 20 个项目,您可以看到消息数在增加。

主题2

所以,这就是图表,您也可以在 Windows 性能监视器中看到相同的选项,我们现在的大小和消息数从 20 到 12。 主要是因为有些已经过期。 我想我已经在使用更少的到期时间。 让我看看,我认为消息数显示不正确,让我看看,我认为这是,根据我的显示器,这应该是 20。

运行订阅者

让我们在订阅者的帮助下回顾一下。 让我们看看它收到了多少消息,顺便说一下,我将在完成演示后向您展示代码。

因此,它应该获取现有消息,我认为,它以随机方式获取所有消息。 这是消息的异步调用。 它不保证按该顺序传递消息。 因此,您可以在稍后阶段收到第一条消息,依此类推。

CMD

如果我回到这里,我们的消息数会降为零。 我认为这个计数器,让我们再回顾一下。 我不确定这是否代表给定主题的消息或已发布的整体消息。 好的,所以,它没有显示正确的值,而这应该显示。 我想,那些消息已经收到了。 所以,我需要关闭订阅者才能实现这一点。 我要运行另一个订户。 因此,如果您看到消息被传递到多个,我想我们将结束它并向您展示它的代码示例。 我将再输入几条消息,继续这样做,然后在订阅者端,您可以看到这条消息收到了 10 条消息,然后这条消息又收到了几条消息。 如果有两个传递选项,您可以选择向所有这些订阅者或任何订阅者广播消息。 是的,因此,根据接收到的消息,消息也会从缓存中过期。 所以,这是一个快速演示。 现在,我将向您展示发布消息所涉及的详细信息? 主题方面的特点是什么? 发布方有哪些功能,订阅方有哪些功能? 如果此时有任何问题,请告诉我。 我现在很乐意为您回答这些问题,否则,我将继续向您展示事物的详细架构方面。

中的主要 Pub/Sub 接口 NCache

这些是内部的主要接口 NCache.

话题

我们有话题。 再次,它是沟通的渠道。 它包含订阅者列表,并将发布的消息转发给连接的订阅者。

想说的话

然后我们有消息,它是主要的有效载荷。 兴趣点,订阅者感兴趣的对象。 它是可序列化的有效载荷。 它可以是任何对象,可以是您想要传递的任何消息,准确地说是任何数据。

出版商

Publisher 是一个发布消息的应用程序,它也可以接收可选的传递状态通知。 传递状态是您可以选择打开的东西,但正如之前决定或之前讨论的那样,发布者不需要等待消息被传递。 它可以继续前进。 因此,可选交付状态是您可以根据需要打开或关闭的状态。

订户

订阅者,他们为特定主题注册自己并获取所有消息。

所以,让我们一一回顾这些,图表说明了我们刚刚讨论的所有内容。

发布子架构

发布者创建消息、注册事件、发送消息、主题和存储消息、存储订阅者和发送消息。 订阅者订阅然后接收消息通知。

NCache Pub/Sub 主题界面

那么,让我们回顾一下主题界面。 所以,首先,你可以创建一个 NCache Pub/Sub 主题,然后您也可以连接到现有主题,或者您也可以删除主题。 那么,让我们看看这里的这个接口涉及到什么? 所以,如果我在这里访问这个发布者,我首先需要访问这个程序,如果你在这个程序中注意到,我会得到缓存名称。 我得到了主题名称,它可以是你可以选择的任何主题,我得到一个消息计数器,它是一条随机消息,然后我有 publisher.start 方法,我已经实现了它,在里面我我正在初始化缓存,这意味着我将把它连接到缓存,然后我得到一个创建主题接口,如果我回到这里,我们就有了 IMessagingService。 所以,如果我在这里快速向你展示这个。

IMessagingService Interface for managing Topics in NCache

namespace Alachisoft.NCache.Web.Caching
{
    public interface IMessagingService
    {
        ITopic CreateTopic(string topicName);
        void DeleteTopic(string topicName);
        ITopic GetTopic(string topicName);
    }
}

我们有 IMessagingService 接口,它有一个 CreateTopic 方法,一个 DeleteTopic 方法来删​​除一个主题,然后是 GetTopic。 让我们一个一个地回顾这些。 因此,我们创建了一个主题的消息服务,或者我们可以连接到现有主题。 您真正要做的是,您还可以启用消息传递、传递失败通知,如果这是您想做的事情。 例如,您可以在此处设置回调,然后可以调用该回调,但这是可选的。 如果您不想启用它,则不一定需要它。 所以,就像我之前说的,消息传递通知是可选的,所以你不需要启用它们。 这完全取决于您的用例,如果您想启用这些,那么您可以继续执行。

然后我们有一个发布方法,它是这个应用程序的一个自定义方法,一个包装器,然后在里面我们真正做的是我们正在构建一个消息,然后我们正在使用 topic.publish 消息。 所以,这会返回一条消息,一个主题。 如果我在这里快速向您展示的话,可以说是该主题的句柄,然后 topic.publish 是他们的实际方法。 这是一个 ITopic 接口,它有一个名称,它可以有一个消息计数,它可以有一个过期时间,我稍后会告诉你,然后它还有一些关于主题删除、消息传递失败和的回调然后它允许您订阅该主题,然后也向其发布消息,这是我想展示的第一种方法,您可以使用 topic.publish ,然后如果您注意到有一个传递选项好。 您可以将必须​​交付给所有人或任何人交付给所有人。 所以,这是消息级别,所以,这是传递或发送到该主题的消息的传递选项,然后这里有一个可选的最后一个布尔值,让我们检查一下,它的建议是什么? 对于这个,如果你想有任何失败通知。 因此,在这种情况下,我将其设置为 true,但您可以将其关闭,如果关闭,它将相应地调用发布者端的回调。

所以,我们在这里看到了两种方法。 您可以创建主题,类似地,您也可以获取主题。 因此,这是您可以使用的另一种方法。 但是,由于我是第一次运行它,所以我使用创建主题,向主题发布消息,然后如果需要,我还可以删除主题。 因此,这些是一些场景,正如您所看到的,这些幻灯片显示了相同的示例。

Create Topic: Code snippet to create a new topic in distributed cache

string cacheName = "pubSubCache";
string topicName = "orderTopic";

Cache cache = NCache.InitializeCache(cacheName);

cache.MessagingService.CreateTopic(topicName);

因此,如果您删除主题,将调用已删除主题的回调,而删除主题的回调是您可以通知最终用户或您需要发布的任何信息的回调。

Delete Topic: Deletes an existing Topic distributed

string cacheName = "pubSubCache";
string topicName = "orderTopic";

Cache cache = NCache.InitializeCache(cacheName);

cache.MessagingService.DeleteTopic(topicName);

OnTopicDeleted callback: If registered, it will be triggered upon the Delete Topic call.

using (Subscriber subscriber = new Subscriber())
{
// Subscribes on it, using the provided cache-Id and the topic-name.

subscriber.OnTopicDeleted = TopicDeletedCallback;
}

static void TopicDeletedCallback(object sender, TopicDeleteEventArgs args)
{
 Console.WriteLine("Topic '{0}' deleted.", args.TopicName);
}

NCache 发布/订阅消息接口

让我们看一下消息传递接口,在这里我们讨论有一个消息失效选项,消息接收通知,失败或接收,失败通知,然后支持加密和压缩作为其中的一部分。 所以,让我们看一下 IMessage 接口。 所以,我们在这里有一条消息。

namespace Alachisoft.NCache.Runtime.Caching
{
    public class Message : IMessage
    {
        public Message(object payload, TimeSpan? timeSpan = null);

        public static TimeSpan NoExpiration { get; }
        public string MessageId { get; }
        public TimeSpan? ExpirationTime { get; set; }
        public object Payload { get; }
        public DateTime CreationTime { get; }
    }
}

让我们看看,其中有什么可用的。 所以,首先,我们有一个 TimeSpan,你可以选择 NoExpiration 或者你可以在这里将 TimeSpan ExpirationTime 指定为消息选项,如果我回到这里,如果你注意到我们正在设置一些基于时间的过期时间同样,如果您注意到这是在此处传递的,并且我们有一个时间跨度。 因此,这是由程序本身调用的。 所以,我们给 “时间跨度.FromMinutes(5)”. 因此,这实质上意味着任何未传递的主题,如果没有订阅者或订阅者断开连接,则该消息将在指定的时间段内保留在通信通道中。 这是一种持久性,但由于它违反了 Pub/Sub 平台来无限期地持久化消息,所以时基是正确的选择,您可以将消息保留一段时间,然后清除存储本身。 这些消息也不会从通信通道单独过期,并且此过期可以在主题级别本身。 主题本身可以有期限,或者消息也可以附有各自的期限。 所以,这就是你要做的事情,顺便说一句,如果发布者和订阅者是连接的,那么发布者会发布消息并且这些消息会立即被转发。 它不会等待到期发生。 如果消息已送达,它不会保留它。 因此,一旦消息被传递,它就会立即从缓存中取出。 所以,这就是你设置到期的方式。

然后,您还会收到一些有关消息的通知。 这些是我之前向您展示过的东西。 例如,您可以看到交货, “发布者.消息传递失败” 通知,然后您还可以生成一条消息,也可以从消息中生成成功状态。 因此,这是您可以作为其中一部分打开的另一个选项,然后也可以完成消息加密和压缩,这是一个无代码更改选项。 您所要做的就是来到缓存本身,进入选项,因为一切都是由 NCache 本身。 因此,您可以启用压缩,并且可以右键单击并选择热应用配置。

热供应配置

压缩适用于尺寸较大的对象。 消息的有效负载更大,因此您应该自动打开压缩,加密是这里的另一个选项。 它需要您启用加密,为此您需要先停止通信通道,停止缓存,然后启用加密。 您可以选择许多提供者,所有消息都将被加密,然后才发送回发布者和订阅者。 我希望那是直截了当的。 因此,消息示例就在这里。

//Payload containing OrderId to be sent in message

Order payload = new Order();
payload.OrderID = 10248;
payload.OrderDate = new DateTime(2015, 07, 04);
payload.ShipName = "Vins et alcools Chevalier";
payload.ShipAddress = "59 rue de l'Abbaye";
payload.ShipCity = "Reims";
payload.ShipCountry = "France";

//Create message with payload and expiration time set to 150 seconds

Message message = new Message(payload);
message.ExpirationTime = new TimeSpan(0, 0, 150);

您可以使用订单,构造消息,设置一些过期时间,然后使用 “主题.发布” 并将该消息提供给正确的主题。

消息收到通知

因此,消息收到通知。 在订阅者方面,我也想向您展示这一点。

Callback invoked when a message is published on the topic

string cacheName = "pubSubCache";
string topicName = "orderTopic";

Cache cache = NCache.InitializeCache(cacheName);

//Get Order topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

//Register subscribers for Order topic

ITopicSubscription ordSubscriber = orderTopic.CreateSubscription(messageReceivedCallback);

private void messageReceivedCallback(object sender, MessageEventArgs args)
{
              //perform operations
if (args.Message.Payload is Order ord)
   {
              //perform operations
   }

从订阅者的角度来看,您所要做的就是连接到缓存以开始。 使用连接到主题 “_cache.MessagingService.GetTopic”. 这是我之前向您展示的方法,或者您可以创建一个主题,如果它确实存在,然后您创建允许您在回调中接收消息的订阅,这里是消息接收回调。 我认为它应该就在程序中。 消息接收回拨。 因此,每次收到消息时都会调用它。 所以,这里的例子就完成了。

一步一步:将消息发布到主题

所以,一步一步地再来一次。 您初始化 Pub/Sub 缓存集群、创建一个专用主​​题或连接到现有主题。 如果需要,注册消息传递事件,这是一个可选的。 创建消息并发布到单独的主题。 启用到期,根据需要启用交付选项,这是一个有助于证明这一点的代码。

public void PublishMessages()
{
string cacheName = "pubSubCache";
string topicName = "orderTopic";
Cache cache = NCache.InitializeCache(cacheName);
ITopic orderTopic = cache.MessagingService.CreateTopic(topicName);

//Register message failure notification
orderTopic.MessageDeliveryFailure += FailureMessageReceived;

//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;

//Payload to be sent in message
Order payload = new Order();
payload.OrderID = 10248;
payload.OrderDate = new DateTime(2015, 07, 04);
payload.ShipName = "Vins et alcools Chevalier";
payload.ShipAddress = "59 rue de l'Abbaye";
payload.ShipCity = "Reims";
payload.ShipCountry = "France";

//Create message with Order and expiration time set to 150 seconds
Message orderMessage = new Message(orderPayload);
orderMessage.ExpirationTime = new TimeSpan(0, 0, 150);

//Publish message to all registered subscribers
//and register message delivery failure notification
orderTopic.Publish(orderMessage, DeliveryOption.All, true);

然后在订阅者端,您连接到缓存,获取现有主题或创建新主题。 为主题创建订阅以接收消息。 注册要接收的事件,这些事件将是异步事件,然后取消订阅主题并删除它,如果需要,可以将删除事件作为其中的一部分。

NCache 监控 Pub/Sub

接下来的几张幻灯片将重点关注事物的监控方面。 让我们全面回顾一下这些。 我们已经介绍了 PerfMon 计数器支持中的基本细节 NCache,我们在仪表板中有不同的选项。 让我们创建另一个仪表板。

创建仪表板

让我们将所有这些都添加到其中。 所以,我们也有一些主题统计数据。 例如,我们有主题统计信息,我已经向您展示过。 主题数,您拥有的主题数。 消息计数。 消息存储大小,即主题的总大小和字节数,这对于能力建设非常重要。 如果您想了解正在传递或发布的消息数量,您可以拖放每秒发布的消息和每秒传递的消息,然后您可以获得一些消息过期状态以及其中的一部分。 所以,这些是一些计数器。 如果我再次运行示例应用程序,您会看到所有这些活动。 好的,我将通过多次按 Enter 来继续增加一些负载,如果我回到这里,您可以看到所有这些活动。

统计

有 220 条消息。 此时主题计数为 1。 消息计数约为 220。消息存储大小也显示出来。 消息被发布。 但是没有消息被传递,也没有消息过期,如果我只是运行一个订阅者,它会很快,你会看到这些消息也会被接收。 你去那里,如果我回到这里,现在你也应该在这里看到活动。 消息也被接收。

收到的消息

因此,这完成了您可以记录这些计数器以查看容量、查看那里的整体主题、那里的消息、存储的大小以及这些消息的到期时间的过程,并且我可以根据需要创建尽可能多的主题根据我的要求。 Windows PerfMon 上也提供相同的计数器。 因此,您在此处拥有相同的设置,此外您还拥有一些 PowerShell Cmdlet。 例如,如果您在此处访问我们的文档,让我快速打开它。 所以,我们有三个不同的选项,PerfMon Counters, NCache 监视器和 PowerShell Cmdlet。 在某些情况下,您只想依赖工具。 因此,为此,您可以使用我们的文档和我们的 PowerShell 指南,让我们看看我是否可以快速搜索它。 它给了我一个通用链接,所以,让我在这里看看一些监控选项。 因此,您可以看到“Get-Topics”,这是一个 PowerShell Cmdlet,它允许您根据缓存名称获取主题名称。 例如,如果我在这里打开 PowerShell。 我可以使用这个命令,让我在这边打开它,很清楚。 好的,例如,我可以从这里的任何盒子运行它,然后 pubSubCachewin 是缓存的名称。 它不会给我任何东西。 我需要在服务器本身上运行它,因为我认为它不是,pubSubCachewin,你去吧。 我的主题、订阅者、发布者和消息计数。

小命令

从我的盒子开始工作的原因是因为它需要一个 IP 地址来知道缓存的存在位置。 因此,在 PowerShell 方面有一些工具,可以让您根据需要查看一些统计信息。 这样,演示部分就完成了。 请让我知道是否有任何问题。 我将花一些时间来了解有关架构的详细信息或作为此沟通渠道的一部分提供的内容, NCache 作为消息传递平台以及为什么 NCache 更合适吗?

分布式缓存架构

首先, NCache 是开源的。 因此,您可以开始使用 Pub/Sub,即使在开源版本的 NCache. 它是线性可扩展的,这是第一点,在此之前它是在内存中的,这是只寻求性能并且他们不希望有高负载要求的应用程序的主要兴趣点。 所以,如果有性能问题 NCache 与传统的 Pub/Sub 消息传递平台相比,再次更适合。

它是线性可扩展的,这实质上意味着您可以根据需要添加任意数量的服务器。 两台服务器就足够了。 但是如果他们达到容量,因为容量可能会受到 NCache 服务器也是如此。 您在数据库中看到的最初问题。 我们不是这么说 NCache 不会看到这个问题。 使用一台服务器,您可以达到容量,就此而言,您可以拥有更多服务器,但这些更多服务器也可以增加容量。 例如,这两个服务器正在被最大化,这是正确的时间,因为您提前知道负载要求或应用程序的负载很高。 因此,您可以通过添加更多服务器并进行更好的容量规划来预先避免这种情况。 因此,可以即时添加更多服务器,这就是它的优点在于它的线性可扩展性。

然后第三个重要特征是,如果您需要,您还可以拥有复制支持,这将确保可靠性。 它已经是高度可用的,它的服务器宕机不会对最终客户端产生影响,或者发布者或订阅者的平台将正常运行而不会出现任何问题。 只要缓存集群中有一个幸存的节点,Pub/Sub 平台就会保持正常运行,为什么它能够做到这一点? 它是高度可用的,因为它本质上是动态的。

动态缓存

这是一个 100% 对等架构的缓存集群。 这使您可以根据需要添加任意数量的服务器,并且这些服务器本质上是独立的。 它们通过分配负载来提高请求处理能力,但它们以 100% 的独立容量运行。 任何关闭的服务器或添加的新服务器都不会对缓存产生任何影响。 不需要为此停止平台。 它可以继续工作,然后客户端不需要停止或重新启动。 这就是传统来源的问题,如果服务器出现故障,您可能必须重新启动客户端才能开始使用幸存的节点。 因此,这本质上是动态的,客户端会自动调整,集群本身具有 100% 的正常运行时间和自我修复机制。 如果添加了一个服务器,集群会通过将负载分配到新添加的服务器并同时通知客户端来适应这种情况。

动态缓存2

类似地,在维护案例或服务器宕机案例中,服务器宕机或在这种情况下发生故障,缓存集群保持启动并运行。 您需要一个幸存的节点才能使其正常运行,并在运行时通知客户端。 连接故障转移支持内置于客户端协议中。 因此,客户端检测到故障,然后进行故障转移并自动开始使用生存节点,这确保了缓存端以及连接到它的最终客户端的 100% 正常运行时间。 因此,它本质上是动态的。 它是 100% 对等架构的。 它内置了连接故障转移支持,并且集群本身可以自我修复。 这是协议中内置的一种机制。 这确保了它的 100% 正常运行时间,一个高度可用的缓存方案。

缓存拓扑:分区缓存

通信通道的拓扑。

分区缓存

Pub/Sub 平台可以对数据进行分区。 您可以在服务器 1 和服务器 2 上对数据进行分区。这些数据元素代表您的消息。 因此,我们有一些消息由服务器 1 传递,一些消息由服务器 2 传递。这是完全随机的,基于我们的分发算法,并且您的发布者和订阅者连接到所有服务器。 因此,有些连接到服务器 1,有些连接到服务器 2,但如果您注意到每个服务器都有,实际上它们连接到所有服务器,但有些消息由服务器 1 传递,有些消息由服务器 2 传递,依此类推向前。 因此,消息负载将被分散,这是执行偏心的,它充分利用所有这些服务器的内存容量,用于提供消息,用于存储消息,用于它们的到期时间,它还将它们的计算能力加入到逻辑容量中也是。 因此,所有这些服务都有助于提高请求处理能力,这就是如果您添加更多服务器,您会从系统中获得更多容量的原因,这就是为什么我们建议您应该使用分区操作副本拓扑,因为您可以线性扩展,如果您需要。

缓存拓扑:分区副本缓存

这是分区副本缓存,它也具有备份支持。

分区副本

我们有服务器 1 活动分区,(服务器)2 上的备份。因此,所有服务器都制定了另一个服务器的活动分区和备份分区,这是另一个服务器的副本。 服务器 1 是服务器 2 上的活动备份,服务器 2 是 3 上的活动备份,服务器 3 是 1 上的活动备份,如果服务器出现故障,集群仍然能够管理这个。 因为,如果服务器 1 出现故障,服务器 1 的备份在 2 上,因此,此备份被激活并合并到服务器 2 和 3 中,集群自我修复并制定 2 节点主动备份,服务器 1 备份的主动被动机制服务器 2 备份在服务器 1 上。而且,它对您的最终用户来说是完全无缝的。 您不必担心最终用户会受到影响。 因此,100% 的正常运行时间 NCache 确保这一切。

好的,这使我们接近演示文稿的结尾。 请让我知道,我们可以在最后集中讨论你们可能有的一些问题。 所以,我现在会把它交给埃迪。 埃迪,你可以从这里拿起它。 再次,非常感谢您抽出宝贵的时间。 希望这是有益的,我们期待在下一次网络研讨会上再次见到您。 在此期间,如果您需要查看您刚刚在线查看的现有网络研讨会 alachisoft.com/资源 再次提醒, NCache 是当今市场上唯一可实现最高性能的本机应用程序。 期待很快再次见到您。 谢谢你。

接下来做什么?

 

联系我们

联系电话
©版权所有 Alachisoft 2002 - 版权所有。 NCache 是 Diyatech Corp. 的注册商标。