Veröffentlichen Sie Nachrichten zu einem Thema
Note
Diese Funktion ist auch in verfügbar NCache Professional.
Das ITopic
/Topic
Die Schnittstelle erleichtert das Veröffentlichen von Nachrichten zum Thema. Dies ermöglicht auch Ereignisregistrierungen für fehlgeschlagene Nachrichtenübermittlungen, den Empfang von Nachrichten und das Löschen von Themen.
Es bietet die Publish
-Methode, die die Nachricht zu einem bestimmten Thema im Cache veröffentlicht. Beim Veröffentlichen von Nachrichten zu einem Thema kann der Herausgeber das festlegen Liefermöglichkeit für Nachrichten, Ablauf der Nachricht, Benachrichtigung über fehlgeschlagene Nachrichtenübermittlung und Benachrichtigung zum Löschen des Themas.
Hier beschreiben wir, wie es geht Nachrichten veröffentlichen, Nachrichten asynchron veröffentlichen, Massennachrichten veröffentlichen und geordnete Nachrichten veröffentlichen.
Voraussetzungen:
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Einzelheiten zur API finden Sie unter: ICache, CacheItem, IThema, Veröffentlichen, GetThema, PublishAsync, Ablaufzeit, Senden der nachricht fehlgeschlagen, OnTopicGelöscht, Liefermöglichkeit, Nachricht, Bulk veröffentlichen, MessageFailedEventArgs, ThemaDeleteEventArgs.
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Einzelheiten zur API finden Sie unter: Cache-Speicher, CacheItem, Betreff, getThema, veröffentlichen, Nachricht, setExpirationTime, TopicListener, addMessageDeliveryFailureListener, addTopicDeletedListener, Liefermöglichkeit, Alle, PublishAsync, veröffentlichenBulk, ThemaDeleteEventArgs, MessageFailedEventArgs, MessageEventArgs.
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Einzelheiten zur API finden Sie unter: Cache-Speicher, CacheItem, Betreff, getThema, veröffentlichen, getMessagingService, Zeitspanne, setExpirationTime, TopicListener, addMessageDeliveryFailureListener, addTopicDeletedListener, Liefermöglichkeit, veröffentlichenBulk, MessageReceivedListener.
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Einzelheiten zur API finden Sie unter: Cache-Speicher, CacheItem, get_topic, get_messaging_service, Zeitspanne, set_expiration_time, add_message_delivery_failure_listener, add_topic_deleted_listener, Publish_async, Publish_Bulk, MessageEventArgs, get_topic_name, MessageFailedEventArgs, get_message_failure_reason, ThemaDeleteEventArgs.
Nachrichten veröffentlichen
Das folgende Codebeispiel bewirkt Folgendes:
- Erstellen Sie spezielle Themen für auftragsbezogene Nachrichten.
- Registrieren Sie die
MessageDeliveryFailure
Veranstaltung zum Thema.
- Registrieren Sie die
OnTopicDeleted
Veranstaltung zum Thema.
- Erstellen Sie Nachrichten für jedes Thema und aktivieren Sie Ablauf- und Zustelloptionen.
- Veröffentlichen Sie die Nachrichten.
// Precondition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
// No topic exists
}
// Precondition: Cache is already connected
// Already existing topic
String topicName = "orderTopic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true);
} else {
// No topic exists
}
// This is an async method
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (!(orderTopic == null)) {
// Create object to be sent in message
let order = await this.fetchOrdersFromDb(this.orderId);
// Create a new message with the object order
let orderMessage = new ncache.Message(order);
// Set the expiration time of the message
let expiryTime = new ncache.TimeSpan(12, 12, 12);
orderMessage.setExpirationTime(expiryTime);
let topicListener = new ncache.MyTopicListener();
// Register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, ncache.DeliveryOption.All, true);
} else {
// No topic exists
}
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True)
else:
# No topic exists
print("Topic not found")
Note
Um sicherzustellen, dass der Vorgang ausfallsicher ist, wird empfohlen, alle potenziellen Ausnahmen in Ihrer Anwendung zu behandeln, wie in erläutert Umgang mit Fehlern.
Veröffentlichen Sie asynchron
Nachrichten können zum Thema asynchron mit veröffentlicht werden PublishAsync sodass die Anwendung nicht auf den Abschluss der Operation wartet, um die nächste Operation auszuführen. Der Benutzer erhält die Kontrolle sofort zur weiteren Bearbeitung zurück.
Im folgenden Beispiel können Sie eine Nachricht asynchron veröffentlichen.
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
if(task.IsFaulted)
{
// Task Failed
}
}
else
{
// No topic exists
}
// Topic orderTopic already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
TimeScheduler.Task task = (TimeScheduler.Task) orderTopic.publishAsync(orderMessage, DeliveryOption.All, true);
if (task.IsCancelled()) {
// task cancelled
}
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
task = order_topic.publish_async(order_message, ncache.DeliveryOption.ALL, True)
# Use this task object as per your business logic
else:
# No topic exists
print("Topic not found")
Massennachrichten veröffentlichen
Mehrere Nachrichten können in einem einzigen Aufruf mit veröffentlicht werden Bulk veröffentlichen Methode. Dies verbessert die Leistung und die Speicherauslastung, da ein Großteil der Nachrichten kombiniert und in einem einzigen Aufruf veröffentlicht wird.
Der folgende Code nimmt eine Instanz eines bereits erstellten Themas orderTopic
, und zeigt die Massenveröffentlichung von Nachrichten zum Thema an.
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
// create dictionary for storing bulk
List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
Order[] orders = FetchOrdersFromDB();
for (int i = 0; i < 100; i++)
{
Message message = new Message(orders[i]);
message.ExpirationTime = TimeSpan.FromSeconds(10000);
messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
}
// Register message delivery failure
topic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
topic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
// In case of failed publishing of messages, exceptions
// will be returned
IDictionary<Message, Exception> keys = topic.PublishBulk(messageList, true);
}
// topic already exists
String topicName = "orderTopic";
String customerID = "DUMON";
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// create dictionary for storing bulk
Map messageMap = new HashMap();
Order[] orders = fetchOrdersFromDb(customerID);
for (int i = 0; i < 100; i++) {
Message message = new Message(orders[i]);
message.setExpirationTime(TimeSpan.FromSeconds(10000));
messageMap.put(message, DeliveryOption.All);
}
MyTopicListener topicListener = new MyTopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
Map<Message, Exception> keys = topic.publishBulk(messageMap, true);
} else {
// topic is null
}
// This is an async method
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let topic = ncache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create Map for storing messages in bulk
let messageMap = new Map();
let orders = this.fetchOrdersByCustomerId(this.customerId);
var i;
for (i = 0; i < 100; i++) {
let message = new ncache.Message(orders[i]);
message.setExpirationTime(ncache.TimeSpan.FromSeconds(10000));
messageMap.set(message, ncache.DeliveryOption.All);
}
let topicListener = new ncache.TopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
// In case of failed publishing of messages, exception will be returned
let keys = topic.publishBulk(messageMap, true);
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
topic = cache.get_messaging_service().get_topic(topic_name)
if topic is not None:
# Create Dict for storing messages in bulk
messages_map = {}
orders = fetch_orders_by_customer_id("ALFKI")
for i in range(0, 100):
message = ncache.Message(orders[++i])
message.set_expiration_time(ncache.TimeSpan.from_seconds(10000))
messages_map[message] = ncache.DeliveryOption.ALL
# Register message delivery failure
topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
# In case of failed publishing of messages, exception will be returned
keys = topic.publish_bulk(messages_map, True)
else:
# No topic exists
print("Topic not found")
Geordnete Nachrichten veröffentlichen
Note
Diese Funktion ist nur verfügbar in NCache 5.2 und weiter
Nachrichten können durch Angabe eines Sequenznamens veröffentlicht werden, was dazu führt, dass die Nachrichten in einer bestimmten Reihenfolge veröffentlicht werden. Konkretisieren geordnete Nachrichtenwird der Kette der Nachrichten ein String-Sequenzname hinzugefügt, der sicherstellt, dass alle Nachrichten, die zu einem bestimmten Sequenznamen gehören, auf demselben Serverknoten veröffentlicht werden.
Im folgenden Beispiel wird den Nachrichten ein Sequenzname hinzugefügt, und die Nachrichten werden dann mit veröffentlicht Publish
Methode.
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (topicName != null)
{
for (int i = 0; i < 30; i++)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the messages
string sequenceName = "OrderMessages";
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish message with the sequence name
orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
}
else
{
// No topic found
}
// Specify the topic name that already exists
String topicName = "orderTopic";
// Get the topic with the specified name
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
for (int i = 0; i < 30; i++) {
// Create the object to be sent in the message
Order order = fetchOrdersFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the message
String sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
} else {
// No topic found
}
// This is an async method
// Specify the topic name that already exists
let topicName = "orderTopic";
// Get the topic with the specified name
let orderTopic = await this.cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
var i;
for (i = 0; i < 30; i++) {
// Create the object to be sent in the message
let order = await this.fetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new ncache.Message(order);
// Specify a unique sequence name for the message
let sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(ncache.TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(
orderMessage,
ncache.DeliveryOption.All,
sequenceName,
true
);
}
} else {
// No topic found
}
# Specify the topic name that already exists
topic_name = "orderTopic"
# Get the topic with the specified name
order_topic = cache.get_messaging_service().get_topic(topic_name)
if topic_name is not None:
for i in range(0, 30):
# Create the object to be sent in the message
order = fetch_order_from_db(10248)
# Create the new message with the object order
order_message = ncache.Message(order)
# Specify a unique sequence name for the message
sequence_name = "OrderMessage"
# Set the expiration time of the message
order_message.set_expiration_time(ncache.TimeSpan.from_seconds(5000))
# Publish message with the sequence name
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True, sequence_name)
else:
# No topic found
print("Topic not found")
Rückrufe registrieren
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
// Failure reason can be get from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
// Deleted topic is args.TopicName
}
@Override
public void onTopicDeleted(Object sender, TopicDeleteEventArgs args) {
// Deleted topic is args.getTopicName();
}
@Override
public void onMessageDeliveryFailure(Object sender, MessageFailedEventArgs args) {
// Failure reason is args.getMessageFailureReason();
}
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// Perform operations
}
onTopicDeleted(sender, args) {
// Deleted topic is args.getTopicName()
}
onMessageDeliveryFailure(sender, args) {
// Failure reason is args.getMessageFailureReason()
}
onMessageReceived(sender, args) {
// Perform operations
}
def on_topic_deleted(sender: object, args: ncache.TopicDeleteEventArgs):
# Perform operations
print("Deleted topic is " + args.get_topic_name())
def on_message_delivery_failure(sender: object, args: ncache.MessageFailedEventArgs):
# Perform operations
print("Failure reason is " + str(args.get_message_failure_reason()))
def on_message_received(sender: object, args: ncache.MessageEventArgs):
# Perform operations
print("Message received from topic " + args.get_topic_name())
Weitere Informationen
NCache stellt eine Beispielanwendung für Pub/Sub bereit GitHub.
Siehe auch
.NETZ: Alachisoft.NCache.Runtime.Caching Namespace.
Java: com.alachisoft.ncache.runtime.caching Namespace.
Node.js: Betreff Klasse.
Python: ncache.Kundenservice Klasse.