• Webinars
  • Docs
  • Download
  • Blogs
  • Contact Us
Try Free
Show / Hide Table of Contents

Pub/Sub Topics Behavior and Properties

A topic is an entity containing the message itself, along with additional subscriber and publisher information, and it is stored in the cache. It contains the message store, which stores the actual data objects being published by the publisher in a queue. It also internally maintains a list of all subscribers subscribing to it. The topic also contains a list of publishers.

Once the message is published to the topic, it fires an event, which the topic relays to subscribers based upon the preference of message delivery option so that they can get the message as needed. The following figure illustrates the role of a topic as an intermediary channel for publishers and subscribers:

PubSub Topics in NCache

Topic Behavior

Temporary Subscriber Disconnection

In case of temporary subscriber disconnection and then auto re-connection, all topic related information such as subscriptions and failure event notifications are re-registered to topic without any interruption on subscriber end.

Topic Deletion Notification

In case of topic deletion, it is forceful and deletes all messages and related meta-info from the cache. Hence, the subscriber and publisher must be notified of this deletion because of the following reasons:

  • The subscriber might be waiting for incoming messages from the registered topic. Once the topic does not exist, the subscribers can then handle their execution accordingly through event notification and prevent infinite waiting state.

  • The publisher can prevent sending messages to a non-existing topic and handle any pending messages and future execution accordingly.

Topic Priority

Note

This feature is available in NCache 5.2 and onward.

NCache introduces topic-level priority that helps you prioritize the lowest and high priority topics based on their importance. Topics containing critical or important messages can be created with a high priority which means that those messages are delivered first. Similarly, topics containing unimportant data can be created with a lower priority which means that those messages are evicted first.

Topics created with no priority specified will be created with the default priority i.e. Normal. Priority is specified using the TopicPriority property as Low, Normal, and High. In case eviction is enabled, the topics with least priority are evicted first, and the ones with the high priority are evicted last.

Important

Priority of a topic can be specified at the time of topic creation only and cannot be modified afterwards.

Prerequisites

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
  • Install either of the following NuGet packages in your application based on your NCahce edition:
    • Enterprise: Alachisoft.NCache.SDK
    • Professional: Alachisoft.NCache.Professional.SDK
  • Include the following namespace in your application:
    • Alachisoft.NCache.Client
    • Alachisoft.NCache.Runtime
    • Alachisoft.NCache.Runtime.Caching
    • Alachisoft.NCache.Runtime.Exceptions
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • Make sure that the data being added is serializable.
  • For API details, refer to: ICache, CacheItem, ITopic, CreateTopic, GetTopic, DeleteTopic.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Add the following Maven dependencies in your pom.xml file:
<dependency>
    <groupId>com.alachisoft.ncache</groupId>
    <!--for NCache Enterprise Edition--> 
    <artifactId>ncache-client</artifactId>
    <!--for NCache Professional Edition-->
    <artifactId>ncache-professional-client</artifactId>
    <version>x.x.x</version>
</dependency>
  • Import the following packages in your application:
    • import com.alachisoft.ncache.client.*;
    • import com.alachisoft.ncache.runtime.exceptions.*;
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • Make sure that the data being added is serializable.
  • For API details, refer to: Cache, CacheItem, Topic, createTopic, getTopic, deleteTopic.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Install and include either of the following modules in your application based on your NCache edition:
    • Enterprise: const ncache = require('ncache-client')
    • Professional: const ncache = require('ncache-professional-client')
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • For API details, refer to: Cache, CacheItem, createTopic, getTopic, deleteTopic.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Install the NCache Python client by executing the following command:
# Enterprise Client
pip install ncache-client

# Professional Client
pip install ncache-professional-client
  • Import the NCache module and asyncio in your application.
  • Cache must be running.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.
  • Add the following Maven dependencies in your pom.xml file:
<dependency>
    <groupId>com.alachisoft.ncache</groupId>
    <!--for NCache Enterprise Edition--> 
    <artifactId>ncache-scala-client</artifactId>
    <!--for NCache Professional Edition-->
    <artifactId>ncache-scala-professional-client</artifactId> 
    <version>x.x.x</version>
</dependency>
  • Import the following packages in your application:
    • import com.alachisoft.ncache.scala.client.*;
  • Cache must be running.
  • The application must be connected to cache before performing the operation.
  • Make sure that the data being added is serializable.
  • To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
  • To handle any unseen exceptions, refer to the Troubleshooting section.

Create Topic

CreateTopic method creates topic in cache with specified name. If the topic already exists, an instance of the topic is returned as ITopic. Whenever a message is published on a topic, it is delivered based on message preference to subscribers that are registered on that topic. Following example creates a topic orderTopic.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-condition: Cache is already connected

    // Mention the name of the topic
    string topicName = "orderTopic";

    // Create the topic
    ITopic topic = cache.MessagingService.CreateTopic(topicName);
}
catch (OperationFailedException ex)
{
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
try {
    // Precondition: Cache is already connected

    // Mention the name of the topic
    String topicName = "orderTopic";

    // Create the topic
    Topic topic = cache.getMessagingService().createTopic(topicName);
} catch (OperationFailedException ex) {
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
} catch(Exception ex) {
    // Any generic exception like IllegalArgumentException or NullPointerException
}
// This is an async method
try {
  // Precondition: Cache is already connected

  // Mention the name of the topic
  let topicName = "orderTopic";

  // Create the topic
  let topic = await ncache.getMessagingService().createTopic(topicName);
} catch (error) {
  // Handle any errors
}
try:
    # Precondition: Cache is already connected 

    # Mention the name of the topic
    topic_name = "orderTopic"

    # Create the topic
    topic = cache.get_messaging_service().create_topic(topic_name)
except Exception as exp:
    # Handle errors
try {
    // Precondition: Cache is already connected

    // Mention the name of the topic
    val topicName = "orderTopic"

    // Create the topic
    val topic = cache.getMessagingService.createTopic(topicName)
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
Note

To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.

Create Topic with Priority

Note

This feature is only available in NCache 5.2 and onward.

The following example creates a topic, orderTopic, with priority as high to avoid early eviction (if eviction is enabled).

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-condition: Cache is already connected

    // Mention the name of the topic
    string topicName = "orderTopic";

    // Create the topic with priority
    ITopic topic = cache.MessagingService.CreateTopic(topicName, Alachisoft.NCache.Runtime.Messaging.TopicPriority.High);
}
catch (OperationFailedException ex)
{
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
try {
    // Precondition: Cache is already connected

    // Mention name of the topic
    String topicName = "orderTopic";

    // Create topic with priority
    Topic topic = cache.getMessagingService().createTopic(topicName, TopicPriority.High);
} catch (OperationFailedException ex) {
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
} catch (Exception ex) {
    // Any other generic exception like IllegalArgumentException or NullPointerException
}
// This is an async method
try {
  // Precondition: Cache is already connected

  // Mention name of the topic
  let topicName = "orderTopic";

  // Create topic with priority
  let topic = await ncache
    .getMessagingService()
    .createTopic(topicName, ncache.TopicPriority.High);
} catch (error) {
  // Handle any errors
}
try:
    # Precondition: Cache is already connected

    # Mention the name of the topic
    topic_name = "orderTopic"

    # Create the topic
    topic = cache.get_messaging_service().create_topic(topic_name, ncache.TopicPriority.HIGH)
except Exception as exp:
    # Handle errors
try {
    // Precondition: Cache is already connected

    // Mention name of the topic
    val topicName = "orderTopic"

    // Create topic with priority
    val topic = cache.getMessagingService.createTopic(topicName, TopicPriority.High)
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}

Get Topic

GetTopic method fetches an instance of the specified topic from the cache. If the topic exists, it is returned, otherwise exception is thrown. The following example gets an existing topic orderTopic from the cache.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-condition: Cache is already connected

    // Mention the name of the topic
    string topicName = "orderTopic";

    // Get the topic from the cache
    ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

    // Verify successful topic retrieval
    if (orderTopic != null)
    {
        // orderTopic will be used for receiving and/or publishing messages
    }
    else
    {
        // No topic exists
    }
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Specified topic does not exist
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
try {
    // Precondition: Cache is already connected

    // Mention name of the topic
    String topicName = "orderTopic";

    // Get the topic
    Topic orderTopic = cache.getMessagingService().getTopic(topicName);

    // Verify successful topic retrieval
    if (orderTopic != null) {
        // orderTopic will be used for receiving and/or publishing messages
    } else {
        // No topic exists
    }
} catch (OperationFailedException ex) {
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
} catch (Exception ex) {
    // Generic exception like IllegalArgumentException or NullPointerException
}
// This is an async method
try {
  // Precondition: Cache is already connected

  // Mention name of the topic
  let topicName = "orderTopic";

  // Get the topic
  let orderTopic = await ncache.getMessagingService().getTopic(topicName);

  // Verify successful topic retrieval
  if (!(orderTopic == null)) {
    // orderTopic will be used for receiving and/or publishing messages
  } else {
    // No topic exists
  }
} catch (error) {
  // Handle any errors
}
try:
    # Precondition: Cache is already connected 

    # Mention name of the topic 
    topic_name = "orderTopic"

    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name)

    # Verify successful topic retrieval 
    if order_topic is not None:
        # order_topic will be used for receiving and/or publishing messages
        print("Topic found")
    else:
        # No topic exists
        print("Topic not found")
except Exception as exp:
    # Handle errors
try {
    // Precondition: Cache is already connected

    // Mention name of the topic
    val topicName = "orderTopic"

    // Get the topic
    val orderTopic = cache.getMessagingService.getTopic(topicName)

    // Verify successful topic retrieval
    if (orderTopic != null) {
      // orderTopic will be used for receiving and/or publishing messages
    }
    else {
      // No topic exists
    }
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
Note

To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.

Delete Topic

DeleteTopic method un-registers the topic from cache and removes all messages associated with that topic. If registered, a topic deletion callback OnTopicDeleted will be triggered upon this method call.

The following example deletes the existing topic orderTopic from cache and removes all messages associated with that topic. If the OnTopicDeleted callback is registered, it will be triggered upon this method call.

  • .NET/.NET Core
  • Java
  • Node.js
  • Python
  • Scala
try
{
    // Pre-condition: Cache is already connected

    // Define the topic to be deleted
    string topicName = "orderTopic";

    // Delete the topic "orderTopic"
    cache.MessagingService.DeleteTopic(topicName);

    // Callback will be triggered if registered
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Topic does not exist
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
try {
    // Precondition: Cache is already connected

    // Mention the topic to be deleted
    String topicName = "orderTopic";

    // Delete the topic
    cache.getMessagingService().deleteTopic(topicName);

    // Callback will be triggered if registered
} catch (OperationFailedException ex) {
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
} catch (Exception ex) {
    // Any other generic exception like IllegalArgumentException or NullPointerException
}
try {
  // Precondition: Cache is already connected

  // Mention the topic to be deleted
  let topicName = "orderTopic";

  // Delete the topic
  ncache.getMessagingService().deleteTopic(topicName);

  // Callback will be triggered if registered
} catch (error) {
  // Handle any errors
}
try:
    # Precondition: Cache is already connected 

    # Mention the topic to be deleted
    topic_name = "orderTopic"

    # Delete the topic
    cache.get_messaging_service().delete_topic(topic_name)

    # Callback will be triggered if registered
except Exception as exp:
    # Handle errors
try {
    // Precondition: Cache is already connected

    // Mention the topic to be deleted
    val topicName = "orderTopic"

    // Delete the topic
    cache.getMessagingService.deleteTopic(topicName)

    // Callback will be triggered if registered
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
Note

To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.

Delete Topic Asynchronously

DeleteTopicAsync method deletes the topic asynchronously. Whenever a topic is deleted asynchronously, Task is returned to the user for performing further tasks without waiting for the topic to be deleted. The following example shows the asynchronous deletion of topic orderTopic.

  • .NET/.NET Core
  • Java
  • Python
  • Scala
try
{
    // Pre-condition: Cache is already connected

    // Define the topic to be deleted
    string topicName = "orderTopic";

    // Delete the topic "orderTopic"
    Task task = cache.MessagingService.DeleteTopicAsync(topicName);

    // Use task to perform further operations according to business logic
    // Callback will be triggered if registered
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Topic does not exist
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException
    // Topic name is null/empty
}
try {
    // Precondition: Cache is connected

    // Mention the topic to be deleted
    String topicName = "orderTopic";

    // Delete the topic asynchronously
    TimeScheduler.Task task = (TimeScheduler.Task) cache.getMessagingService().deleteTopicAsync(topicName);

    // Use tasks to perform further operations according to business logic
    // Callback will be triggered if registered
} catch (OperationFailedException exception) {
    if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_NOT_FOUND)
    {
        // Topic does not exist
    } else {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
} catch (Exception exception) {
    // Any other generic exception like IllegalArgumentException or NullPointerException
}
try:
    # Precondition: Cache is connected

    # Mention the topic to be deleted
    topic_name = "orderTopic"

    # Delete the topic asynchronously
    async def delete_async():
        task = cache.get_messaging_service().delete_topic_async(topic_name)
        result = await task

asyncio.run(delete _async())


    # Use tasks to perform further operations according to business logic
    # Callback will be triggered if registered
except Exception as exp:
    # Handle errors
try {
    // Precondition: Cache is already connected

    // Mention the topic to be deleted
    val topicName = "orderTopic"

    // Delete the topic
    val task = cache.getMessagingService.deleteTopicAsync(topicName)

    // Use tasks to perform further operations according to business logic
    // Callback will be triggered if registered
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
Note

To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.

Properties of ITopic Interface

Members Type Description
MessageDeliveryFailure MessageDeliveryFailureCallback Event on topic so that publisher receives all failed messages that are not delivered to any subscriber or may be messages are expired or evicted before delivery.
Name string Name of the topic specified during topic creation.
OnTopicDeleted TopicDeletedCallback Event to handle topic deletion by publisher and subscriber.
ExpirationTime TimeSpan If message level expiration is not provided, then this topic level expiration expires messages in the topic by default. The value is TimeSpan.MaxValue by default.
IsClosed bool Check whether topic is disposed, before performing any operation.
Dispose IDisposable Removes registered topic’s subscription from cache server.

Additional Resources

NCache provides a sample application for Pub/Sub on GitHub.

See Also

Event Notifications in Cache
Pub/Sub Messages
Publish Messages to Topic
Subscribe for Topic Messages
Continuous Query

Back to top Copyright © 2017 Alachisoft