Pub/Sub モデルのトピックにメッセージをパブリッシュする
ITopic
/Topic
pub/Sub モデルのインターフェイスにより、トピックに対するメッセージのパブリッシュが容易になります。これにより、メッセージ配信の失敗、メッセージの受信、およびトピックの削除に関するイベント登録も提供されます。
Note
この機能は以下でも利用できます NCache Professional.
それは提供します Publish
このメソッドは、キャッシュ内の特定のトピックにメッセージを発行します。 メッセージをトピックにパブリッシュする際、パブリッシャーは 配送オプション メッセージの場合、 メッセージの有効期限, メッセージ配信失敗通知, トピック削除通知.
ここでは、その方法について説明します メッセージを公開する, メッセージを非同期にパブリッシュする, 一括メッセージを発行する & 順序付けられたメッセージを発行する.
前提条件
- すべてを使用するために必要な標準的な前提条件について学習するには NCache パブリッシュ/サブスクライブ モデルを含むクライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 Iキャッシュ, キャッシュ項目, Iトピック, パブリッシュ, トピックの取得, パブリッシュ非同期, 有効期限, メッセージ配信失敗, トピック削除済み, 配送オプション, メッセージ, 一括公開, MessageFailedEventArgs, トピック削除イベント引数.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache パブリッシュ/サブスクライブ モデルを含むクライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 キャッシュ, キャッシュ項目, ご用件, トピックの取得, パブリッシュ, メッセージ, setExpirationTime, トピックリスナー, addMessageDeliverFailureListener, addTopic削除リスナー, 配送オプション, すべて, パブリッシュ非同期, パブリッシュバルク, トピック削除イベント引数, MessageFailedEventArgs, メッセージイベント引数.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache パブリッシュ/サブスクライブ モデルを含むクライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 キャッシュ, キャッシュ項目, ご用件, トピックの取得, パブリッシュ, getMessagingService, 期間, setExpirationTime, トピックリスナー, addMessageDeliverFailureListener, addTopic削除リスナー, 配送オプション, パブリッシュバルク, メッセージ受信リスナー.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache パブリッシュ/サブスクライブ モデルを含むクライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- APIの詳細については、以下を参照してください。 キャッシュ, キャッシュ項目, トピックの取得, get_messaging_service, 期間, set_expiration_time, add_message_delivery_failure_listener, add_topic_deleted_listener, パブリッシュ_非同期, 一括発行, メッセージイベント引数, トピック名を取得する, MessageFailedEventArgs, get_message_failure_reason, トピック削除イベント引数.
メッセージを公開する
次のコードサンプルは次のことを行います。
- 注文関連のメッセージ専用のトピックを作成します。
- を登録する
MessageDeliveryFailure
話題のイベント。
- を登録する
OnTopicDeleted
話題のイベント。
- トピックごとにメッセージを作成し、有効期限と配信オプションを有効にします。
- メッセージを公開します。
// Precondition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
// No topic exists
}
// Precondition: Cache is already connected
// Already existing topic
String topicName = "orderTopic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true);
} else {
// No topic exists
}
// This is an async method
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (!(orderTopic == null)) {
// Create object to be sent in message
let order = await this.fetchOrdersFromDb(this.orderId);
// Create a new message with the object order
let orderMessage = new ncache.Message(order);
// Set the expiration time of the message
let expiryTime = new ncache.TimeSpan(12, 12, 12);
orderMessage.setExpirationTime(expiryTime);
let topicListener = new ncache.MyTopicListener();
// Register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, ncache.DeliveryOption.All, true);
} else {
// No topic exists
}
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True)
else:
# No topic exists
print("Topic not found")
Note
操作がフェイルセーフであることを保証するために、で説明されているように、アプリケーション内の潜在的な例外を処理することをお勧めします。 失敗の処理.
非同期で公開
メッセージは、次を使用してトピックに非同期的に公開できます。 パブリッシュ非同期 これにより、アプリケーションは次の操作を実行するために操作の完了を待たなくなります。 ユーザーは、さらなる処理のために制御をすぐに返します。
次の例では、メッセージを非同期で公開できます。
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
if(task.IsFaulted)
{
// Task Failed
}
}
else
{
// No topic exists
}
// Topic orderTopic already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
TimeScheduler.Task task = (TimeScheduler.Task) orderTopic.publishAsync(orderMessage, DeliveryOption.All, true);
if (task.IsCancelled()) {
// task cancelled
}
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
task = order_topic.publish_async(order_message, ncache.DeliveryOption.ALL, True)
# Use this task object as per your business logic
else:
# No topic exists
print("Topic not found")
一括メッセージを公開する
を使用して、XNUMX 回の呼び出しで複数のメッセージをパブリッシュできます。 一括公開 方法。 これにより、大量のメッセージがXNUMX回の呼び出しで結合および公開されるため、パフォーマンスとメモリ使用量が向上します。
以下のコードは、すでに作成されたトピックのインスタンスを取ります orderTopic
、トピックへのメッセージの一括パブリッシュを示します。
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
// create dictionary for storing bulk
List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
Order[] orders = FetchOrdersFromDB();
for (int i = 0; i < 100; i++)
{
Message message = new Message(orders[i]);
message.ExpirationTime = TimeSpan.FromSeconds(10000);
messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
}
// Register message delivery failure
topic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
topic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
// In case of failed publishing of messages, exceptions
// will be returned
IDictionary<Message, Exception> keys = topic.PublishBulk(messageList, true);
}
// topic already exists
String topicName = "orderTopic";
String customerID = "DUMON";
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// create dictionary for storing bulk
Map messageMap = new HashMap();
Order[] orders = fetchOrdersFromDb(customerID);
for (int i = 0; i < 100; i++) {
Message message = new Message(orders[i]);
message.setExpirationTime(TimeSpan.FromSeconds(10000));
messageMap.put(message, DeliveryOption.All);
}
MyTopicListener topicListener = new MyTopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
Map<Message, Exception> keys = topic.publishBulk(messageMap, true);
} else {
// topic is null
}
// This is an async method
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let topic = ncache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create Map for storing messages in bulk
let messageMap = new Map();
let orders = this.fetchOrdersByCustomerId(this.customerId);
var i;
for (i = 0; i < 100; i++) {
let message = new ncache.Message(orders[i]);
message.setExpirationTime(ncache.TimeSpan.FromSeconds(10000));
messageMap.set(message, ncache.DeliveryOption.All);
}
let topicListener = new ncache.TopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
// In case of failed publishing of messages, exception will be returned
let keys = topic.publishBulk(messageMap, true);
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
topic = cache.get_messaging_service().get_topic(topic_name)
if topic is not None:
# Create Dict for storing messages in bulk
messages_map = {}
orders = fetch_orders_by_customer_id("ALFKI")
for i in range(0, 100):
message = ncache.Message(orders[++i])
message.set_expiration_time(ncache.TimeSpan.from_seconds(10000))
messages_map[message] = ncache.DeliveryOption.ALL
# Register message delivery failure
topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
# In case of failed publishing of messages, exception will be returned
keys = topic.publish_bulk(messages_map, True)
else:
# No topic exists
print("Topic not found")
注文したメッセージを公開する
Note
この機能はでのみ利用可能です NCache 5.2以降
シーケンス名を指定することでメッセージを公開でき、メッセージが特定の順序で公開されます。 指定します 注文したメッセージ、文字列シーケンス名がメッセージのチェーンに追加され、特定のシーケンス名に属するすべてのメッセージが同じサーバー ノード上で確実にパブリッシュされます。
以下の例では、シーケンス名がメッセージに追加され、メッセージは Publish
方法。
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (topicName != null)
{
for (int i = 0; i < 30; i++)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the messages
string sequenceName = "OrderMessages";
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish message with the sequence name
orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
}
else
{
// No topic found
}
// Specify the topic name that already exists
String topicName = "orderTopic";
// Get the topic with the specified name
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
for (int i = 0; i < 30; i++) {
// Create the object to be sent in the message
Order order = fetchOrdersFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the message
String sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
} else {
// No topic found
}
// This is an async method
// Specify the topic name that already exists
let topicName = "orderTopic";
// Get the topic with the specified name
let orderTopic = await this.cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
var i;
for (i = 0; i < 30; i++) {
// Create the object to be sent in the message
let order = await this.fetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new ncache.Message(order);
// Specify a unique sequence name for the message
let sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(ncache.TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(
orderMessage,
ncache.DeliveryOption.All,
sequenceName,
true
);
}
} else {
// No topic found
}
# Specify the topic name that already exists
topic_name = "orderTopic"
# Get the topic with the specified name
order_topic = cache.get_messaging_service().get_topic(topic_name)
if topic_name is not None:
for i in range(0, 30):
# Create the object to be sent in the message
order = fetch_order_from_db(10248)
# Create the new message with the object order
order_message = ncache.Message(order)
# Specify a unique sequence name for the message
sequence_name = "OrderMessage"
# Set the expiration time of the message
order_message.set_expiration_time(ncache.TimeSpan.from_seconds(5000))
# Publish message with the sequence name
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True, sequence_name)
else:
# No topic found
print("Topic not found")
コールバックを登録する
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
// Failure reason can be get from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
// Deleted topic is args.TopicName
}
@Override
public void onTopicDeleted(Object sender, TopicDeleteEventArgs args) {
// Deleted topic is args.getTopicName();
}
@Override
public void onMessageDeliveryFailure(Object sender, MessageFailedEventArgs args) {
// Failure reason is args.getMessageFailureReason();
}
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// Perform operations
}
onTopicDeleted(sender, args) {
// Deleted topic is args.getTopicName()
}
onMessageDeliveryFailure(sender, args) {
// Failure reason is args.getMessageFailureReason()
}
onMessageReceived(sender, args) {
// Perform operations
}
def on_topic_deleted(sender: object, args: ncache.TopicDeleteEventArgs):
# Perform operations
print("Deleted topic is " + args.get_topic_name())
def on_message_delivery_failure(sender: object, args: ncache.MessageFailedEventArgs):
# Perform operations
print("Failure reason is " + str(args.get_message_failure_reason()))
def on_message_received(sender: object, args: ncache.MessageEventArgs):
# Perform operations
print("Message received from topic " + args.get_topic_name())
その他のリソース
NCache Pub/Subのサンプルアプリケーションを提供します GitHubの.
も参照してください
。ネット: Alachisoft.NCache.ランタイム.キャッシュ 名前空間
Java: comの。alachisoft.ncache.ランタイムキャッシュ 名前空間
Node.js: ご用件 とに提供されます。
Python: ncache.client.services とに提供されます。