Publier des messages sur un sujet dans un modèle Pub/Sub
Les ITopic
/Topic
l'interface dans le modèle pub/Sub facilite la publication de messages sur le sujet. Cela fournit également des enregistrements d'événements pour l'échec de la livraison des messages, la réception de messages et la suppression de sujets.
Notes
Cette fonctionnalité est également disponible dans NCache Professional.
Il fournit Publish
méthode, qui publie le message dans une rubrique spécifique du cache. Lors de la publication de messages dans un sujet, l'éditeur peut définir le option de livraison pour les messages, expiration des messages, notification d'échec de livraison de messageet notification de suppression de sujet.
Ici, nous décrivons comment publier des messages, publier des messages de manière asynchrone, publier des messages en masse ainsi que le publier des messages ordonnés.
Pré-requis
- Pour en savoir plus sur les prérequis standard requis pour travailler avec tous NCache fonctionnalités côté client, y compris le modèle pub/sub, veuillez vous référer à la page donnée sur Prérequis de l'API côté client.
- Pour plus de détails sur l'API, reportez-vous à : ICache, CacheItem, ISujet, Publier, Obtenir le sujet, PublierAsync, Date d'expiration, MessageDeliveryFailureMessageDeliveryFailure, SurSujetSupprimé, Option de livraison, Message, Publier en masse, MessageFailedEventArgsMessageFailedEventArgs, SujetDeleteEventArgs.
- Pour en savoir plus sur les prérequis standard requis pour travailler avec tous NCache fonctionnalités côté client, y compris le modèle pub/sub, veuillez vous référer à la page donnée sur Prérequis de l'API côté client.
- Pour plus de détails sur l'API, reportez-vous à : Cache, CacheItem, Sujet, obtenir le sujet, publier, Message, setExpirationTime, SujetListener, addMessageDeliveryFailureListener, addTopicDeletedListenerAddTopicDeletedListener, Option de livraison, Tous, publierAsynchrone, publier en bloc, SujetDeleteEventArgs, MessageFailedEventArgsMessageFailedEventArgs, MessageEventArgs.
- Pour en savoir plus sur les prérequis standard requis pour travailler avec tous NCache fonctionnalités côté client, y compris le modèle pub/sub, veuillez vous référer à la page donnée sur Prérequis de l'API côté client.
- Pour plus de détails sur l'API, reportez-vous à : Cache, CacheItem, Sujet, obtenir le sujet, publier, getMessagingServicegetMessagingService, Durée, setExpirationTime, SujetListener, addMessageDeliveryFailureListener, addTopicDeletedListenerAddTopicDeletedListener, Option de livraison, publier en bloc, MessageReceivedListenerMessageReceivedListener.
- Pour en savoir plus sur les prérequis standard requis pour travailler avec tous NCache fonctionnalités côté client, y compris le modèle pub/sub, veuillez vous référer à la page donnée sur Prérequis de l'API côté client.
- Pour plus de détails sur l'API, reportez-vous à : Cache, CacheItem, get_topic, get_messaging_service, Durée, set_expiration_time, add_message_delivery_failure_listener, add_topic_deleted_listener, publier_asynchrone, publier_en masse, MessageEventArgs, get_topic_name, MessageFailedEventArgsMessageFailedEventArgs, get_message_failure_reason, SujetDeleteEventArgs.
Publier des messages
L'exemple de code suivant effectue les opérations suivantes :
- Créez des sujets dédiés pour les messages liés à la commande.
- Enregistrez le
MessageDeliveryFailure
événement pour le sujet.
- Enregistrez le
OnTopicDeleted
événement pour le sujet.
- Créez des messages pour chaque sujet, en activant les options d'expiration et de livraison.
- Publiez les messages.
// Precondition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
// No topic exists
}
// Precondition: Cache is already connected
// Already existing topic
String topicName = "orderTopic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true);
} else {
// No topic exists
}
// This is an async method
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (!(orderTopic == null)) {
// Create object to be sent in message
let order = await this.fetchOrdersFromDb(this.orderId);
// Create a new message with the object order
let orderMessage = new ncache.Message(order);
// Set the expiration time of the message
let expiryTime = new ncache.TimeSpan(12, 12, 12);
orderMessage.setExpirationTime(expiryTime);
let topicListener = new ncache.MyTopicListener();
// Register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, ncache.DeliveryOption.All, true);
} else {
// No topic exists
}
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True)
else:
# No topic exists
print("Topic not found")
Notes
Pour garantir la sécurité de l'opération, il est recommandé de gérer toutes les exceptions potentielles au sein de votre application, comme expliqué dans Gestion des échecs.
Publier de manière asynchrone
Les messages peuvent être publiés sur le sujet de manière asynchrone à l'aide de PublierAsync afin que l'application n'attende pas la fin de l'opération pour effectuer l'opération suivante. L'utilisateur a le contrôle qui lui est rendu immédiatement pour un traitement ultérieur.
L'exemple suivant vous permet de publier un message de manière asynchrone.
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
if(task.IsFaulted)
{
// Task Failed
}
}
else
{
// No topic exists
}
// Topic orderTopic already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
TimeScheduler.Task task = (TimeScheduler.Task) orderTopic.publishAsync(orderMessage, DeliveryOption.All, true);
if (task.IsCancelled()) {
// task cancelled
}
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
task = order_topic.publish_async(order_message, ncache.DeliveryOption.ALL, True)
# Use this task object as per your business logic
else:
# No topic exists
print("Topic not found")
Publier des messages en masse
Plusieurs messages peuvent être publiés dans un seul appel à l'aide de la Publier en masse méthode. Cela améliore les performances et l'utilisation de la mémoire car un grand nombre de messages seront combinés et publiés en un seul appel.
Le code ci-dessous prend une instance d'un sujet déjà créé orderTopic
, et affiche la publication groupée de messages dans le sujet.
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
// create dictionary for storing bulk
List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
Order[] orders = FetchOrdersFromDB();
for (int i = 0; i < 100; i++)
{
Message message = new Message(orders[i]);
message.ExpirationTime = TimeSpan.FromSeconds(10000);
messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
}
// Register message delivery failure
topic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
topic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
// In case of failed publishing of messages, exceptions
// will be returned
IDictionary<Message, Exception> keys = topic.PublishBulk(messageList, true);
}
// topic already exists
String topicName = "orderTopic";
String customerID = "DUMON";
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// create dictionary for storing bulk
Map messageMap = new HashMap();
Order[] orders = fetchOrdersFromDb(customerID);
for (int i = 0; i < 100; i++) {
Message message = new Message(orders[i]);
message.setExpirationTime(TimeSpan.FromSeconds(10000));
messageMap.put(message, DeliveryOption.All);
}
MyTopicListener topicListener = new MyTopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
Map<Message, Exception> keys = topic.publishBulk(messageMap, true);
} else {
// topic is null
}
// This is an async method
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let topic = ncache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create Map for storing messages in bulk
let messageMap = new Map();
let orders = this.fetchOrdersByCustomerId(this.customerId);
var i;
for (i = 0; i < 100; i++) {
let message = new ncache.Message(orders[i]);
message.setExpirationTime(ncache.TimeSpan.FromSeconds(10000));
messageMap.set(message, ncache.DeliveryOption.All);
}
let topicListener = new ncache.TopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
// In case of failed publishing of messages, exception will be returned
let keys = topic.publishBulk(messageMap, true);
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
topic = cache.get_messaging_service().get_topic(topic_name)
if topic is not None:
# Create Dict for storing messages in bulk
messages_map = {}
orders = fetch_orders_by_customer_id("ALFKI")
for i in range(0, 100):
message = ncache.Message(orders[++i])
message.set_expiration_time(ncache.TimeSpan.from_seconds(10000))
messages_map[message] = ncache.DeliveryOption.ALL
# Register message delivery failure
topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
# In case of failed publishing of messages, exception will be returned
keys = topic.publish_bulk(messages_map, True)
else:
# No topic exists
print("Topic not found")
Publier les messages commandés
Notes
Cette fonctionnalité est uniquement disponible dans NCache 5.2 et suivants
Les messages peuvent être publiés en mentionnant un nom de séquence, ce qui entraîne la publication des messages dans un ordre spécifique. Spécifier messages ordonnés, un nom de séquence de chaîne est ajouté à la chaîne des messages qui garantit la publication de tous les messages appartenant à un nom de séquence spécifique sur le même nœud de serveur.
Dans l'exemple ci-dessous, un nom de séquence est ajouté avec les messages, et les messages sont ensuite publiés à l'aide de la Publish
méthode.
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (topicName != null)
{
for (int i = 0; i < 30; i++)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the messages
string sequenceName = "OrderMessages";
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish message with the sequence name
orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
}
else
{
// No topic found
}
// Specify the topic name that already exists
String topicName = "orderTopic";
// Get the topic with the specified name
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
for (int i = 0; i < 30; i++) {
// Create the object to be sent in the message
Order order = fetchOrdersFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the message
String sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
} else {
// No topic found
}
// This is an async method
// Specify the topic name that already exists
let topicName = "orderTopic";
// Get the topic with the specified name
let orderTopic = await this.cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
var i;
for (i = 0; i < 30; i++) {
// Create the object to be sent in the message
let order = await this.fetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new ncache.Message(order);
// Specify a unique sequence name for the message
let sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(ncache.TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(
orderMessage,
ncache.DeliveryOption.All,
sequenceName,
true
);
}
} else {
// No topic found
}
# Specify the topic name that already exists
topic_name = "orderTopic"
# Get the topic with the specified name
order_topic = cache.get_messaging_service().get_topic(topic_name)
if topic_name is not None:
for i in range(0, 30):
# Create the object to be sent in the message
order = fetch_order_from_db(10248)
# Create the new message with the object order
order_message = ncache.Message(order)
# Specify a unique sequence name for the message
sequence_name = "OrderMessage"
# Set the expiration time of the message
order_message.set_expiration_time(ncache.TimeSpan.from_seconds(5000))
# Publish message with the sequence name
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True, sequence_name)
else:
# No topic found
print("Topic not found")
Enregistrer les rappels
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
// Failure reason can be get from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
// Deleted topic is args.TopicName
}
@Override
public void onTopicDeleted(Object sender, TopicDeleteEventArgs args) {
// Deleted topic is args.getTopicName();
}
@Override
public void onMessageDeliveryFailure(Object sender, MessageFailedEventArgs args) {
// Failure reason is args.getMessageFailureReason();
}
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// Perform operations
}
onTopicDeleted(sender, args) {
// Deleted topic is args.getTopicName()
}
onMessageDeliveryFailure(sender, args) {
// Failure reason is args.getMessageFailureReason()
}
onMessageReceived(sender, args) {
// Perform operations
}
def on_topic_deleted(sender: object, args: ncache.TopicDeleteEventArgs):
# Perform operations
print("Deleted topic is " + args.get_topic_name())
def on_message_delivery_failure(sender: object, args: ncache.MessageFailedEventArgs):
# Perform operations
print("Failure reason is " + str(args.get_message_failure_reason()))
def on_message_received(sender: object, args: ncache.MessageEventArgs):
# Perform operations
print("Message received from topic " + args.get_topic_name())
Ressources additionnelles
NCache fournit un exemple d'application pour Pub/Sub sur GitHub.
Voir aussi
.RAPPORTER: Alachisoft.NCache.Runtime.Caching espace de noms.
Java: com.alachisoft.ncache.runtime.caching espace de noms.
Node.js : Sujet classe.
python: ncache.services clients classe.