Subscribe for Topic Messages
The ITopic
/Topic
interface facilitates creating non-durable subscription and publishing of messages against the topic. The IDurableTopicSubscription
interface facilitates creating durable subscription and publishing of messages against the topic. These interfaces also provide event registrations for message delivery failure, receiving messages and deleting topics.
You can also add delivery mode while creating subscriptions that can be synchronous or asynchronous. Synchronous delivery mode can be used for ordered messages whereas if you are not using ordered messages, asynchronous mode improves performance. Delivery mode by default is set to synchronous.
Prerequisites
- Install either of the following NuGet packages in your application based on your NCahce edition:
- Include the following namespace in your application:
Alachisoft.NCache.Client
Alachisoft.NCache.Runtime
Alachisoft.NCache.Runtime.Exceptions
- Cache must be running.
- The application must be connected to cache before performing the operation.
- Make sure that the data being added is serializable.
- For API details, refer to: ICache, CacheItem, ITopic, IDurableTopicSubscription, CreateSubscription, CreateDurableSubscription.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unseen exceptions, refer to the Troubleshooting section.
- Add the following Maven dependencies in your pom.xml file:
<dependency>
<groupId>com.alachisoft.ncache</groupId>
<!--for NCache Enterprise Edition-->
<artifactId>ncache-client</artifactId>
<!--for NCache Professional Edition-->
<artifactId>ncache-professional-client</artifactId>
<version>x.x.x</version>
</dependency>
- Import the following packages in your application:
import com.alachisoft.ncache.client.*;
import com.alachisoft.ncache.runtime.exceptions.*;
- Cache must be running.
- The application must be connected to cache before performing the operation.
- Make sure that the data being added is serializable.
- For API details, refer to: Cache, CacheItem, Topic, TopicSubscription.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unseen exceptions, refer to the Troubleshooting section.
- Install and include either of the following modules in your application based on your NCache edition:
- Enterprise:
const ncache = require('ncache-client')
- Professional:
const ncache = require('ncache-professional-client')
- Cache must be running.
- The application must be connected to cache before performing the operation.
- For API details, refer to: Cache, CacheItem, TopicSubscription.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unseen exceptions, refer to the Troubleshooting section.
- Install the NCache Python client by executing the following command:
# Enterprise Client
pip install ncache-client
# Professional Client
pip install ncache-professional-client
- Import the NCache module in your application.
- Cache must be running.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unseen exceptions, refer to the Troubleshooting section.
- Add the following Maven dependencies in your
pom.xml
file:
<dependency>
<groupId>com.alachisoft.ncache</groupId>
<!--for NCache Enterprise Edition-->
<artifactId>ncache-scala-client</artifactId>
<!--for NCache Professional Edition-->
<artifactId>ncache-scala-professional-client</artifactId>
<version>x.x.x</version>
</dependency>
- Import the following packages in your application:
import com.alachisoft.ncache.scala.client.*;
- Cache must be running.
- The application must be connected to cache before performing the operation.
- Make sure that the data being added is serializable.
- To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
- To handle any unseen exceptions, refer to the Troubleshooting section.
Non-Durable Subscriptions
The CreateSubscription method registers a non-durable subscription against a topic if the topic exists. It allows the subscriber to register MessageReceivedCallback
against the topic so that it can receive the published messages.
The following code sample does the following:
- Get existing topic of interest i.e.
orderTopic
.
- Create subscription for each topic.
- Register events for subscribers to receive messages once published to the topic.
- Unsubscribe subscribers registered to
orderTopic
.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create and register subscribers for Order topic
// MessageReceived callback is specified
ITopicSubscription orderSubscriber =
orderTopic.CreateSubscription(MessageReceived);
// Topics can also be unsubscribed
orderSubscriber.UnSubscribe();
}
else
{
// No topic exists
}
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Message failed to receive
}
}
try {
// Precondition: Cache is already connected
// Topic "orderTopic" already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
TopicSubscription orderSubscriber = orderTopic.createSubscription(onMessageReceived());
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
} else {
// No topic exists
}
} catch (OperationFailedException ex) {
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED) {
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
// Operation cannot be performed on default topics,
// Get user-defined topics instead
} else {
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
} catch (Exception ex) {
// Any other generic exception like IllegalArgumentException or NullPointerException
}
//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {
MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {
@Override
public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {
// perform operations
boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
} else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
// This is an async method
try {
// Precondition: Cache is already connected
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived());
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
else {
// No topic exists
}
}
catch (error) {
// Handle any errors
}
// ---------- MessageReceivedCallback ----------- //
messageReceived(sender, messageEventArgs) {
let messageReceivedListener = new ncache.MessageReceivedListener() {
function onMessageReceived() {
// perform operations
let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
}
else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try:
# Precondition: Cache is already connected
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for orderTopic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received)
# Topics can also be unsubscribed
order_subscriber.un_subscribe()
else:
# No topic exists
print("Topic not found")
except Exception as exp:
# Handle errors
# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
# perform operations
temp = isinstance(message_event_args.get_message().get_payload(), Order)
order = message_event_args.get_message().get_payload() if temp else None
if temp:
# Perform operations
print("Message is of order type")
else:
# Message failed to receive
print("Message is of order type")
try {
// Precondition: Cache is already connected
// Topic "orderTopic" already exists in the cache
val topicName = "orderTopic"
// Specify Message received listener
val onMessageReceived = MessageListener()
// Get the topic
val orderTopic = cache.getMessagingService.getTopic(topicName)
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
val orderSubscriber = orderTopic.createSubscription(onMessageReceived)
// Topics can also be unsubscribed
orderSubscriber.unSubscribe()
}
else {
// No topic exists
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
Create Subscriptions with Delivery Mode
Delivery mode can be specified while creating subscriptions for ordered messages and can be sync
or async
. The following example creates subscription using CreateSubscription method with sync delivery mode. It is recommended to use sync
mode for ordered messages and async
mode otherwise to achieve high performance.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create and register subscribers for Order topic
// MessageReceived callback is specified
// DeliveryMode is set to async
ITopicSubscription orderSubscriber =
orderTopic.CreateSubscription(MessageReceived, DeliveryMode.Async);
// Topics can also be unsubscribed
orderSubscriber.UnSubscribe();
}
else
{
// No topic exists
}
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args) {
// Perform operations
if(args.Message.Payload is Order ord){
// Perform operations
}
else
{
// Message failed to receive
}
}
try {
// Precondition: Cache is already connected
// Topic "orderTopic" already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
TopicSubscription orderSubscriber = orderTopic.createSubscription(onMessageReceived(), DeliveryMode.Async);
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
} else {
// No topic exists
}
} catch (OperationFailedException ex) {
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED) {
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
// Operation cannot be performed on default topics,
// Get user-defined topics instead
} else {
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
} catch (Exception ex) {
// Any other generic exception like IllegalArgumentException or NullPointerException
}
//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {
MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {
@Override
public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {
// perform operations
boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
} else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
// This is an async method
try {
// Precondition: Cache is already connected
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived(), ncache.DeliveryMode.Sync);
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
else {
// No topic exists
}
}
catch (error) {
// Handle any errors
}
// ---------- MessageReceivedCallback ----------- //
messageReceived(sender, messageEventArgs) {
let messageReceivedListener = new ncache.MessageReceivedListener() {
function onMessageReceived() {
// perform operations
let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
}
else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try:
# Precondition: Cache is already connected
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create and register subscribers for orderTopic
# Message received callback is specified
order_subscriber = order_topic.create_subscription(on_message_received, ncache.DeliveryMode.SYNC)
# Topics can also be unsubscribed
order_subscriber.un_subscribe()
else:
# No topic exists
print("Topic not found")
except Exception as exp:
# Handle errors
# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
# perform operations
temp = isinstance(message_event_args.get_message().get_payload(), Order)
order = message_event_args.get_message().get_payload() if temp else None
if temp:
# Perform operations
print("Message is of order type")
else:
# Message failed to receive
print("Message is of order type")
try {
// Precondition: Cache is already connected
// Topic "orderTopic" already exists in the cache
val topicName = "orderTopic"
// Specify Message received listener
val onMessageReceived = MessageListener()
// Get the topic
val orderTopic = cache.getMessagingService.getTopic(topicName)
if (orderTopic != null) {
// Create and register subscribers for orderTopic
// Message received callback is specified
val orderSubscriber = orderTopic.createSubscription(onMessageReceived, DeliveryMode.Async)
// Topics can also be unsubscribed
orderSubscriber.unSubscribe()
}
else {
// No topic exists
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
Durable Subscriptions
The CreateDurableSubscription method registers a durable subscription against a topic if the topic exists. It allows the subscriber to register MessageReceivedCallback
against the topic so that it can receive the published messages.
The following code sample does the following:
- Get existing topic of interest i.e.
orderTopic
.
- Create durable subscription with shared policy for each topic with a timespan for 20 minutes.
- Register events for subscribers to receive messages once published to the topic.
- Unsubscribe subscribers registered to
orderTopic
.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Create and register subscribers for Order topic
// MessageReceived callback is specified below
// The subscription policy is shared which means that
// the subscription can have more than one subscribers
IDurableTopicSubscription orderSubscriber =
orderTopic.CreateDurableSubscription(subscriptionName,
SubscriptionPolicy.Shared,
MessageReceived, TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
orderSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
// -------- MessageReceivedCallback ------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Message failed to receive
}
}
try {
// Precondition: cache is connected
String topicName = "orderTopic";
String subscriptionName = "productTopicName";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create and register subscription for order topic
TopicSubscription orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared,
messageReceived(), TimeSpan.FromMinutes(20));
// You can unsubscribe to topics
orderSubscriber.unSubscribe();
} else {
// No topic exists
}
} catch (OperationFailedException exception) {
// Order topic does not exist
} catch (Exception exception) {
// Any generic exception like IllegalArgumentException or NullPointerException
}
//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {
MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {
@Override
public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {
// perform operations
boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
} else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try {
// Precondition: Cache is connected
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
let subscriptionName = "orderTopicName";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);
// Create and register subscribers for orderTopic
// MessageReceived callback is specified below
// The subscription policy is Shared which means that the subscription can have more than one subscribers
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName,
ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
catch (error) {
// Handle any errors
}
// -------- MessageReceivedCallback -------- //
messageReceived(o, messageEventArgs) {
let messageReceivedListener = new ncache.MessageReceivedListener() {
function onMessageReceived() {
// perform operations
let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
}
else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try:
# Precondition: Cache is connected
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
# Create and register subscribers for orderTopic
# MessageReceived callback is specified below
# The subscription policy is SHARED which means that the subscription can have more than one subscriber
order_subscriber = order_topic.create_durable_subscription(subscription_name,
ncache.SubscriptionPolicy.SHARED, on_message_received, ncache.TimeSpan.from_minutes(20))
# Topics can also be unsubscribed
order_subscriber.un_subscribe()
except Exception as exp:
# Handle errors
# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
# perform operations
temp = isinstance(message_event_args.get_message().get_payload(), Order)
order = message_event_args.get_message().get_payload() if temp else None
if temp:
# Perform operations
print("Message is of order type")
else:
# Message failed to receive
print("Message is of order type")
try {
// Precondition: cache is connected
val topicName = "orderTopic"
val subscriptionName = "productTopicName"
// Specify Message received listener
val messageReceived = MessageListener()
// Get topic
val orderTopic = cache.getMessagingService.getTopic(topicName)
if (orderTopic != null) { // Create and register subscription for order topic
val orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, messageReceived, TimeSpan.FromMinutes(20))
// You can unsubscribe to topics
orderSubscriber.unSubscribe()
}
else {
// No topic exists
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
Note
To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Pattern Based Subscriptions
NCache lets you search and subscribe to topics based on a specific pattern that you provide. This is known as a pattern based subscription. Pattern based subscriptions provided by NCache support the following wildcards as already explained in the Pub/Sub Subscriptions section:
*
: zero or many characters
?
: any one character
[]
: specify a range
The following example provides a pattern based on which the topic is subscribed matching the pattern.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
// Only ? * [] wildcards supported
string topicName = "order*";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
// Create and register subscribers for Order topic
ITopicSubscription orderSubscriber = orderTopic.CreateSubscription(MessageReceived);
// Topics can also be unsubscribed
orderSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
{
// Active subscription with this name already exists
// Specific to Exclusive subscription
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
// -------- MessageReceivedCallback ---------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Message failed to receive
}
}
try {
// Precondition: cache is connected
String topicName = "$GeneralEvents";
String subscriptionName = "orderTopicName";
// Get all topics that fulfill the pattern
Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);
if (orderTopic != null) {
// Create and register notifications for Order topic
TopicSubscription orderSubscriber = orderTopic.createSubscription(messageReceived());
// To unsubscribe to topics
orderSubscriber.unSubscribe();
} else {
// No topic exists
}
}
catch (OperationFailedException exception) {
if (exception.getErrorCode() == NCacheErrorCodes.SUBSCRIPTION_EXISTS) {
// Order topic does not exist
}
if (exception.getErrorCode() == NCacheErrorCodes.DEFAULT_TOPICS) {
// Operation cannot be performed on default topic
}
if (exception.getErrorCode() == 20007) {
// Topic is disposed
}
if (exception.getErrorCode() == 20008) {
// Topic name cannot be null or an empty string
} else {
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
} catch (Exception exception) {
// Any generic exception like IllegalArgumentException or NullPointerException
}
//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {
MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {
@Override
public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {
// Perform operations
boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
} else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try {
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
// Only ? * [] wildcards supported
let topicName = "order*";
let subscriptionName = "orderTopicName";
// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);
// Create and register subscribers for orderTopic
let orderSubscriber = orderTopic.createSubscription(this.messageReceived());
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
catch (error) {
// Handle errors
}
// -------- MessageReceivedCallback --------- //
messageReceived(o, messageEventArgs) {
let messageReceivedListener = new ncache.MessageReceivedListener() {
function onMessageReceived() {
// perform operations
let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
}
else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try:
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
# Only ? * [] wildcards supported
topic_name = "order*"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)
# Create and register subscribers for orderTopic
order_subscriber = order_topic.create_subscription(on_message_received)
# Topics can also be unsubscribed
order_subscriber.un_subscribe()
except Exception as exp:
# Handle errors
# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
# perform operations
temp = isinstance(message_event_args.get_message().get_payload(), Order)
order = message_event_args.get_message().get_payload() if temp else None
if temp:
# Perform operations
print("Message is of order type")
else:
# Message failed to receive
print("Message is of order type")
try {
// Precondition: cache is connected
val topicName = "$GeneralEvents"
val subscriptionName = "orderTopicName"
// Specify Message received listener
val messageReceived = MessageListener()
// Get all topics that fulfill the pattern
val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)
if (orderTopic != null) { // Create and register notifications for Order topic
val orderSubscriber = orderTopic.createSubscription(messageReceived)
// To unsubscribe to topics
orderSubscriber.unSubscribe()
}
else {
// No topic exists
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
Pattern Based Durable Subscription
Durable Pattern-based Subscription combines the characteristics of Pattern-based Subscriptions and Durable Subscriptions. In this mode of subscription, the subscriber can receive pattern-based messages. In case this subscriber gets disconnected and rejoins the network, it will receive any messages that it may have missed during disconnection based on the pattern of the topic it subscribed to.
The following example provides a pattern based on which the topic is subscribed matching the pattern and the subscription made is durable subscription.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
// Only ? * [] wildcards supported
string topicName = "order*";
string subscriptionName = "orderTopicName";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
// Create and register subscribers for Order topic
// MessageReceived callback is specified below
// The subscription policy is exclusive which means that
// one subscription can have only one subscriber at a time
IDurableTopicSubscription orderSubscriber =
orderTopic.CreateDurableSubscription(subscriptionName,
SubscriptionPolicy.Exclusive,
MessageReceived, TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
orderSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
{
// Active subscription with this name already exists
// Specific to Exclusive subscription
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
// -------- MessageReceivedCallback -------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
// Perform operations
if(args.Message.Payload is Order ord)
{
// Perform operations
}
else
{
// Message failed to receive
}
}
try {
// Precondition: cache is connected
String topicName = "order?opic";
String subscriptionName = "orderTopicName";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);
if (orderTopic != null) {
// Create and register subscribers for order topic
TopicSubscription orderSubscriber = orderTopic.createDurableSubscription(subscriptionName,
SubscriptionPolicy.Exclusive, messageReceived(), TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
orderSubscriber.unsubscribe();
} else {
// Topic does not exist
}
} catch (OperationFailedException exception) {
if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_DISPOSED) {
// Specified topic has been disposed
}
if (exception.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
if (exception.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS) {
// Active subscription with this name already exists
// Specific to Exclusive subscription
} else {
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
} catch (Exception exception) {
// Any generic exception like IllegalArgumentException or NullPointerException
}
// This is an async method
try {
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
// Only ? * [] wildcards supported
let topicName = "order?opic";
let subscriptionName = "orderTopicName";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);
// Create and register subscribers for orderTopic
// MessageReceived callback is specified below
// The subscription policy is exclusive which means one subscription can have only one subscriber at a time
let orderSubscriber = await orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Exclusive, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
// Topics can also be unsubscribed
orderSubscriber.unSubscribe();
}
catch (error) {
// Handle any errors
}
// --------- MessageReceivedCallback ------- //
messageReceived(o, messageEventArgs) {
let messageReceivedListener = new ncache.MessageReceivedListener(){
function onMessageReceived() {
// perform operations
let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
if (temp) {
// Perform operations
}
else {
// Message failed to receive
}
}
};
return messageReceivedListener;
}
try:
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
# Only ? * [] wildcards supported
topic_name = "order?opic"
subscription_name = "orderTopicName"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)
# Create and register subscribers for orderTopic
# MessageReceived callback is specified below
# The subscription policy is exclusive which means one subscription can have only one subscriber at a time
order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.EXCLUSIVE, on_message_received, ncache.TimeSpan.from_minutes(20))
# Topics can also be unsubscribed
order_subscriber.un_subscribe()
except Exception as exp:
# Handle errors
# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
# perform operations
temp = isinstance(message_event_args.get_message().get_payload(), Order)
order = message_event_args.get_message().get_payload() if temp else None
if temp:
# Perform operations
print("Message is of order type")
else:
# Message failed to receive
print("Message is of order type")
try {
// Precondition: cache is connected
val topicName = "order?opic"
val subscriptionName = "orderTopicName"
// Specify Message received listener
val messageReceived = MessageListener()
// Get the topic
val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)
if (orderTopic != null) { // Create and register subscribers for order topic
val orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, messageReceived, TimeSpan.FromMinutes(20))
// Topics can also be unsubscribed
orderSubscriber.unSubscribe()
}
else {
// Topic does not exist
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
Pattern Based Subscription with Failure Notification
Pattern based subscriptions can also be made with a failure notification registered with them. The following example subscribes to topics which fulfil the pattern with callback registered for message delivery failure.
try
{
// Pre-Condition: Cache is already connected
// orderTopic is the name of the topic created beforehand
// Only ? * [] wildcards supported
string topicName = "*Topic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
if (orderTopic != null)
{
// Register message delivery failure
// Since the instances of pattern based topics are get and registered for message failure
// Message failure will be notified for these topics
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
}
else
{
// No topic exists
}
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
{
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
{
// Active subscription with this name already exists
// Specific to Exclusive subscription
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
try {
// precondition: cache is connected
String topicName = "*Topic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);
if (orderTopic != null) {
MyTopicListener topicListener = new MyTopicListener();
// register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
} else {
// No topic exists
}
} catch (OperationFailedException exception) {
if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_DISPOSED) {
// Specified topic has been disposed
}
if (exception.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
// Operation cannot be performed on default topics,
// Get user-defined topics instead
}
if (exception.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS) {
// Active subscription with this name already exists
// Specific to Exclusive subscription
} else {
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
} catch (Exception exception) {
// Any generic exception like IllegalArgumentException or NullPointerException
}
try {
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
// Only ? * [] wildcards supported
let topicName = "orderTopic";
// Get the topic
let orderTopic = ncache
.getMessagingService()
.getTopic(topicName, ncache.TopicSearchOptions.ByPattern);
if (orderTopic != null) {
let topicListener = new ncache.TopicListener();
// Register message delivery failure
// Since the instances of patter based topics are get and registered for message failure
// message failure will be notified for these topics
orderTopic.addMessageDeliveryFailureListener(topicListener);
} else {
// No topic exists
}
} catch (error) {
// Handle any errors
}
try:
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
# Only ? * [] wildcards supported
topic_name = "order?opic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)
if order_topic is not None:
# Register message delivery failure
# Since the instances of pattern based topics are get and registered for message failure,
# message failure will be notified for these topics
order_topic.add_message_delivery_failure_listener(topic_listener)
else:
# No topic exists
print("Topic not found")
except Exception as exp:
# Handle errors
try {
// precondition: cache is connected
val topicName = "*Topic"
// Get topic
val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)
if (orderTopic != null) {
val topicListener = TopicEventListener()
// register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener)
}
else {
// No topic exists
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
Note
To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Additional Resources
NCache provides sample application for Pub/Sub on GitHub.
See Also
Event Notifications in Cache
Pub/Sub Topics
Pub/Sub Messages
Publish Messages to Topic
Continuous Query