タイトル: Pub/Sub メッセージング: トピックのサブスクライブ description: キャッシュを Pub/Sub メッセージング ストアとして使用する場合にトピック メッセージをサブスクライブする方法について説明します。
Pub/Sub メッセージング: トピックのサブスクライブ
Pub/Sub メッセージング モデルでは、サブスクライバー (またはクライアント アプリケーション) は、サブスクリプションを通じて特定のトピックに登録できます。それで、 NCache 複数を提供 Pub/Sub トピックのサブスクリプションの種類.
Pub/Sub メッセージング: 前提条件トピックのサブスクリプション
Pub/Sub メッセージング モデルでトピックをサブスクライブする前に、次の前提条件が満たされていることを確認してください。
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、指定されたページを参照してください クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 Iキャッシュ, キャッシュ項目, Iトピック, IDurableトピックサブスクリプション, サブスクリプションの作成, CreateDurableSubscription,トピック名, サブスクリプション名, IMessagingService, トピックの取得, 購読ポリシー, 登録解除, メッセージイベント引数, 配信モード, トピック検索オプション, ITopicサブスクリプション, メッセージ配信失敗, ペイロード.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、指定されたページを参照してください クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 キャッシュ, キャッシュ項目, ご用件, トピック購読, getMessagingService, トピックの取得, createDurableSubscription, 購読ポリシー, 期間, 登録解除, メッセージ受信リスナー, onMessageReceived, メッセージイベント引数, メッセージを取得する, getペイロード, サブスクリプションの作成, 配信モード, トピック検索オプション, addMessageDeliverFailureListener.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、指定されたページを参照してください クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 キャッシュ, キャッシュ項目, トピック購読, getMessagingService, トピックの取得, createDurableSubscription, 購読ポリシー, 期間, 登録解除, メッセージイベント引数, メッセージ受信リスナー, メッセージを取得する, getペイロード, サブスクリプションの作成, 配信モード, トピック検索オプション, トピックリスナー, addMessageDeliverFailureListener.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、指定されたページを参照してください クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 キャッシュ, キャッシュ項目, トピックの取得, get_messaging_service, create_durable_subscription, get_payload, 期間, メッセージイベント引数, get_message, サブスクリプションの作成, 購読解除, add_message_delivery_failure_listener.
サブスクリプションを作成する方法
ここでは、Pub/Sub メッセージング モデルで非永続サブスクリプション、永続サブスクリプション、複数のサブスクリプション、および非同期サブスクリプションを作成する方法について説明します。
非永続サブスクリプション
Note
この機能は以下でも利用できます NCache Professional.
ITopic
/Topic
作成を容易にするインターフェース 非永続的なサブスクリプション トピックに対するメッセージの公開。サブスクリプションの作成メソッドは、トピックが存在する場合、トピックに対して非永続サブスクリプションを登録します。加入者が登録できるようになります MessageReceivedCallback
トピックに対して、公開されたメッセージを受信できるようにします。
次のコードサンプルは次のことを行います。
- 関心のある既存のトピックを取得します。
NewBeverages
.
- 各トピックのサブスクリプションを作成します。
- トピックに公開されたメッセージを受信するサブスクライバーのイベントを登録します。
// Precondition: Cache is already connected
// NewBeverages is the name of the topic created, beforehand
string topicName = "NewBeverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for order topic
// Message received callback is specified
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
// Precondition: Cache is already connected
// NewBeverages is the name of the topic created beforehand
String topicName = "NewBeverages";
// Get the topic
Topic ordertopic = cache.getMessagingService().getTopic(topicName);
// If topic exists, Create subscription
if (ordertopic != null) {
// Create and register subscribers for order topic
ordertopic.createSubscription(new PubSubMessageReceivedListener());
System.out.println("Subscriber registered for topic: " + ordertopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";
// Get the topic
let ordertopic = await ncache.getMessagingService().getTopic(topicName);
if (ordertopic != null) {
// Create and register subscribers for topic
// Message received callback is specified
let ordersubscriber = await ordertopic.createSubscription(ncache.onMessageReceived());
}
else {
// No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for the topic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received)
else:
# No topic exists
print("Topic not found")
Note
操作がフェイルセーフであることを保証するために、で説明されているように、アプリケーション内の潜在的な例外を処理することをお勧めします。 失敗の処理.
耐久性のあるサブスクリプション
Note
この機能は以下でも利用できます NCache Professional.
IDurableTopicSubscription
作成を容易にするインターフェース 永続的なサブスクリプション トピックが存在する場合は、トピックに対してメッセージをパブリッシュします。これにより、加入者は MessageReceivedCallback
トピックに対して、公開されたメッセージを受信できるようにします。
共有永続サブスクリプション
次のコードは、共有サブスクリプション ポリシーを示しています。
// DiscountedBeverages is the name of the topic created, beforehand
string topicName = "DiscountedBeverages";
string subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
ITopic ordertopic = cache.MessagingService.GetTopic(topicName);
if (ordertopic != null)
{
// Create and register subscribers for topic
// Message received callback is specified
// The subscription policy is shared which means that the subscription can have more than one subscribers
IDurableTopicSubscription ordersubscription = ordertopic.CreateDurableSubscription (subscriptionName, SubscriptionPolicy.Shared, MessageReceived, TimeSpan.FromMinutes(20));
}
// DiscountedBeverages is the name of the topic created beforehand
String topicName = "DiscountedBeverages";
Topic ordertopic = cache.getMessagingService().getTopic(topicName);
// Get the topic
if (ordertopic != null) {
// Create and register subscribers for topic
// The subscription policy is shared which means that the subscription can have more than one subscribers
System.out.println("Topic retrieved: " + ordertopic.getName());
ordertopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, new PubSubMessageReceivedListener(), TimeSpan.FromMinutes(20));
System.out.println("Durable subscriber registered for topic: " + ordertopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
let ordertopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for order Topic
// Message received callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// MessageReceived callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
排他的永続サブスクリプション
次のコードは、排他的サブスクリプション ポリシーを示しています。
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that the subscription can have only one subscriber
IDurableTopicSubscription orderSubscriber = orderTopic.CreateDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, MessageReceived, TimeSpan.FromMinutes(20));
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
System.out.println("Topic retrieved: " + orderTopic.getName());
// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that the subscription can have only one subscriber
orderTopic.createDurableSubscription(subscriptionName,SubscriptionPolicy.Exclusive, new PubSubMessageReceivedListener(),TimeSpan.FromMinutes(20));
System.out.println("Durable subscriber registered for topic: " + orderTopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
let subscriptionName = "orderTopicName";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// The subscription policy is Exclusive which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Exclusive, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
# Create and register subscribers for orderTopic
# The subscription policy is exlusive which means that the subscription can have more than one subscriber
order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.EXCLUSIVE, on_message_received, ncache.TimeSpan.from_minutes(20))
複数のサブスクリプション
このサブスクリプション方法を使用すると、サブスクライバーは 1 回の呼び出しで複数のトピックをサブスクライブするパターンを提供できます。このためには、パターンに一致するトピックがサーバー上にすでに存在していることが重要です。パターンベースのトピックでサブスクリプションが正常に作成されると、サブスクライバーはパターンに一致するトピックでパブリッシュされたメッセージを受信します。また、パターンベースのサブスクリプションがパターンに対して登録された後にトピックが作成された場合、そのサブスクライバーがそのトピックに対して登録されます。
同様に、トピックからの登録解除に関して、このメソッドは、この呼び出しを使用する他のサブスクリプションに影響を与えることなく、提供されたパターンを持つすべての一致するトピックに対してサブスクライバーを登録解除します。
Note
- サブスクライバーは、パターン ベースのトピックのみを取得でき、作成することはできません。
- パターンは、発行者が失敗通知を受信する目的でのみ使用できます。
サポートされているワイルドカード
パターンベースのサブスクリプション方法では、次の XNUMX つのワイルドカードがサポートされます。
*
: XNUMX 個または多数の文字。 例えば、 bl*
黒、青、ぼかしなどを購読します。
?
: 任意の XNUMX 文字。 例えば、 h?t
hit、hot、hatなどを購読しています。
[]
: 文字の範囲。 例えば、 b[ae]t
bet と but には登録していますが、bit には登録していません。
ワイルドカードを使用したサブスクリプションの作成
次の例では、パターンに一致するトピックがサブスクライブされるパターンを提供することにより、サブスクリプションを作成します。
// Create topic name for all topics with suffix Beverages
// Only ? * [] wildcards supported
string topicName = "*Beverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for Order topic
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
// Define topic name pattern
String topicNamePattern = "*ages";
// Get all topics that fulfill the pattern
Topic topic = cache.getMessagingService().getTopic(topicNamePattern, TopicSearchOptions.ByPattern);
if (topic != null) {
System.out.println("Matching topic retrieved: " + topic.getName());
// Create and register notifications for topic
topic.createSubscription(new PubSubMessageReceivedListener());
System.out.println("Subscription created for topic: " + topic.getName());
} else {
System.out.println("No matching topic found.");
}
// Topic "Beverages" exists in the cache
// Only ? * [] wildcards supported
let topicName = "*Beverages";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);
// Create and register subscribers for orderTopic
let orderSubscriber = orderTopic.createSubscription(this.messageReceived());
# Topic "Beverages" exists in the cache
# Only ? * [] wildcards supported
topic_name = "*Beverages*"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)
# Create and register subscribers for orderTopic
order_subscriber = order_topic.create_subscription(on_message_received)
非同期サブスクリプション
配信モードは、注文したメッセージのサブスクリプションを作成するときに指定でき、 sync
or async
。 次の例では、を使用してサブスクリプションを作成します CreateSubscription
同期配信モードのメソッド。 使用をお勧めします sync
順序付けられたメッセージのモードと async
それ以外の場合は、高性能を実現するためのモード。
サブスクリプションの作成中に、同期または非同期の配信モードを指定できます。 特に、同期配信モードは次の目的で使用できます。 注文したメッセージ、順序付きメッセージを使用していない場合は、非同期モードでパフォーマンスが向上します。
Note
デフォルトでは、メッセージの配信モードは同期に設定されています。
// NewBeverages is the name of the topic created beforehand
string topicName = "NewBeverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for Order topic
// Message received callback is specified
// DeliveryMode is set to async
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived, DeliveryMode.Async);
}
// NewBeverages is the name of the topic created beforehand
String topicName = "NewBeverages";
// Get the topic
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create and register subscribers for Topic
// Message received callback is specified
// DeliveryMode is set to async
topic.createSubscription(new PubSubMessageReceivedListener(),DeliveryMode.Async);
} else {
System.out.println("Topic does not exist.");
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived(), ncache.DeliveryMode.Sync);
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
else {
// No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for orderTopic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received, ncache.DeliveryMode.SYNC)
else:
# No topic exists
print("Topic not found")
その他のリソース
NCache Pub/Subのサンプルアプリケーションを提供します GitHubの.
も参照してください
。ネット: Alachisoft.NCache.ランタイム.キャッシュ 名前空間
Java: comの。alachisoft.ncache.ランタイムキャッシュ 名前空間
Node.js: トピックサブスクリプション クラス とに提供されます。
Python: ncache.client.services とに提供されます。