Publicar mensajes en un tema en un modelo Pub/Sub
El ITopic
/Topic
La interfaz en el modelo pub/Sub facilita la publicación de mensajes sobre el tema. Esto también proporciona registros de eventos para fallas en la entrega de mensajes, recepción de mensajes y eliminación de temas.
Note
Esta característica también está disponible en NCache Professional.
Proporciona el Publish
método, que publica el mensaje en un tema específico en el caché. Al publicar mensajes en un tema, el publicador puede configurar el opción de entrega para mensajes, caducidad del mensaje, notificación de falla en la entrega del mensajey notificación de eliminación de tema.
Aquí describimos cómo publicar mensajes, publicar mensajes de forma asíncrona, publicar mensajes masivos y publicar mensajes ordenados.
Requisitos previos
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, incluido el modelo pub/sub, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: Dolor, Artículo de caché, ITema, Publicar, ObtenerTema, PublicarAsync, Tiempo de expiración, Fallo en la entrega de mensaje, SobreTemaEliminado, Opción de entrega, Mensaje, Publicar a granel, Mensaje fallidoEventArgs, TemaDeleteEventArgs.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, incluido el modelo pub/sub, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: cache, Artículo de caché, Tema, obtenerTema, publicar, Mensaje, establecer el tiempo de caducidad, Oyente de tema, agregarMessageDeliveryFailureListener, agregar tema eliminado oyente, Opción de entrega, Todos, publicarAsync, publicar a granel, TemaDeleteEventArgs, Mensaje fallidoEventArgs, MensajeEventArgs.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, incluido el modelo pub/sub, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: cache, Artículo de caché, Tema, obtenerTema, publicar, obtener servicio de mensajería, Espacio de tiempo, establecer el tiempo de caducidad, Oyente de tema, agregarMessageDeliveryFailureListener, agregar tema eliminado oyente, Opción de entrega, publicar a granel, MensajeRecibidoOyente.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, incluido el modelo pub/sub, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: cache, Artículo de caché, obtener_tema, get_messaging_service, Espacio de tiempo, establecer_tiempo_de_caducidad, add_message_delivery_failure_listener, add_topic_eliminado_escucha, publicar_async, publicar_en bloque, MensajeEventArgs, obtener_nombre_tema, Mensaje fallidoEventArgs, get_message_failure_reason, TemaDeleteEventArgs.
Publicar mensajes
El siguiente ejemplo de código hace lo siguiente:
- Cree temas dedicados para mensajes relacionados con pedidos.
- Registre el
MessageDeliveryFailure
evento para el tema.
- Registre el
OnTopicDeleted
evento para el tema.
- Cree mensajes para cada tema, habilitando opciones de vencimiento y entrega.
- Publicar los mensajes.
// Precondition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
// No topic exists
}
// Precondition: Cache is already connected
// Already existing topic
String topicName = "orderTopic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true);
} else {
// No topic exists
}
// This is an async method
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (!(orderTopic == null)) {
// Create object to be sent in message
let order = await this.fetchOrdersFromDb(this.orderId);
// Create a new message with the object order
let orderMessage = new ncache.Message(order);
// Set the expiration time of the message
let expiryTime = new ncache.TimeSpan(12, 12, 12);
orderMessage.setExpirationTime(expiryTime);
let topicListener = new ncache.MyTopicListener();
// Register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, ncache.DeliveryOption.All, true);
} else {
// No topic exists
}
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True)
else:
# No topic exists
print("Topic not found")
Note
Para garantizar que la operación sea a prueba de fallas, se recomienda manejar cualquier posible excepción dentro de su aplicación, como se explica en Manejo de fallas.
Publicar de forma asíncrona
Los mensajes se pueden publicar sobre el tema de forma asíncrona usando PublicarAsync para que la aplicación no espere a que finalice la operación para realizar la siguiente operación. Al usuario se le devuelve el control inmediatamente para su posterior procesamiento.
El siguiente ejemplo le permite publicar un mensaje de forma asíncrona.
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
if(task.IsFaulted)
{
// Task Failed
}
}
else
{
// No topic exists
}
// Topic orderTopic already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
TimeScheduler.Task task = (TimeScheduler.Task) orderTopic.publishAsync(orderMessage, DeliveryOption.All, true);
if (task.IsCancelled()) {
// task cancelled
}
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
task = order_topic.publish_async(order_message, ncache.DeliveryOption.ALL, True)
# Use this task object as per your business logic
else:
# No topic exists
print("Topic not found")
Publicar mensajes masivos
Se pueden publicar varios mensajes en una sola llamada utilizando el Publicar a granel método. Esto mejora el rendimiento y el uso de la memoria, ya que la mayor parte de los mensajes se combinarán y publicarán en una sola llamada.
El siguiente código toma una instancia de un tema ya creado orderTopic
y muestra la publicación masiva de mensajes al tema.
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
// create dictionary for storing bulk
List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
Order[] orders = FetchOrdersFromDB();
for (int i = 0; i < 100; i++)
{
Message message = new Message(orders[i]);
message.ExpirationTime = TimeSpan.FromSeconds(10000);
messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
}
// Register message delivery failure
topic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
topic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
// In case of failed publishing of messages, exceptions
// will be returned
IDictionary<Message, Exception> keys = topic.PublishBulk(messageList, true);
}
// topic already exists
String topicName = "orderTopic";
String customerID = "DUMON";
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// create dictionary for storing bulk
Map messageMap = new HashMap();
Order[] orders = fetchOrdersFromDb(customerID);
for (int i = 0; i < 100; i++) {
Message message = new Message(orders[i]);
message.setExpirationTime(TimeSpan.FromSeconds(10000));
messageMap.put(message, DeliveryOption.All);
}
MyTopicListener topicListener = new MyTopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
Map<Message, Exception> keys = topic.publishBulk(messageMap, true);
} else {
// topic is null
}
// This is an async method
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let topic = ncache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create Map for storing messages in bulk
let messageMap = new Map();
let orders = this.fetchOrdersByCustomerId(this.customerId);
var i;
for (i = 0; i < 100; i++) {
let message = new ncache.Message(orders[i]);
message.setExpirationTime(ncache.TimeSpan.FromSeconds(10000));
messageMap.set(message, ncache.DeliveryOption.All);
}
let topicListener = new ncache.TopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
// In case of failed publishing of messages, exception will be returned
let keys = topic.publishBulk(messageMap, true);
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
topic = cache.get_messaging_service().get_topic(topic_name)
if topic is not None:
# Create Dict for storing messages in bulk
messages_map = {}
orders = fetch_orders_by_customer_id("ALFKI")
for i in range(0, 100):
message = ncache.Message(orders[++i])
message.set_expiration_time(ncache.TimeSpan.from_seconds(10000))
messages_map[message] = ncache.DeliveryOption.ALL
# Register message delivery failure
topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
# In case of failed publishing of messages, exception will be returned
keys = topic.publish_bulk(messages_map, True)
else:
# No topic exists
print("Topic not found")
Publicar mensajes ordenados
Note
Esta función solo está disponible en NCache 5.2 y adelante
Los mensajes se pueden publicar mencionando un nombre de secuencia, lo que da como resultado que los mensajes se publiquen en un orden específico. Para especificar mensajes ordenados, se agrega un nombre de secuencia de cadena a la cadena de mensajes que asegura publicar todos los mensajes que pertenecen a un nombre de secuencia específico en el mismo nodo del servidor.
En el ejemplo que se proporciona a continuación, se agrega un nombre de secuencia con los mensajes, y luego los mensajes se publican usando el Publish
método.
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (topicName != null)
{
for (int i = 0; i < 30; i++)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the messages
string sequenceName = "OrderMessages";
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish message with the sequence name
orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
}
else
{
// No topic found
}
// Specify the topic name that already exists
String topicName = "orderTopic";
// Get the topic with the specified name
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
for (int i = 0; i < 30; i++) {
// Create the object to be sent in the message
Order order = fetchOrdersFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the message
String sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
} else {
// No topic found
}
// This is an async method
// Specify the topic name that already exists
let topicName = "orderTopic";
// Get the topic with the specified name
let orderTopic = await this.cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
var i;
for (i = 0; i < 30; i++) {
// Create the object to be sent in the message
let order = await this.fetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new ncache.Message(order);
// Specify a unique sequence name for the message
let sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(ncache.TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(
orderMessage,
ncache.DeliveryOption.All,
sequenceName,
true
);
}
} else {
// No topic found
}
# Specify the topic name that already exists
topic_name = "orderTopic"
# Get the topic with the specified name
order_topic = cache.get_messaging_service().get_topic(topic_name)
if topic_name is not None:
for i in range(0, 30):
# Create the object to be sent in the message
order = fetch_order_from_db(10248)
# Create the new message with the object order
order_message = ncache.Message(order)
# Specify a unique sequence name for the message
sequence_name = "OrderMessage"
# Set the expiration time of the message
order_message.set_expiration_time(ncache.TimeSpan.from_seconds(5000))
# Publish message with the sequence name
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True, sequence_name)
else:
# No topic found
print("Topic not found")
Registrar devoluciones de llamada
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
// Failure reason can be get from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
// Deleted topic is args.TopicName
}
@Override
public void onTopicDeleted(Object sender, TopicDeleteEventArgs args) {
// Deleted topic is args.getTopicName();
}
@Override
public void onMessageDeliveryFailure(Object sender, MessageFailedEventArgs args) {
// Failure reason is args.getMessageFailureReason();
}
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// Perform operations
}
onTopicDeleted(sender, args) {
// Deleted topic is args.getTopicName()
}
onMessageDeliveryFailure(sender, args) {
// Failure reason is args.getMessageFailureReason()
}
onMessageReceived(sender, args) {
// Perform operations
}
def on_topic_deleted(sender: object, args: ncache.TopicDeleteEventArgs):
# Perform operations
print("Deleted topic is " + args.get_topic_name())
def on_message_delivery_failure(sender: object, args: ncache.MessageFailedEventArgs):
# Perform operations
print("Failure reason is " + str(args.get_message_failure_reason()))
def on_message_received(sender: object, args: ncache.MessageEventArgs):
# Perform operations
print("Message received from topic " + args.get_topic_name())
Recursos adicionales
NCache proporciona una aplicación de muestra para Pub/Sub en GitHub.
Vea también
.NETO: Alachisoft.NCache.Runtime.Caching espacio de nombres
Java: com.alachisoft.ncache.runtime.caching espacio de nombres
Nodo.js: Tema clase.
Pitón: ncache.servicios al cliente clase.