title: Pub/Sub Messaging: Subscribe to a Topic
description: Learn how to Subscribe for Topic Messages when using the cache as a Pub/Sub messaging store.
Pub/Sub Messaging: Subscribe to a Topic
In Pub/Sub messaging model subscriber (or client application) can register to a particular topic through a subscription. So, NCache 提供多种 Pub/Sub 主题订阅的类型.
Pub/Sub Messaging: Prerequisites Topic Subscription
Before subscribing a topic in Pub/Sub Messaging model ensure that the following prerequisites are fulfilled:
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参考给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 缓存项, 主题, IDurableTopic订阅, 创建订阅, 创建持久订阅,主题名称, 订阅名称, 即时通讯服务, 获取主题, 订阅政策, 退订, 消息事件参数, 交付模式, 主题搜索选项, 主题订阅, 消息传递失败, 有效载荷.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参考给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 缓存项, 话题, 主题订阅, 获取消息服务, 获取主题, 创建持久订阅, 订阅政策, 时间跨度, 取消订阅, 消息接收侦听器, onMessageReceived, 消息事件参数, 获取消息, 获取有效载荷, 创建订阅, 交付模式, 主题搜索选项, 添加消息传递失败监听器.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参考给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 缓存项, 主题订阅, 获取消息服务, 获取主题, 创建持久订阅, 订阅政策, 时间跨度, 取消订阅, 消息事件参数, 消息接收侦听器, 获取消息, 获取有效载荷, 创建订阅, 交付模式, 主题搜索选项, 主题监听器, 添加消息传递失败监听器.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参考给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 缓存项, 获取主题, 获取消息服务, 创建持久订阅, 获取有效载荷, 时间跨度, 消息事件参数, 获取消息, 创建订阅, 取消订阅, add_message_delivery_failure_listener.
创建订阅的方法
Here we describe how you can create non-durable subscription, durable subscription, multiple subscriptions, and asynchronous subscription in Pub/Sub Messaging Model.
非持久订阅
备注
此功能也可用于 NCache Professional.
ITopic
/Topic
界面便于创建 非持久订阅 并发布针对该主题的消息。如果主题存在,则创建订阅方法会针对该主题注册非持久订阅。它允许订阅者注册 MessageReceivedCallback
针对主题,以便它可以接收已发布的消息。
以下代码示例执行以下操作:
- 获取现有的感兴趣的主题,即
NewBeverages
.
- 为每个主题创建订阅。
- 为订阅者注册事件,以便在发布到主题后接收消息。
// Precondition: Cache is already connected
// NewBeverages is the name of the topic created, beforehand
string topicName = "NewBeverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for order topic
// Message received callback is specified
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
// Precondition: Cache is already connected
// NewBeverages is the name of the topic created beforehand
String topicName = "NewBeverages";
// Get the topic
Topic ordertopic = cache.getMessagingService().getTopic(topicName);
// If topic exists, Create subscription
if (ordertopic != null) {
// Create and register subscribers for order topic
ordertopic.createSubscription(new PubSubMessageReceivedListener());
System.out.println("Subscriber registered for topic: " + ordertopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";
// Get the topic
let ordertopic = await ncache.getMessagingService().getTopic(topicName);
if (ordertopic != null) {
// Create and register subscribers for topic
// Message received callback is specified
let ordersubscriber = await ordertopic.createSubscription(ncache.onMessageReceived());
}
else {
// No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for the topic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received)
else:
# No topic exists
print("Topic not found")
备注
为确保操作是故障安全的,建议处理应用程序中的任何潜在异常,如中所述 处理故障.
持久订阅
备注
此功能也可用于 NCache Professional.
IDurableTopicSubscription
界面便于创建 持久订阅 并针对该主题发布消息(如果存在)。它允许订阅者注册 MessageReceivedCallback
针对主题,以便其可以接收已发布的消息。
共享持久订阅
以下代码演示了共享订阅策略。
// DiscountedBeverages is the name of the topic created, beforehand
string topicName = "DiscountedBeverages";
string subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
ITopic ordertopic = cache.MessagingService.GetTopic(topicName);
if (ordertopic != null)
{
// Create and register subscribers for topic
// Message received callback is specified
// The subscription policy is shared which means that the subscription can have more than one subscribers
IDurableTopicSubscription ordersubscription = ordertopic.CreateDurableSubscription (subscriptionName, SubscriptionPolicy.Shared, MessageReceived, TimeSpan.FromMinutes(20));
}
// DiscountedBeverages is the name of the topic created beforehand
String topicName = "DiscountedBeverages";
Topic ordertopic = cache.getMessagingService().getTopic(topicName);
// Get the topic
if (ordertopic != null) {
// Create and register subscribers for topic
// The subscription policy is shared which means that the subscription can have more than one subscribers
System.out.println("Topic retrieved: " + ordertopic.getName());
ordertopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, new PubSubMessageReceivedListener(), TimeSpan.FromMinutes(20));
System.out.println("Durable subscriber registered for topic: " + ordertopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
let ordertopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for order Topic
// Message received callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// MessageReceived callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
独家持久订阅
下面的代码演示了独占订阅策略。
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that the subscription can have only one subscriber
IDurableTopicSubscription orderSubscriber = orderTopic.CreateDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, MessageReceived, TimeSpan.FromMinutes(20));
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
System.out.println("Topic retrieved: " + orderTopic.getName());
// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that the subscription can have only one subscriber
orderTopic.createDurableSubscription(subscriptionName,SubscriptionPolicy.Exclusive, new PubSubMessageReceivedListener(),TimeSpan.FromMinutes(20));
System.out.println("Durable subscriber registered for topic: " + orderTopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
let subscriptionName = "orderTopicName";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// The subscription policy is Exclusive which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Exclusive, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
# Create and register subscribers for orderTopic
# The subscription policy is exlusive which means that the subscription can have more than one subscriber
order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.EXCLUSIVE, on_message_received, ncache.TimeSpan.from_minutes(20))
多个订阅
使用这种订阅方法,订阅者可以提供通过一次调用订阅多个主题的模式。为此,与模式匹配的主题必须已存在于服务器上,这一点很重要。一旦在基于模式的主题上成功创建订阅,订阅者就会收到在与该模式匹配的主题上发布的消息。此外,如果在针对模式注册基于模式的订阅之后创建主题,它将针对该主题注册该订阅者。
类似地,关于从主题取消注册,此方法会根据提供的模式取消订阅者对所有匹配主题的订阅,而不会影响使用此调用的任何其他订阅。
备注
- 订阅者只能获取基于模式的主题,并且不允许创建它。
- 发布者只能使用模式来接收失败通知。
支持的通配符
基于模式的订阅方法支持以下三个通配符:
*
: 零个或多个字符。 例如, bl*
订阅黑色、蓝色和模糊等。
?
: 任意一个字符。 例如, h?t
订阅 hit、hot 和 hat 等。
[]
: 字符范围。 例如, b[ae]t
订阅 bet and bat,但不是 bit。
使用通配符创建订阅
下面的示例通过提供一个模式来创建订阅,该模式基于该模式订阅了与该模式匹配的主题。
// Create topic name for all topics with suffix Beverages
// Only ? * [] wildcards supported
string topicName = "*Beverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for Order topic
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
// Define topic name pattern
String topicNamePattern = "*ages";
// Get all topics that fulfill the pattern
Topic topic = cache.getMessagingService().getTopic(topicNamePattern, TopicSearchOptions.ByPattern);
if (topic != null) {
System.out.println("Matching topic retrieved: " + topic.getName());
// Create and register notifications for topic
topic.createSubscription(new PubSubMessageReceivedListener());
System.out.println("Subscription created for topic: " + topic.getName());
} else {
System.out.println("No matching topic found.");
}
// Topic "Beverages" exists in the cache
// Only ? * [] wildcards supported
let topicName = "*Beverages";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);
// Create and register subscribers for orderTopic
let orderSubscriber = orderTopic.createSubscription(this.messageReceived());
# Topic "Beverages" exists in the cache
# Only ? * [] wildcards supported
topic_name = "*Beverages*"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)
# Create and register subscribers for orderTopic
order_subscriber = order_topic.create_subscription(on_message_received)
异步订阅
可以在为有序消息创建订阅时指定传递模式,并且可以 sync
or async
. 以下示例使用创建订阅 CreateSubscription
具有同步交付模式的方法。 建议使用 sync
有序消息的模式和 async
模式,否则以实现高性能。
创建订阅时,您可以指定同步或异步的传递模式。 特别是,同步交付模式可用于 有序消息,而如果您不使用有序消息,则异步模式会提高性能。
// NewBeverages is the name of the topic created beforehand
string topicName = "NewBeverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for Order topic
// Message received callback is specified
// DeliveryMode is set to async
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived, DeliveryMode.Async);
}
// NewBeverages is the name of the topic created beforehand
String topicName = "NewBeverages";
// Get the topic
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create and register subscribers for Topic
// Message received callback is specified
// DeliveryMode is set to async
topic.createSubscription(new PubSubMessageReceivedListener(),DeliveryMode.Async);
} else {
System.out.println("Topic does not exist.");
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived(), ncache.DeliveryMode.Sync);
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
else {
// No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for orderTopic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received, ncache.DeliveryMode.SYNC)
else:
# No topic exists
print("Topic not found")
更多资讯
NCache 提供 Pub/Sub 的示例应用程序 GitHub上.
参见
.NET: Alachisoft.NCache.运行时.缓存 命名空间。
Java的: COM。alachisoft.ncache.runtime.caching 命名空间。
节点.js: 主题订阅类 类。
Python: ncache.client.services 类。