• Webinars
  • Docs
  • Download
  • Blogs
  • Contact Us
Try Free
Show / Hide Table of Contents

Subscribe for Topic Messages

The ITopic/Topic interface facilitates creating non-durable subscription and publishing of messages against the topic. The IDurableTopicSubscription interface facilitates creating durable subscription and publishing of messages against the topic. These interfaces also provide event registrations for message delivery failure, receiving messages and deleting topics.

You can also add delivery mode while creating subscriptions that can be synchronous or asynchronous. Synchronous delivery mode can be used for ordered messages whereas if you are not using ordered messages, asynchronous mode improves performance. Delivery mode by default is set to synchronous.

Prerequisites

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
  • Install either of the following NuGet packages in your application based on your NCahce edition:
    • Enterprise: Alachisoft.NCache.SDK
    • Professional: Alachisoft.NCache.Professional.SDK
  • Include the following namespace in your application:
    • Alachisoft.NCache.Client
    • Alachisoft.NCache.Runtime
    • Alachisoft.NCache.Runtime.Exceptions
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • Make sure that the data being added is serializable.
  • For API details, refer to: ICache, CacheItem, ITopic, IDurableTopicSubscription, CreateSubscription, CreateDurableSubscription.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Add the following Maven dependencies in your pom.xml file:
<dependency>
    <groupId>com.alachisoft.ncache</groupId>
    <!--for NCache Enterprise Edition--> 
    <artifactId>ncache-client</artifactId>
    <!--for NCache Professional Edition-->
    <artifactId>ncache-professional-client</artifactId>
    <version>x.x.x</version>
</dependency>
  • Import the following packages in your application:
    • import com.alachisoft.ncache.client.*;
    • import com.alachisoft.ncache.runtime.exceptions.*;
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • Make sure that the data being added is serializable.
  • For API details, refer to: Cache, CacheItem, Topic, TopicSubscription.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Install and include either of the following modules in your application based on your NCache edition:
    • Enterprise: const ncache = require('ncache-client')
    • Professional: const ncache = require('ncache-professional-client')
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • For API details, refer to: Cache, CacheItem, TopicSubscription.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Install the NCache Python client by executing the following command:
# Enterprise Client
pip install ncache-client

# Professional Client
pip install ncache-professional-client
  • Import the NCache module in your application.
  • Cache must be running.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Add the following Maven dependencies in your pom.xml file:
<dependency>
    <groupId>com.alachisoft.ncache</groupId>
    <!--for NCache Enterprise Edition--> 
    <artifactId>ncache-scala-client</artifactId>
    <!--for NCache Professional Edition-->
    <artifactId>ncache-scala-professional-client</artifactId> 
    <version>x.x.x</version>
</dependency>
  • Import the following packages in your application:
    • import com.alachisoft.ncache.scala.client.*;
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • Make sure that the data being added is serializable.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.

Non-Durable Subscriptions

The CreateSubscription method registers a non-durable subscription against a topic if the topic exists. It allows the subscriber to register MessageReceivedCallback against the topic so that it can receive the published messages.

The following code sample does the following:

  1. Get existing topic of interest i.e. orderTopic.
  2. Create subscription for each topic.
  3. Register events for subscribers to receive messages once published to the topic.
  4. Unsubscribe subscribers registered to orderTopic.
  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-Condition: Cache is already connected

    // orderTopic is the name of the topic created beforehand
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create and register subscribers for Order topic
        // MessageReceived callback is specified
        ITopicSubscription orderSubscriber =
        orderTopic.CreateSubscription(MessageReceived);

        // Topics can also be unsubscribed
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Perform operations

    if(args.Message.Payload is Order ord)
    {
        // Perform operations
    }
    else
    {
        // Message failed to receive
    }
}
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" already exists in the cache
    String topicName = "orderTopic";

    // Get the topic
    Topic orderTopic = cache.getMessagingService().getTopic(topicName);

    if (orderTopic != null) {
        // Create and register subscribers for orderTopic
        // Message received callback is specified
        TopicSubscription orderSubscriber = orderTopic.createSubscription(onMessageReceived());

        // Topics can also be unsubscribed
        orderSubscriber.unSubscribe();
    } else {
        // No topic exists
    }
} catch (OperationFailedException ex) {
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED) {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception ex) {
    // Any other generic exception like IllegalArgumentException or NullPointerException
}

//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {

    MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {
        @Override
        public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {

            // perform operations

            boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
            Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            } else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
// This is an async method
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" already exists in the cache
    let topicName = "orderTopic";

    // Get the topic
    let orderTopic = await ncache.getMessagingService().getTopic(topicName);

    if (orderTopic != null) {
        // Create and register subscribers for orderTopic
        // Message received callback is specified
        let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived());

        // Topics can also be unsubscribed
        orderSubscriber.unSubscribe();
    }
    else {
        // No topic exists
    }
}
catch (error) {
    // Handle any errors
}

// ---------- MessageReceivedCallback ----------- //
messageReceived(sender, messageEventArgs) {

    let messageReceivedListener = new ncache.MessageReceivedListener() {
        function onMessageReceived() {
            // perform operations

            let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
            let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            }
            else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try:
    # Precondition: Cache is already connected 

    # Topic "orderTopic" already exists in the cache
    topic_name = "orderTopic"

    # Get the topic 
    order_topic = cache.get_messaging_service().get_topic(topic_name)

    if order_topic is not None:
        # Create and register subscribers for orderTopic
        # Message received callback is specified
        order_subscriber = order_topic.create_subscription(on_message_received)

        # Topics can also be unsubscribed 
        order_subscriber.un_subscribe()
    else:
        # No topic exists
        print("Topic not found")
except Exception as exp:
    # Handle errors

# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
    # perform operations
    temp = isinstance(message_event_args.get_message().get_payload(), Order)

    order = message_event_args.get_message().get_payload() if temp else None

    if temp:
        # Perform operations
        print("Message is of order type")
    else:
        # Message failed to receive
        print("Message is of order type")
try {
    // Precondition: Cache is already connected


    // Topic "orderTopic" already exists in the cache
    val topicName = "orderTopic"

    // Specify Message received listener
    val onMessageReceived = MessageListener()

    // Get the topic
    val orderTopic = cache.getMessagingService.getTopic(topicName)

    if (orderTopic != null) {
      // Create and register subscribers for orderTopic
      // Message received callback is specified
      val orderSubscriber = orderTopic.createSubscription(onMessageReceived)

      // Topics can also be unsubscribed
      orderSubscriber.unSubscribe()
    }
    else {
      // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}

Create Subscriptions with Delivery Mode

Delivery mode can be specified while creating subscriptions for ordered messages and can be sync or async. The following example creates subscription using CreateSubscription method with sync delivery mode. It is recommended to use sync mode for ordered messages and async mode otherwise to achieve high performance.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-Condition: Cache is already connected

    // orderTopic is the name of the topic created beforehand
    string topicName = "orderTopic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    if (orderTopic != null)
    {
        // Create and register subscribers for Order topic
        // MessageReceived callback is specified
        // DeliveryMode is set to async
        ITopicSubscription orderSubscriber =
        orderTopic.CreateSubscription(MessageReceived, DeliveryMode.Async);

        // Topics can also be unsubscribed
        orderSubscriber.UnSubscribe();
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

//------------- MessageReceivedCallback --------------//
private void MessageReceived(object sender, MessageEventArgs args) {
    // Perform operations

    if(args.Message.Payload is Order ord){
        // Perform operations
    }
    else
    {
        // Message failed to receive
    }
}
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" already exists in the cache
    String topicName = "orderTopic";

    // Get the topic
    Topic orderTopic = cache.getMessagingService().getTopic(topicName);

    if (orderTopic != null) {
        // Create and register subscribers for orderTopic
        // Message received callback is specified
        TopicSubscription orderSubscriber = orderTopic.createSubscription(onMessageReceived(), DeliveryMode.Async);

        // Topics can also be unsubscribed
        orderSubscriber.unSubscribe();
    } else {
        // No topic exists
    }
} catch (OperationFailedException ex) {
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED) {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception ex) {

    // Any other generic exception like IllegalArgumentException or NullPointerException
}

//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {

    MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {

        @Override
        public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {

            // perform operations

            boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
            Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            } else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
// This is an async method
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" already exists in the cache
    let topicName = "orderTopic";

    // Get the topic
    let orderTopic = await ncache.getMessagingService().getTopic(topicName);

    if (orderTopic != null) {

        // Create and register subscribers for orderTopic
        // Message received callback is specified
        let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived(), ncache.DeliveryMode.Sync);

        // Topics can also be unsubscribed
        orderSubscriber.unSubscribe();
    }
    else {
        // No topic exists
    }
}
catch (error) {
    // Handle any errors
}

// ---------- MessageReceivedCallback ----------- //
messageReceived(sender, messageEventArgs) {

    let messageReceivedListener = new ncache.MessageReceivedListener() {

        function onMessageReceived() {

            // perform operations

            let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
            let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            }
            else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try:
    # Precondition: Cache is already connected 

    # Topic "orderTopic" already exists in the cache
    topic_name = "orderTopic"

    # Get the topic 
    order_topic = cache.get_messaging_service().get_topic(topic_name)

    if order_topic is not None:
        # Create and register subscribers for orderTopic
        # Message received callback is specified
        order_subscriber = order_topic.create_subscription(on_message_received, ncache.DeliveryMode.SYNC)

        # Topics can also be unsubscribed 
        order_subscriber.un_subscribe()
    else:
        # No topic exists
        print("Topic not found")
except Exception as exp:
    # Handle errors

# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
    # perform operations
    temp = isinstance(message_event_args.get_message().get_payload(), Order)

    order = message_event_args.get_message().get_payload() if temp else None

    if temp:
        # Perform operations
        print("Message is of order type")
    else:
        # Message failed to receive
        print("Message is of order type")
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" already exists in the cache
    val topicName = "orderTopic"

    // Specify Message received listener
    val onMessageReceived = MessageListener()

    // Get the topic
    val orderTopic = cache.getMessagingService.getTopic(topicName)

    if (orderTopic != null) {
      // Create and register subscribers for orderTopic
      // Message received callback is specified
      val orderSubscriber = orderTopic.createSubscription(onMessageReceived, DeliveryMode.Async)

      // Topics can also be unsubscribed
      orderSubscriber.unSubscribe()
    }
    else {
      // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}

Durable Subscriptions

The CreateDurableSubscription method registers a durable subscription against a topic if the topic exists. It allows the subscriber to register MessageReceivedCallback against the topic so that it can receive the published messages.

The following code sample does the following:

  1. Get existing topic of interest i.e. orderTopic.
  2. Create durable subscription with shared policy for each topic with a timespan for 20 minutes.
  3. Register events for subscribers to receive messages once published to the topic.
  4. Unsubscribe subscribers registered to orderTopic.
  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-Condition: Cache is already connected

    // orderTopic is the name of the topic created beforehand
    string topicName = "orderTopic";

    string subscriptionName = "orderTopicName";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    // Create and register subscribers for Order topic
    // MessageReceived callback is specified below
    // The subscription policy is shared which means that
    // the subscription can have more than one subscribers
    IDurableTopicSubscription orderSubscriber =
        orderTopic.CreateDurableSubscription(subscriptionName,
        SubscriptionPolicy.Shared,
        MessageReceived, TimeSpan.FromMinutes(20));

    // Topics can also be unsubscribed
    orderSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

// -------- MessageReceivedCallback ------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Perform operations

    if(args.Message.Payload is Order ord)
    {
        // Perform operations
    }
    else
    {
        // Message failed to receive
    }
}
try {
    // Precondition: cache is connected

    String topicName = "orderTopic";

    String subscriptionName = "productTopicName";

    // Get topic
    Topic orderTopic = cache.getMessagingService().getTopic(topicName);

    if (orderTopic != null) {
        // Create and register subscription for order topic
        TopicSubscription orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared,
                messageReceived(), TimeSpan.FromMinutes(20));

        // You can unsubscribe to topics
        orderSubscriber.unSubscribe();
    } else {
        // No topic exists
    }
} catch (OperationFailedException exception) {
    // Order topic does not exist
} catch (Exception exception) {
    // Any generic exception like IllegalArgumentException or NullPointerException
}

//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {

    MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {

        @Override
        public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {
            // perform operations

            boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
            Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            } else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try {
    // Precondition: Cache is connected

    // Topic "orderTopic" already exists in the cache
    let topicName = "orderTopic";
    let subscriptionName = "orderTopicName";

    // Get the topic
    let orderTopic = ncache.getMessagingService().getTopic(topicName);

    // Create and register subscribers for orderTopic
    // MessageReceived callback is specified below
    // The subscription policy is Shared which means that the subscription can have more than one subscribers
    let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName,
        ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));

    // Topics can also be unsubscribed
    orderSubscriber.unSubscribe();
}
catch (error) {
    // Handle any errors
}

// -------- MessageReceivedCallback -------- //
messageReceived(o, messageEventArgs) {

    let messageReceivedListener = new ncache.MessageReceivedListener() {
        function onMessageReceived() {
            // perform operations

            let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
            let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            }
            else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try:
    # Precondition: Cache is connected 

    # Topic "orderTopic" already exists in the cache
    topic_name = "orderTopic"
    subscription_name = "orderTopicName"

    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name)

    # Create and register subscribers for orderTopic
    # MessageReceived callback is specified below
    # The subscription policy is SHARED which means that the subscription can have more than one subscriber
    order_subscriber = order_topic.create_durable_subscription(subscription_name,
        ncache.SubscriptionPolicy.SHARED, on_message_received, ncache.TimeSpan.from_minutes(20))

    # Topics can also be unsubscribed 
    order_subscriber.un_subscribe()
except Exception as exp:
    # Handle errors

# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
    # perform operations
    temp = isinstance(message_event_args.get_message().get_payload(), Order)

    order = message_event_args.get_message().get_payload() if temp else None

    if temp:
        # Perform operations
        print("Message is of order type")
    else:
        # Message failed to receive
        print("Message is of order type")
try {
    // Precondition: cache is connected

    val topicName = "orderTopic"
    val subscriptionName = "productTopicName"

    // Specify Message received listener
    val messageReceived = MessageListener()

    // Get topic
    val orderTopic = cache.getMessagingService.getTopic(topicName)

    if (orderTopic != null) { // Create and register subscription for order topic
      val orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, messageReceived, TimeSpan.FromMinutes(20))
      // You can unsubscribe to topics
      orderSubscriber.unSubscribe()
    }
    else {
      // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
Note

To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.

Pattern Based Subscriptions

NCache lets you search and subscribe to topics based on a specific pattern that you provide. This is known as a pattern based subscription. Pattern based subscriptions provided by NCache support the following wildcards as already explained in the Pub/Sub Subscriptions section:

  1. * : zero or many characters
  2. ? : any one character
  3. [] : specify a range

The following example provides a pattern based on which the topic is subscribed matching the pattern.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-Condition: Cache is already connected

    // orderTopic is the name of the topic created beforehand
    // Only ? * [] wildcards supported
    string topicName = "order*";
    string subscriptionName = "orderTopicName";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);

    // Create and register subscribers for Order topic
    ITopicSubscription orderSubscriber = orderTopic.CreateSubscription(MessageReceived);

    // Topics can also be unsubscribed
    orderSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
    {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

// -------- MessageReceivedCallback ---------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Perform operations

    if(args.Message.Payload is Order ord)
    {
        // Perform operations
    }
    else
    {
        // Message failed to receive
    }
}
try {
    // Precondition: cache is connected

    String topicName = "$GeneralEvents";

    String subscriptionName = "orderTopicName";

    // Get all topics that fulfill the pattern
    Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);

    if (orderTopic != null) {

        // Create and register notifications for Order topic
        TopicSubscription orderSubscriber = orderTopic.createSubscription(messageReceived());

        // To unsubscribe to topics
        orderSubscriber.unSubscribe();
    } else {
        // No topic exists
    }
}
catch (OperationFailedException exception) {
    if (exception.getErrorCode() == NCacheErrorCodes.SUBSCRIPTION_EXISTS) {
        // Order topic does not exist
    }
    if (exception.getErrorCode() == NCacheErrorCodes.DEFAULT_TOPICS) {
        // Operation cannot be performed on default topic
    }
    if (exception.getErrorCode() == 20007) {
        // Topic is disposed
    }
    if (exception.getErrorCode() == 20008) {
        // Topic name cannot be null or an empty string
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception exception) {
    // Any generic exception like IllegalArgumentException or NullPointerException
}

//-------------- MessageReceivedCallback --------------//
private MessageReceivedListener messageReceived() {

    MessageReceivedListener messageReceivedListener = new MessageReceivedListener() {

        @Override
        public void onMessageReceived(Object o, MessageEventArgs messageEventArgs) {
            // Perform operations

            boolean temp = messageEventArgs.getMessage().getPayload() instanceof Order;
            Order ord = temp ? (Order) messageEventArgs.getMessage().getPayload() : null;
            if (temp) {

                // Perform operations
            } else {

                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" exists in the cache
    // Only ? * [] wildcards supported
    let topicName = "order*";
    let subscriptionName = "orderTopicName";

    // Get the topic
    let orderTopic = ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);

    // Create and register subscribers for orderTopic
    let orderSubscriber = orderTopic.createSubscription(this.messageReceived());

    // Topics can also be unsubscribed
    orderSubscriber.unSubscribe();
}
catch (error) {
    // Handle errors
}

// -------- MessageReceivedCallback --------- //
messageReceived(o, messageEventArgs) {

    let messageReceivedListener = new ncache.MessageReceivedListener() {
        function onMessageReceived() {

            // perform operations

            let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
            let ord = temp ? messageEventArgs.getMessage().getPayload() : null;
            if (temp) {
                // Perform operations
            }
            else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try:
    # Precondition: Cache is already connected 

    # Topic "orderTopic" exists in the cache
    # Only ? * [] wildcards supported 
    topic_name = "order*"
    subscription_name = "orderTopicName"

    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)

    # Create and register subscribers for orderTopic
    order_subscriber = order_topic.create_subscription(on_message_received)

    # Topics can also be unsubscribed 
    order_subscriber.un_subscribe()
except Exception as exp:
    # Handle errors

# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
    # perform operations
    temp = isinstance(message_event_args.get_message().get_payload(), Order)

    order = message_event_args.get_message().get_payload() if temp else None

    if temp:
        # Perform operations
        print("Message is of order type")
    else:
        # Message failed to receive
        print("Message is of order type")
try {
    // Precondition: cache is connected

    val topicName = "$GeneralEvents"

    val subscriptionName = "orderTopicName"

    // Specify Message received listener
    val messageReceived = MessageListener()

    // Get all topics that fulfill the pattern
    val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)

    if (orderTopic != null) { // Create and register notifications for Order topic
      val orderSubscriber = orderTopic.createSubscription(messageReceived)
      // To unsubscribe to topics
      orderSubscriber.unSubscribe()
    }
    else {
      // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}

Pattern Based Durable Subscription

Durable Pattern-based Subscription combines the characteristics of Pattern-based Subscriptions and Durable Subscriptions. In this mode of subscription, the subscriber can receive pattern-based messages. In case this subscriber gets disconnected and rejoins the network, it will receive any messages that it may have missed during disconnection based on the pattern of the topic it subscribed to.

The following example provides a pattern based on which the topic is subscribed matching the pattern and the subscription made is durable subscription.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-Condition: Cache is already connected

    // orderTopic is the name of the topic created beforehand
    // Only ? * [] wildcards supported
    string topicName = "order*";

    string subscriptionName = "orderTopicName";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);

    // Create and register subscribers for Order topic
    // MessageReceived callback is specified below
    // The subscription policy is exclusive which means that
    // one subscription can have only one subscriber at a time
    IDurableTopicSubscription orderSubscriber =
        orderTopic.CreateDurableSubscription(subscriptionName,
        SubscriptionPolicy.Exclusive,
        MessageReceived, TimeSpan.FromMinutes(20));

    // Topics can also be unsubscribed
    orderSubscriber.UnSubscribe();
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
    {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}

// -------- MessageReceivedCallback -------------- //
private void MessageReceived(object sender, MessageEventArgs args)
{
    // Perform operations

    if(args.Message.Payload is Order ord)
    {
        // Perform operations
    }
    else
    {
        // Message failed to receive
    }
}
try {
    // Precondition: cache is connected

    String topicName = "order?opic";
    String subscriptionName = "orderTopicName";

    // Get the topic
    Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);

    if (orderTopic != null) {

        // Create and register subscribers for order topic
        TopicSubscription orderSubscriber = orderTopic.createDurableSubscription(subscriptionName,
                SubscriptionPolicy.Exclusive, messageReceived(), TimeSpan.FromMinutes(20));

        // Topics can also be unsubscribed
        orderSubscriber.unsubscribe();
    } else {

        // Topic does not exist
    }
} catch (OperationFailedException exception) {
    if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_DISPOSED) {
        // Specified topic has been disposed
    }
    if (exception.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    if (exception.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS) {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception exception) {
    // Any generic exception like IllegalArgumentException or NullPointerException
}
// This is an async method
try {
    // Precondition: Cache is already connected

    // Topic "orderTopic" exists in the cache
    // Only ? * [] wildcards supported
    let topicName = "order?opic";
    let subscriptionName = "orderTopicName";

    // Get the topic
    let orderTopic = await ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);

    // Create and register subscribers for orderTopic
    // MessageReceived callback is specified below
    // The subscription policy is exclusive which means one subscription can have only one subscriber at a time
    let orderSubscriber = await orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Exclusive, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));

    // Topics can also be unsubscribed
    orderSubscriber.unSubscribe();
}
catch (error) {
    // Handle any errors
}

// --------- MessageReceivedCallback ------- //
messageReceived(o, messageEventArgs) {

    let messageReceivedListener = new ncache.MessageReceivedListener(){
        function onMessageReceived() {
            // perform operations

            let temp = messageEventArgs.getMessage().getPayload() instanceof Orders;
            let ord = temp ? messageEventArgs.getMessage().getPayload() : null;

            if (temp) {
                // Perform operations
            }
            else {
                // Message failed to receive
            }
        }
    };
    return messageReceivedListener;
}
try:
    # Precondition: Cache is already connected

    # Topic "orderTopic" exists in the cache 
    # Only ? * [] wildcards supported
    topic_name = "order?opic"
    subscription_name = "orderTopicName"

    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)

    # Create and register subscribers for orderTopic
    # MessageReceived callback is specified below
    # The subscription policy is exclusive which means one subscription can have only one subscriber at a time
    order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.EXCLUSIVE, on_message_received, ncache.TimeSpan.from_minutes(20))

    # Topics can also be unsubscribed 
    order_subscriber.un_subscribe()
except Exception as exp:
    # Handle errors

# ---------- MessageReceivedCallback ----------- #
def on_message_received(sender: object, message_event_args: ncache.MessageEventArgs):
    # perform operations
    temp = isinstance(message_event_args.get_message().get_payload(), Order)

    order = message_event_args.get_message().get_payload() if temp else None

    if temp:
        # Perform operations
        print("Message is of order type")
    else:
        # Message failed to receive
        print("Message is of order type")
try {
    // Precondition: cache is connected

    val topicName = "order?opic"
    val subscriptionName = "orderTopicName"

    // Specify Message received listener
    val messageReceived = MessageListener()

    // Get the topic
    val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)

    if (orderTopic != null) { // Create and register subscribers for order topic
      val orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, messageReceived, TimeSpan.FromMinutes(20))
      // Topics can also be unsubscribed
      orderSubscriber.unSubscribe()
    }
    else {
      // Topic does not exist
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}

Pattern Based Subscription with Failure Notification

Pattern based subscriptions can also be made with a failure notification registered with them. The following example subscribes to topics which fulfil the pattern with callback registered for message delivery failure.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-Condition: Cache is already connected

    // orderTopic is the name of the topic created beforehand
    // Only ? * [] wildcards supported
    string topicName = "*Topic";

    // Get the topic
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);
    if (orderTopic != null)
    {
        // Register message delivery failure
        // Since the instances of pattern based topics are get and registered for message failure
        // Message failure will be notified for these topics
        orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    if (ex.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS)
    {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
try {
    // precondition: cache is connected
    String topicName = "*Topic";

    // Get topic
    Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);

    if (orderTopic != null) {

        MyTopicListener topicListener = new MyTopicListener();
        // register message delivery failure
        orderTopic.addMessageDeliveryFailureListener(topicListener);
    } else {

        // No topic exists
    }
} catch (OperationFailedException exception) {
    if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_DISPOSED) {
        // Specified topic has been disposed
    }
    if (exception.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS) {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    if (exception.ErrorCode == NCacheErrorCodes.SUBSCRIPTION_EXISTS) {
        // Active subscription with this name already exists
        // Specific to Exclusive subscription
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception exception) {
    // Any generic exception like IllegalArgumentException or NullPointerException
}
try {
  // Precondition: Cache is already connected

  // Topic "orderTopic" exists in the cache
  // Only ? * [] wildcards supported
  let topicName = "orderTopic";

  // Get the topic
  let orderTopic = ncache
    .getMessagingService()
    .getTopic(topicName, ncache.TopicSearchOptions.ByPattern);

  if (orderTopic != null) {
    let topicListener = new ncache.TopicListener();

    // Register message delivery failure
    // Since the instances of patter based topics are get and registered for message failure
    // message failure will be notified for these topics
    orderTopic.addMessageDeliveryFailureListener(topicListener);
  } else {
    // No topic exists
  }
} catch (error) {
  // Handle any errors
}
try:
    # Precondition: Cache is already connected

    # Topic "orderTopic" exists in the cache
    # Only ? * [] wildcards supported 
    topic_name = "order?opic"

    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)

    if order_topic is not None:
        # Register message delivery failure 
        # Since the instances of pattern based topics are get and registered for message failure,
        # message failure will be notified for these topics
        order_topic.add_message_delivery_failure_listener(topic_listener)
    else:
        # No topic exists
        print("Topic not found")
except Exception as exp:
    # Handle errors
try {
    // precondition: cache is connected

    val topicName = "*Topic"

    // Get topic
    val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)

    if (orderTopic != null) {
      val topicListener = TopicEventListener()

      // register message delivery failure
      orderTopic.addMessageDeliveryFailureListener(topicListener)
    }
    else {
      // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
Note

To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.

Additional Resources

NCache provides sample application for Pub/Sub on GitHub.

See Also

Event Notifications in Cache
Pub/Sub Topics
Pub/Sub Messages
Publish Messages to Topic
Continuous Query

Back to top Copyright © 2017 Alachisoft