
Subscribe to a Topic

A subscriber (or client application) registers itself to a topic of interest through a subscription. NCache provides multiple types of Pub/Sub subscriptions and different methods to subscribe to a topic.

Prerequisites

  • To learn about the standard prerequisites required to work with all NCache client-side features, refer to the page on Client Side API Prerequisites.
  • .NET/.NET Core: For API details, refer to: ICache, CacheItem, ITopic, IDurableTopicSubscription, CreateSubscription, CreateDurableSubscription, TopicName, SubscriptionName, IMessagingService, GetTopic, SubscriptionPolicy, UnSubscribe, MessageEventArgs, DeliveryMode, TopicSearchOptions, ITopicSubscription, MessageDeliveryFailure, PayLoad.
  • Java: For API details, refer to: Cache, CacheItem, Topic, TopicSubscription, getMessagingService, getTopic, createDurableSubscription, SubscriptionPolicy, TimeSpan, unSubscribe, MessageReceivedListener, onMessageReceived, MessageEventArgs, getMessage, getPayload, createSubscription, DeliveryMode, TopicSearchOptions, addMessageDeliveryFailureListener.
  • Scala: For API details, refer to: Cache, CacheItem, getTopic, getMessagingService, TimeSpan, unSubscribe, onMessageReceived, MessageReceivedListener, MessageEventArgs, getMessage.
  • Node.js: For API details, refer to: Cache, CacheItem, TopicSubscription, getMessagingService, getTopic, createDurableSubscription, SubscriptionPolicy, TimeSpan, unSubscribe, MessageEventArgs, MessageReceivedListener, getMessage, getPayload, createSubscription, DeliveryMode, TopicSearchOptions, TopicListener, addMessageDeliveryFailureListener.
  • Python: For API details, refer to: Cache, CacheItem, get_topic, get_messaging_service, create_durable_subscription, get_payload, TimeSpan, MessageEventArgs, get_message, create_subscription, un_subscribe, add_message_delivery_failure_listener.

Methods to Create a Subscription

This section describes how to create non-durable subscriptions, durable subscriptions, multiple subscriptions, and asynchronous subscriptions.

Non-Durable Subscriptions

The ITopic/Topic interface facilitates creating non-durable subscriptions and publishing messages against the topic. The CreateSubscription method registers a non-durable subscription against a topic if the topic exists. It lets the subscriber register a MessageReceivedCallback against the topic so that it receives the published messages.

The following code sample performs these steps:

  1. Get the existing topic of interest, i.e., NewBeverages.
  2. Create a subscription against the topic.
  3. Register a callback so that the subscriber receives messages once they are published to the topic.
  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
try
{
    // Pre-Condition: Cache is already connected

    // NewBeverages is the name of the topic created beforehand
    string topicName = "NewBeverages";

    // Get the topic
    ITopic topic = cache.MessagingService.GetTopic(topicName);

    // If topic exists, Create subscription
    if (topic != null)
    {
        // Create and register a subscriber for the topic
        // MessageReceived callback is specified
        ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
    {
        // Specified topic has been disposed
    }
    else if (ex.ErrorCode == NCacheErrorCodes.DEFAULT_TOPICS)
    {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception, such as ArgumentNullException or ArgumentException,
    // e.g., when the topic name is null or empty
}
try {
    // Precondition: cache is connected

    // Topic "NewBeverages" already exists in the cache
    String topicName = "NewBeverages";

    // Get the topic
    Topic ordertopic = cache.getMessagingService().getTopic(topicName);

    if (ordertopic != null) {
        // Create and register subscribers for topic
        // Message received callback is specified
        TopicSubscription ordersubscriber = ordertopic.createSubscription(onMessageReceived());

    } else {
        // No topic exists
    }
} catch (OperationFailedException exception) {
    if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_DISPOSED) {
        // Specified topic has been disposed
    } else if (exception.getErrorCode() == NCacheErrorCodes.DEFAULT_TOPICS) {
        // Operation cannot be performed on default topics,
        // Get user-defined topics instead
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception exception) {
    // Any other generic exception like IllegalArgumentException or NullPointerException
}
try {
    // Topic "NewBeverages" already exists in the cache
    val topicName = "NewBeverages"

    // Specify Message received listener
    val onMessageReceived = MessageListener()

    // Get the ordertopic
    val ordertopic = cache.getMessagingService.getTopic(topicName)

    if (ordertopic != null) {
        // Create and register subscribers for topic
        // Message received callback is specified
        val ordersubscriber = ordertopic.createSubscription(onMessageReceived)
    }
    else {
        // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
try {
    // This is an async method

    // Topic "NewBeverages" already exists in the cache
    let topicName = "NewBeverages";

    // Get the topic
    let ordertopic = await ncache.getMessagingService().getTopic(topicName);

    if (ordertopic != null) {
        // Create and register subscribers for topic
        // Message received callback is specified
        let ordersubscriber = await ordertopic.createSubscription(ncache.onMessageReceived());
    }
    else {
        // No topic exists
    }
}
catch (error) {
    // Handle any errors
}
try:
    # Topic "NewBeverages" already exists in the cache
    topic_name = "NewBeverages"

    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name)

    if order_topic is not None:
        # Create and register subscribers for the topic
        # Message received callback is specified
        order_subscriber = order_topic.create_subscription(on_message_received)
    else:
        # No topic exists
        print("Topic not found")
except Exception as exp:
    # Handle errors
    print(f"Subscription failed: {exp}")
Note

To ensure the operation is fail-safe, handle any potential exceptions within your application, as explained in Handling Failures.

Durable Subscriptions

The IDurableTopicSubscription interface facilitates creating durable subscriptions and publishing messages against the topic. The CreateDurableSubscription method registers a durable subscription against a topic if the topic exists. It lets the subscriber register a MessageReceivedCallback against the topic so that it receives the published messages.

Shared Durable Subscriptions

The following code demonstrates the shared subscription policy.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
// DiscountedBeverages is the name of the topic created beforehand
string topicName = "DiscountedBeverages";

string subscriptionName = "DiscountedBeveragesSubscription";

// Get the topic
ITopic ordertopic = cache.MessagingService.GetTopic(topicName);

if (ordertopic != null)
{
    // Create and register subscribers for topic
    // MessageReceived callback is specified
    // The subscription policy is shared, which means that
    // the subscription can have more than one subscriber
    IDurableTopicSubscription ordersubscription = ordertopic.CreateDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, MessageReceived, TimeSpan.FromMinutes(20));
}
String topicName = "DiscountedBeverages";

String subscriptionName = "DiscountedBeveragesSubscription";

// Get topic
Topic ordertopic = cache.getMessagingService().getTopic(topicName);

if (ordertopic != null) {
    // Create and register subscription for order topic
    TopicSubscription ordersubscriber = ordertopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, messageReceived(), TimeSpan.FromMinutes(20));
} else {
    // No topic exists
}
val topicName = "DiscountedBeverages"
val subscriptionName = "DiscountedBeveragesSubscription"

// Specify Message received listener
val messageReceived = MessageListener()

// Get topic
val ordertopic = cache.getMessagingService.getTopic(topicName)

if (ordertopic != null) {
    // Create and register subscription for order topic
    val ordersubscriber = ordertopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Shared, messageReceived, TimeSpan.FromMinutes(20))
}
else {
    // No topic exists
}
// Topic "DiscountedBeverages" already exists in the cache
let topicName = "DiscountedBeverages";
let subscriptionName = "DiscountedBeveragesSubscription";

// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);

// Create and register subscribers for orderTopic
// MessageReceived callback is specified below
// The subscription policy is Shared, which means that the subscription can have more than one subscriber
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Shared, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
# Topic "DiscountedBeverages" already exists in the cache
topic_name = "DiscountedBeverages"
subscription_name = "DiscountedBeveragesSubscription"

# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)

# Create and register subscribers for the topic
# The subscription policy is SHARED, which means that the subscription can have more than one subscriber
order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.SHARED, on_message_received, ncache.TimeSpan.from_minutes(20))

Exclusive Durable Subscriptions

The following code demonstrates the exclusive subscription policy.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
// orderTopic is the name of the topic created beforehand
string topicName = "orderTopic";

string subscriptionName = "orderTopicName";

// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

// Create and register subscribers for Order topic
// The subscription policy is exclusive which means that
// the subscription can have only one subscriber
IDurableTopicSubscription orderSubscriber = orderTopic.CreateDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, MessageReceived, TimeSpan.FromMinutes(20));
String topicName = "orderTopic";

String subscriptionName = "productTopicName";

// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);

if (orderTopic != null) {
    // Create and register subscription for order topic
    TopicSubscription orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, messageReceived(), TimeSpan.FromMinutes(20));

} else {
    // No topic exists
}
val topicName = "orderTopic"
val subscriptionName = "productTopicName"

// Specify Message received listener
val messageReceived = MessageListener()

// Get topic
val orderTopic = cache.getMessagingService.getTopic(topicName)

if (orderTopic != null) {
    // Create and register subscription for order topic
    val orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, SubscriptionPolicy.Exclusive, messageReceived, TimeSpan.FromMinutes(20))
}
else {
    // No topic exists
}
// Topic "orderTopic" already exists in the cache
let topicName = "orderTopic";
let subscriptionName = "orderTopicName";

// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName);

// Create and register subscribers for orderTopic
// The subscription policy is Exclusive, which means that the subscription can have only one subscriber
let orderSubscriber = orderTopic.createDurableSubscription(subscriptionName, ncache.SubscriptionPolicy.Exclusive, this.messageReceived(), ncache.TimeSpan.FromMinutes(20));
# Topic "orderTopic" already exists in the cache
topic_name = "orderTopic"
subscription_name = "orderTopicName"

# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)

# Create and register subscribers for orderTopic
# The subscription policy is EXCLUSIVE, which means that the subscription can have only one subscriber
order_subscriber = order_topic.create_durable_subscription(subscription_name, ncache.SubscriptionPolicy.EXCLUSIVE, on_message_received, ncache.TimeSpan.from_minutes(20))
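
To contrast the two subscription policies, the following self-contained Python sketch models a named durable subscription in memory. It does not use the NCache client: the names Policy, ToyDurableSubscription, and attach are hypothetical, and raising an error for a second subscriber on an exclusive subscription is an assumption made for illustration, not a statement about how NCache reports that case.

```python
from enum import Enum


class Policy(Enum):
    SHARED = "shared"
    EXCLUSIVE = "exclusive"


class ToyDurableSubscription:
    """Toy model of a named durable subscription (not the NCache API)."""

    def __init__(self, name, policy):
        self.name = name
        self.policy = policy
        self.subscribers = []

    def attach(self, subscriber_id):
        # An exclusive subscription accepts only one subscriber.
        if self.policy is Policy.EXCLUSIVE and self.subscribers:
            raise RuntimeError(f"'{self.name}' is exclusive and already has a subscriber")
        self.subscribers.append(subscriber_id)


# Shared: several subscribers can attach to the same named subscription.
shared = ToyDurableSubscription("DiscountedBeveragesSubscription", Policy.SHARED)
shared.attach("client-1")
shared.attach("client-2")

# Exclusive: the second attach is rejected in this toy model.
exclusive = ToyDurableSubscription("orderTopicName", Policy.EXCLUSIVE)
exclusive.attach("client-1")
try:
    exclusive.attach("client-2")
except RuntimeError as error:
    print(error)
```

In this model the policy is a property of the named subscription, which mirrors how the samples above pass the policy together with the subscription name.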

Multiple Subscriptions

Note

This feature is only available for NCache Enterprise edition.

With this method of subscription, a subscriber provides a pattern to subscribe to multiple topics with a single call. The topics matching the pattern must already exist on the server. Once the subscription is successfully created on the pattern-based topic, the subscriber receives messages published on every topic that matches the pattern. If a matching topic is created after the pattern-based subscription has been registered, the subscriber is automatically registered against that topic as well.

Similarly, unsubscribing from a pattern-based topic removes the subscriber from all topics that match the provided pattern in a single call, without affecting any other subscription.

Note
  • A subscriber can only get a pattern-based topic; it is not allowed to create one.
  • A publisher can use a pattern only to receive failure notifications.
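
The behavior described above can be sketched with a small in-memory model. This is not the NCache client: ToyBroker and its methods are hypothetical, Python's fnmatchcase stands in for the server-side pattern matcher, and returning None when no topic matches is an assumption modeled on the null checks in the samples.

```python
from fnmatch import fnmatchcase


class ToyBroker:
    """In-memory stand-in for a Pub/Sub server (not the NCache API)."""

    def __init__(self):
        self.topics = set()
        self.pattern_subscriptions = []  # list of (pattern, callback) pairs

    def create_topic(self, name):
        self.topics.add(name)

    def subscribe_by_pattern(self, pattern, callback):
        # Assumption: at least one matching topic must already exist.
        if not any(fnmatchcase(topic, pattern) for topic in self.topics):
            return None
        entry = (pattern, callback)
        self.pattern_subscriptions.append(entry)
        return entry

    def unsubscribe(self, entry):
        # One call removes the subscriber from every matching topic;
        # other subscriptions are left untouched.
        self.pattern_subscriptions.remove(entry)

    def publish(self, topic, message):
        # Matching happens at publish time, so topics created after the
        # subscription was registered are covered as well.
        for pattern, callback in list(self.pattern_subscriptions):
            if fnmatchcase(topic, pattern):
                callback(topic, message)


received = []
broker = ToyBroker()
broker.create_topic("NewBeverages")
subscription = broker.subscribe_by_pattern(
    "*Beverages", lambda topic, message: received.append((topic, message))
)
broker.publish("NewBeverages", "Cola")
broker.create_topic("DiscountedBeverages")  # created after subscribing
broker.publish("DiscountedBeverages", "Tea")
broker.unsubscribe(subscription)
broker.publish("NewBeverages", "Juice")  # no longer delivered
print(received)  # [('NewBeverages', 'Cola'), ('DiscountedBeverages', 'Tea')]
```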

Supported Wildcards

The pattern-based method of subscription supports the following three wildcards:

  1. * : zero or more characters. For example, bl* subscribes to black, blue, blur, etc.
  2. ? : any one character. For example, h?t subscribes to hit, hot, hat, etc.
  3. [] : any one character from the listed set or range. For example, b[ae]t subscribes to bet and bat, but not bit.
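
To try out these wildcards without a cache, the sketch below uses fnmatchcase from Python's standard library, which supports the same three wildcard forms. It is only an illustration of the matching rules: the NCache matcher may differ in details such as case sensitivity, and matching_topics is a hypothetical helper.

```python
from fnmatch import fnmatchcase

# Topic names taken from the examples in the list above, plus "heat".
topics = ["black", "blue", "blur", "hit", "hot", "hat", "heat", "bet", "bat", "bit"]


def matching_topics(pattern, names):
    """Return the names that match a pattern built from *, ? and []."""
    return [name for name in names if fnmatchcase(name, pattern)]


print(matching_topics("bl*", topics))     # ['black', 'blue', 'blur']
print(matching_topics("h?t", topics))     # ['hit', 'hot', 'hat']
print(matching_topics("b[ae]t", topics))  # ['bet', 'bat']
```

Note that h?t does not match heat, because ? stands for exactly one character.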

Creating a Subscription with Wildcard

The following example creates a subscription by providing a pattern; the subscriber is registered against the topics that match it.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
// Pattern that matches all topics with the suffix Beverages
// Only ? * [] wildcards supported
string topicName = "*Beverages";

// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName, TopicSearchOptions.ByPattern);

// If topic exists, Create subscription
if (topic != null)
{
    // Create and register subscribers for Order topic
    ITopicSubscription subscription = topic.CreateSubscription(MessageReceived);
}
String topicName = "*Beverages";

// Get all topics that fulfill the pattern
Topic orderTopic = cache.getMessagingService().getTopic(topicName, TopicSearchOptions.ByPattern);

if (orderTopic != null) {

    // Create and register notifications for Order topic
    TopicSubscription orderSubscriber = orderTopic.createSubscription(messageReceived());
} else {
    // No topic exists
}
val topicName = "*Beverages"

// Specify Message received listener
val messageReceived = MessageListener()

// Get all topics that fulfill the pattern
val orderTopic = cache.getMessagingService.getTopic(topicName, TopicSearchOptions.ByPattern)

if (orderTopic != null) {
    // Create and register notifications for Order topic
    val orderSubscriber = orderTopic.createSubscription(messageReceived)
}
else {
    // No topic exists
}

// Topics with the suffix "Beverages" exist in the cache
// Only ? * [] wildcards supported
let topicName = "*Beverages";

// Get the topic
let orderTopic = ncache.getMessagingService().getTopic(topicName, ncache.TopicSearchOptions.ByPattern);

// Create and register subscribers for orderTopic
let orderSubscriber = orderTopic.createSubscription(this.messageReceived());
# Topics with the suffix "Beverages" exist in the cache
# Only ? * [] wildcards supported
topic_name = "*Beverages"

# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name, ncache.TopicSearchOptions.BY_PATTERN)

# Create and register subscribers for orderTopic
order_subscriber = order_topic.create_subscription(on_message_received)

Asynchronous Subscriptions

While creating a subscription, you can specify a delivery mode, which can be synchronous or asynchronous. Synchronous delivery is recommended for ordered messages; if you are not using ordered messages, asynchronous delivery achieves higher performance. The following examples create a subscription using the CreateSubscription method with an explicitly specified delivery mode.

Note

By default, the delivery mode of messages is set to synchronous.
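
As a rough intuition for why synchronous delivery suits ordered messages, the following toy sketch contrasts the two modes outside of NCache. The deliver function is hypothetical, and the thread pool is only an assumed stand-in for asynchronous delivery; it does not describe how NCache dispatches messages internally.

```python
from concurrent.futures import ThreadPoolExecutor


def deliver(messages, callback, asynchronous=False):
    """Invoke callback for each message, inline or on a thread pool."""
    if not asynchronous:
        # Synchronous: one callback at a time, in publish order.
        for message in messages:
            callback(message)
        return
    # Asynchronous: callbacks may run concurrently, so their completion
    # order is not guaranteed to match the publish order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        for message in messages:
            pool.submit(callback, message)
    # Leaving the with-block waits for all submitted callbacks to finish.


messages = ["order-1", "order-2", "order-3", "order-4"]

sync_log = []
deliver(messages, sync_log.append)

async_log = []
deliver(messages, async_log.append, asynchronous=True)

print(sync_log)           # always in publish order
print(sorted(async_log))  # same messages; arrival order may vary
```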

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
// NewBeverages is the name of the topic created beforehand
string topicName = "NewBeverages";

// Get the topic
ITopic topic = cache.MessagingService.GetTopic(topicName);

// If topic exists, Create subscription
if (topic != null)
{
    // Create and register subscribers for Order topic
    // MessageReceived callback is specified
    // DeliveryMode is set to async
    ITopicSubscription subscription = topic.CreateSubscription(MessageReceived, DeliveryMode.Async);
}
// Topic "NewBeverages" already exists in the cache
String topicName = "NewBeverages";

// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);

if (orderTopic != null) {
    // Create and register subscribers for orderTopic
    // Message received callback is specified
    TopicSubscription orderSubscriber = orderTopic.createSubscription(onMessageReceived(), DeliveryMode.Async);
} else {
    // No topic exists
}
// Topic "NewBeverages" already exists in the cache
val topicName = "NewBeverages"

// Specify Message received listener
val onMessageReceived = MessageListener()

// Get the topic
val orderTopic = cache.getMessagingService.getTopic(topicName)

if (orderTopic != null) {
    // Create and register subscribers for orderTopic
    // Message received callback is specified
    val orderSubscriber = orderTopic.createSubscription(onMessageReceived, DeliveryMode.Async)
}
else {
    // No topic exists
}
// This is an async method
// Topic "NewBeverages" already exists in the cache
let topicName = "NewBeverages";

// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);

if (orderTopic != null) {

    // Create and register subscribers for orderTopic
    // Message received callback is specified
    let orderSubscriber = await orderTopic.createSubscription(ncache.onMessageReceived(), ncache.DeliveryMode.Sync);

    // Topics can also be unsubscribed
    orderSubscriber.unSubscribe();
}
else {
    // No topic exists
}
# Topic "NewBeverages" already exists in the cache
topic_name = "NewBeverages"

# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)

if order_topic is not None:
    # Create and register subscribers for orderTopic
    # Message received callback is specified
    order_subscriber = order_topic.create_subscription(on_message_received, ncache.DeliveryMode.SYNC)
else:
    # No topic exists
    print("Topic not found")
Note

You can also unsubscribe from a topic using the UnSubscribe method.

Additional Resources

NCache provides a sample application for Pub/Sub on GitHub.

See Also

Pub/Sub Topics
Publish Messages to a Topic
Pub/Sub Events

Copyright © 2017 Alachisoft