título: Mensajería Pub/Sub: Suscríbete a un tema descripción: Aprenda cómo suscribirse a mensajes de temas cuando usa el caché como tienda de mensajería Pub/Sub.
Mensajería Pub/Sub: suscríbete a un tema
En el modelo de mensajería Pub/Sub, el suscriptor (o aplicación cliente) puede registrarse en un tema en particular a través de una suscripción. Entonces, NCache proporciona múltiples tipos de suscripciones a temas de Pub/Sub.
Mensajería Pub/Sub: requisitos previos Suscripción al tema
Antes de suscribir un tema en el modelo de mensajería Pub/Sub, asegúrese de que se cumplan los siguientes requisitos previos:
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página dada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: Dolor, Artículo de caché, ITema, IDurableTopicSuscripción, Crear suscripción, Crear suscripción duradera,Nombre del tema, SuscripciónNombre, Servicio de mensajería instantánea, ObtenerTema, Política de suscripción, Darse de baja, MensajeEventArgs, Modo de entrega, Opciones de búsqueda de temas, ITopicSuscripción, Fallo en la entrega de mensaje, Carga útil.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página dada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: cache, Artículo de caché, Tema, TemaSuscripción, obtener servicio de mensajería, obtenerTema, crearDurableSubscription, Política de suscripción, Espacio de tiempo, darse de baja, MensajeRecibidoOyente, enMensajeRecibido, MensajeEventArgs, obtener mensaje, obtener la carga útil, crear suscripción, Modo de entrega, Opciones de búsqueda de temas, agregarMessageDeliveryFailureListener.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página dada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: cache, Artículo de caché, TemaSuscripción, obtener servicio de mensajería, obtenerTema, crearDurableSubscription, Política de suscripción, Espacio de tiempo, darse de baja, MensajeEventArgs, MensajeRecibidoOyente, obtener mensaje, obtener la carga útil, crear suscripción, Modo de entrega, Opciones de búsqueda de temas, Oyente de tema, agregarMessageDeliveryFailureListener.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página dada en Requisitos previos de la API del lado del cliente.
- Para obtener detalles de la API, consulte: cache, Artículo de caché, obtener_tema, get_messaging_service, crear_suscripción_duradera, obtener_carga útil, Espacio de tiempo, MensajeEventArgs, recibir_mensaje, crear_suscripción, darse de baja, add_message_delivery_failure_listener.
Métodos para crear una suscripción
Aquí describimos cómo puede crear una suscripción no duradera, una suscripción duradera, una suscripción múltiple y una suscripción asincrónica en el modelo de mensajería Pub/Sub.
Suscripciones no duraderas
Note
Esta característica también está disponible en NCache Professional.
El ITopic
/Topic
interfaz facilita la creación suscripción no duradera y publicación de mensajes sobre el tema. El método de creación de suscripción registra una suscripción no duradera contra un tema si el tema existe. Permite al suscriptor registrarse MessageReceivedCallback
contra el tema para que pueda recibir los mensajes publicados.
El siguiente ejemplo de código hace lo siguiente:
- Obtenga un tema de interés existente, es decir
NewBeverages
.
- Crear suscripción para cada tema.
- Registre eventos para que los suscriptores reciban mensajes una vez publicados en el tema.
// Precondition: Cache is already connected
// NewBeverages is the name of the topic created, beforehand
string topicName = "NewBeverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for order topic
// Message received callback is specified
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
// Precondition: Cache is already connected
// NewBeverages is the name of the topic created beforehand
String topicName = "NewBeverages";
// Get the topic
Topic ordertopic = cache.getMessagingService().getTopic(topicName);
// If topic exists, Create subscription
if (ordertopic != null) {
// Create and register subscribers for order topic
ordertopic.createSubscription(new PubSubMessageReceivedListener());
System.out.println("Subscriber registered for topic: " + ordertopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";
// Get the topic
let ordertopic = await ncache.getMessagingService().getTopic(topicName);
if (ordertopic != null) {
// Create and register subscribers for topic
// Message received callback is specified
let ordersubscriber = await ordertopic.createSubscription(ncache.onMessageReceived());
}
else {
// No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for the topic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received)
else:
# No topic exists
print("Topic not found")
Note
Para garantizar que la operación sea a prueba de fallas, se recomienda manejar cualquier posible excepción dentro de su aplicación, como se explica en Manejo de fallas.
Suscripciones duraderas
Note
Esta característica también está disponible en NCache Professional.
El IDurableTopicSubscription
interfaz facilita la creación suscripciones duraderas y publicar mensajes sobre el tema si existe. Permite al suscriptor registrar un MessageReceivedCallback
contra el tema, para que pueda recibir los mensajes publicados.
Suscripciones duraderas compartidas
El siguiente código demuestra la política de suscripción compartida.
// DiscountedBeverages is the name of the topic created, beforehand
string topicName = "DiscountedBeverages";
string subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
ITopic ordertopic = cache.MessagingService.GetTopic(topicName);
if (ordertopic != null)
{
// Create and register subscribers for topic
// Message received callback is specified
// The subscription policy is shared which means that the subscription can have more than one subscribers
IDurableTopicSubscription ordersubscription = ordertopic.CreateDurableSubscription (subscriptionName, SubscriptionPolicy.Shared, MessageReceived, TimeSpan.FromMinutes(20));
}
// DiscountedBeverages is the name of the topic created beforehand
String topicName = "DiscountedBeverages";
Topic ordertopic = cache.getMessagingService().getTopic(topicName);
// Get the topic
if (ordertopic != null) {
// Create and register subscribers for topic
// The subscription policy is shared which means that the subscription can have more than one subscribers
System.out.println("Topic retrieved: " + ordertopic.getName());
ordertopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, new PubSubMessageReceivedListener(), TimeSpan.FromMinutes(20));
System.out.println("Durable subscriber registered for topic: " + ordertopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
let ordertopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for order Topic
// Message received callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// MessageReceived callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
Suscripciones duraderas exclusivas
El siguiente código demuestra la política de suscripción exclusiva.
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that the subscription can have only one subscriber
IDurableTopicSubscription orderSubscriber = orderTopic.CreateDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, MessageReceived, TimeSpan.FromMinutes(20));
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
System.out.println("Topic retrieved: " + orderTopic.getName());
// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that the subscription can have only one subscriber
orderTopic.createDurableSubscription(subscriptionName,SubscriptionPolicy.Exclusive, new PubSubMessageReceivedListener(),TimeSpan.FromMinutes(20));
System.out.println("Durable subscriber registered for topic: " + orderTopic.getName());
} else {
System.out.println("Topic does not exist.");
}
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
let subscriptionName = "orderTopicName";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// The subscription policy is Exclusive which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Exclusive, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
# Create and register subscribers for orderTopic
# The subscription policy is exlusive which means that the subscription can have more than one subscriber
order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.EXCLUSIVE, on_message_received, ncache.TimeSpan.from_minutes(20))
Varias suscripciones
Al utilizar este método de suscripción, los suscriptores pueden proporcionar patrones para suscribirse a múltiples temas con una sola llamada. Para ello, es importante que los temas que coincidan con el patrón ya existan en el servidor. Una vez que las suscripciones se crean correctamente en el tema basado en patrón, los suscriptores reciben mensajes publicados en temas que coinciden con el patrón. Además, si se crea un tema después de que la suscripción basada en patrón se haya registrado con el patrón, registrará a ese suscriptor con ese tema.
De manera similar, con respecto a cancelar el registro de un tema, este método cancela la suscripción del suscriptor de todos los temas coincidentes con el patrón proporcionado sin afectar ninguna otra suscripción que utilice esta llamada.
Note
- Un suscriptor solo puede obtener un tema basado en patrones y no puede crearlo.
- El editor puede usar un patrón solo para recibir notificaciones de error.
Comodines admitidos
El método de suscripción basado en patrones admite los tres comodines siguientes:
*
: cero o muchos caracteres. Por ejemplo, bl*
se suscribe a negro, azul y desenfoque, etc.
?
: cualquier carácter. Por ejemplo, h?t
se suscribe a hit, hot y hat, etc.
[]
: gama de caracteres. Por ejemplo, b[ae]t
se suscribe a apostar y batear, pero no poco.
Creación de una suscripción con comodín
El siguiente ejemplo crea una suscripción al proporcionar un patrón basado en el cual se suscribe el tema que coincide con el patrón.
// Create topic name for all topics with suffix Beverages
// Only ? * [] wildcards supported
string topicName = "*Beverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for Order topic
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
// Define topic name pattern
String topicNamePattern = "*ages";
// Get all topics that fulfill the pattern
Topic topic = cache.getMessagingService().getTopic(topicNamePattern, TopicSearchOptions.ByPattern);
if (topic != null) {
System.out.println("Matching topic retrieved: " + topic.getName());
// Create and register notifications for topic
topic.createSubscription(new PubSubMessageReceivedListener());
System.out.println("Subscription created for topic: " + topic.getName());
} else {
System.out.println("No matching topic found.");
}
// Topic "Beverages" exists in the cache
// Only ? * [] wildcards supported
let topicName = "*Beverages";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);
// Create and register subscribers for orderTopic
let orderSubscriber = orderTopic.createSubscription(this.messageReceived());
# Topic "Beverages" exists in the cache
# Only ? * [] wildcards supported
topic_name = "*Beverages*"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)
# Create and register subscribers for orderTopic
order_subscriber = order_topic.create_subscription(on_message_received)
Suscripciones asíncronas
El modo de entrega se puede especificar al crear suscripciones para mensajes pedidos y se puede sync
or async
. El siguiente ejemplo crea una suscripción usando CreateSubscription
método con el modo de entrega de sincronización. Se recomienda utilizar sync
modo para mensajes pedidos y async
Modo de lo contrario para lograr un alto rendimiento.
Al crear una suscripción, puede especificar el modo de entrega que puede ser síncrono o asíncrono. En particular, el modo de entrega síncrona se puede utilizar para mensajes ordenados, mientras que si no usa mensajes ordenados, el modo asíncrono mejora el rendimiento.
Note
De forma predeterminada, el modo de entrega de mensajes se establece en sincrónico.
// NewBeverages is the name of the topic created beforehand
string topicName = "NewBeverages";
// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);
// If topic exists, Create subscription
if (topic != null)
{
// Create and register subscribers for Order topic
// Message received callback is specified
// DeliveryMode is set to async
ITopicSubscription subscription = topic.CreateSubscription(MessageReceived, DeliveryMode.Async);
}
// NewBeverages is the name of the topic created beforehand
String topicName = "NewBeverages";
// Get the topic
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create and register subscribers for Topic
// Message received callback is specified
// DeliveryMode is set to async
topic.createSubscription(new PubSubMessageReceivedListener(),DeliveryMode.Async);
} else {
System.out.println("Topic does not exist.");
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived(), ncache.DeliveryMode.Sync);
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
else {
// No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for orderTopic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received, ncache.DeliveryMode.SYNC)
else:
# No topic exists
print("Topic not found")
Note
También puedes darte de baja de un tema.
Recursos adicionales
NCache proporciona una aplicación de muestra para Pub/Sub en GitHub.
Vea también
.NETO: Alachisoft.NCache.Runtime.Caching espacio de nombres
Java: com.alachisoft.ncache.runtime.caching espacio de nombres
Nodo.js: Clase de suscripción de tema clase.
Pitón: ncache.servicios al cliente clase.