Pub/Sub Topics

A topic contains the message itself, along with additional subscriber and publisher information. The ITopic/Topic interface lets you create a topic, set its priority at creation time, get a topic, and delete a topic, either synchronously or asynchronously.

Prerequisites

To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the page on Client Side API Prerequisites. For API details, refer to the following for each language:

  • .NET/.NET Core: ICache, CacheItem, ITopic, CreateTopic, GetTopic, DeleteTopic, TopicPriority, DeleteTopicAsync.
  • Java: Cache, CacheItem, Topic, createTopic, getTopic, deleteTopic, getMessagingService, deleteTopicAsync.
  • Scala: Cache, CacheItem, createTopic, getTopic.
  • Node.js: Cache, CacheItem, createTopic, getTopic, deleteTopic, TopicPriority.
  • Python: Cache, CacheItem, create_topic, get_topic, delete_topic, TopicPriority.

Create Topic

The CreateTopic method creates a topic in the cache with the specified name. If the topic already exists, an instance of the existing topic is returned as ITopic. Whenever a message is published on a topic, it is delivered, based on message preference, to the subscribers registered on that topic. The following example creates a topic named orderTopic.

  • .NET/.NET Core

try
{
    // Pre-condition: Cache is already connected

    // Mention the name of the topic
    string topicName = "orderTopic";

    // Create the topic
    ITopic topic = cache.MessagingService.CreateTopic(topicName);
}
catch (OperationFailedException ex)
{
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
}
catch (Exception ex)
{
    // Any other generic exception like ArgumentNullException or ArgumentException,
    // for example when the topic name is null/empty
}

  • Java

try {
    // Precondition: Cache is already connected

    // Mention the name of the topic
    String topicName = "orderTopic";

    // Create the topic
    Topic topic = cache.getMessagingService().createTopic(topicName);
} catch (OperationFailedException ex) {
    // Exception can occur due to:
    // Connection Failures
    // Operation Timeout
    // Operation performed during state transfer
} catch (Exception ex) {
    // Any generic exception like IllegalArgumentException or NullPointerException
}

  • Scala

try {
    // Precondition: Cache is already connected

    // Mention the name of the topic
    val topicName = "orderTopic"

    // Create the topic
    val topic = cache.getMessagingService.createTopic(topicName)
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}

  • Node.js

// This is an async method
try {
  // Precondition: Cache is already connected

  // Mention the name of the topic
  let topicName = "orderTopic";

  // Create the topic
  let topic = await ncache.getMessagingService().createTopic(topicName);
} catch (error) {
  // Handle any errors
}

  • Python

try:
    # Precondition: Cache is already connected

    # Mention the name of the topic
    topic_name = "orderTopic"

    # Create the topic
    topic = cache.get_messaging_service().create_topic(topic_name)
except Exception as exp:
    # Handle errors
    pass

Note

To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
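
The create-or-return-existing behavior described above can be pictured with a small in-memory model. This is only a sketch: InMemoryTopic and InMemoryMessagingService are hypothetical names invented for illustration, the code does not call NCache, and rejecting an empty name with ValueError is an assumption that mirrors the null/empty topic name case mentioned in the .NET example.

```python
class InMemoryTopic:
    """Hypothetical stand-in for a topic handle; not an NCache type."""

    def __init__(self, name: str) -> None:
        self.name = name


class InMemoryMessagingService:
    """Toy registry that mimics the create-or-return-existing behavior."""

    def __init__(self) -> None:
        self._topics = {}

    def create_topic(self, name: str) -> InMemoryTopic:
        # Assumption: a null/empty topic name is rejected.
        if not name:
            raise ValueError("topic name must be a non-empty string")
        # Return the existing instance if present, otherwise register a new one.
        return self._topics.setdefault(name, InMemoryTopic(name))


service = InMemoryMessagingService()
first = service.create_topic("orderTopic")
second = service.create_topic("orderTopic")

# Both calls yield the same topic instance.
same_instance = first is second
```

In this model, calling create_topic twice with the same name is harmless, which is why application start-up code can usually create the topics it needs unconditionally.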

Create Topic with Priority

Note

This feature is only available in NCache 5.2 and onward.

NCache allows you to set a topic priority in order to prioritize delivery of critical messages. Priority is specified using TopicPriority as Low, Normal, or High. If eviction is enabled, the topics with the lowest priority are evicted first and the topics with the highest priority are evicted last. Topics created without a specified priority get the default priority, i.e., Normal.

Important

Priority of a topic can be specified at the time of topic creation only and cannot be modified afterwards.

The following example creates a topic, orderTopic, with High priority to avoid early eviction (if eviction is enabled).

  • .NET/.NET Core

// Mention the name of the topic
string topicName = "orderTopic";

// Create the topic with priority
ITopic topic = cache.MessagingService.CreateTopic(topicName, Alachisoft.NCache.Runtime.Messaging.TopicPriority.High);

  • Java

// Mention name of the topic
String topicName = "orderTopic";

// Create topic with priority
Topic topic = cache.getMessagingService().createTopic(topicName, TopicPriority.High);

  • Scala

// Mention name of the topic
val topicName = "orderTopic"

// Create topic with priority
val topic = cache.getMessagingService.createTopic(topicName, TopicPriority.High)

  • Node.js

// This is an async method

// Mention name of the topic
let topicName = "orderTopic";

// Create topic with priority
let topic = await ncache
  .getMessagingService()
  .createTopic(topicName, ncache.TopicPriority.High);

  • Python

# Mention the name of the topic
topic_name = "orderTopic"

# Create the topic with priority
topic = cache.get_messaging_service().create_topic(topic_name, ncache.TopicPriority.HIGH)
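
To make the eviction rule concrete, the following sketch orders a few topics the way the text describes: lowest priority first, highest priority last. It is a toy model, not NCache's eviction algorithm. The TopicPriority enum here is defined locally for illustration, auditTopic and newsTopic are made-up names, and the handling of ties (insertion order) is an arbitrary choice of the sketch.

```python
from enum import IntEnum


class TopicPriority(IntEnum):
    """Local illustrative enum; the numbers only encode relative order."""
    LOW = 0
    NORMAL = 1
    HIGH = 2


def eviction_order(topics: dict) -> list:
    """Return topic names in the order they would be evicted: lowest priority first."""
    # sorted() is stable, so equal-priority topics keep insertion order
    # (an arbitrary tie-break chosen for this sketch).
    return sorted(topics, key=lambda name: topics[name])


topics = {
    "orderTopic": TopicPriority.HIGH,
    "auditTopic": TopicPriority.LOW,
    "newsTopic": TopicPriority.NORMAL,  # the default when no priority is given
}

order = eviction_order(topics)
```

Here orderTopic ends up last in the list, which is the effect the High priority in the examples above is meant to achieve.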

Get Topic

The GetTopic method fetches an instance of the specified topic from the cache. If the topic exists, it is returned; otherwise, null is returned. The following example gets an existing topic, orderTopic, from the cache.

  • .NET/.NET Core

// Mention the name of the topic
string topicName = "orderTopic";

// Get the topic from the cache
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);

// Verify successful topic retrieval
if (orderTopic != null)
{
    // orderTopic will be used for receiving and/or publishing messages
}
else
{
    // No topic exists
}

  • Java

// Mention name of the topic
String topicName = "orderTopic";

// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);

// Verify successful topic retrieval
if (orderTopic != null) {
    // orderTopic will be used for receiving and/or publishing messages
} else {
    // No topic exists
}

  • Scala

// Mention name of the topic
val topicName = "orderTopic"

// Get the topic
val orderTopic = cache.getMessagingService.getTopic(topicName)

// Verify successful topic retrieval
if (orderTopic != null) {
    // orderTopic will be used for receiving and/or publishing messages
}
else {
    // No topic exists
}

  • Node.js

// This is an async method

// Mention name of the topic
let topicName = "orderTopic";

// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);

// Verify successful topic retrieval
if (orderTopic != null) {
  // orderTopic will be used for receiving and/or publishing messages
} else {
  // No topic exists
}

  • Python

# Mention name of the topic
topic_name = "orderTopic"

# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)

# Verify successful topic retrieval
if order_topic is not None:
    # order_topic will be used for receiving and/or publishing messages
    print("Topic found")
else:
    # No topic exists
    print("Topic not found")
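
The examples above check the returned topic for null/None at the call site. When many call sites need the same check, one option is a small helper that turns a missing topic into an explicit error. The sketch below assumes only that the messaging service object exposes get_topic(topic_name), as in the Python example. require_topic, TopicNotFoundError, and FakeService are hypothetical names introduced for illustration and are not part of NCache.

```python
from typing import Optional, Protocol


class MessagingService(Protocol):
    """Structural type for any object exposing get_topic (assumed shape)."""

    def get_topic(self, topic_name: str) -> Optional[object]: ...


class TopicNotFoundError(LookupError):
    """Hypothetical application-level error; not an NCache exception."""


def require_topic(service: MessagingService, topic_name: str) -> object:
    """Return the topic, or raise instead of handing None back to the caller."""
    topic = service.get_topic(topic_name)
    if topic is None:
        raise TopicNotFoundError(f"topic {topic_name!r} does not exist")
    return topic


class FakeService:
    """In-memory stand-in used only to exercise the helper."""

    def __init__(self, topics: dict) -> None:
        self._topics = topics

    def get_topic(self, topic_name: str) -> Optional[object]:
        return self._topics.get(topic_name)


service = FakeService({"orderTopic": object()})
found = require_topic(service, "orderTopic")
```

With a real client, the same helper could presumably be called as require_topic(cache.get_messaging_service(), "orderTopic"), provided the service behaves as the example above suggests.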

Delete Topic

The DeleteTopic method unregisters the topic from the cache and removes all messages associated with that topic. If a topic deletion callback, OnTopicDeleted, is registered, it is triggered when this method is called.

The following example deletes the existing topic orderTopic from the cache.

  • .NET/.NET Core

// Define the topic to be deleted
string topicName = "orderTopic";

// Delete the topic "orderTopic"
cache.MessagingService.DeleteTopic(topicName);

// Callback will be triggered if registered

  • Java

// Mention the topic to be deleted
String topicName = "orderTopic";

// Delete the topic
cache.getMessagingService().deleteTopic(topicName);

// Callback will be triggered if registered

  • Node.js

// Mention the topic to be deleted
let topicName = "orderTopic";

// Delete the topic
ncache.getMessagingService().deleteTopic(topicName);

// Callback will be triggered if registered

  • Python

# Mention the topic to be deleted
topic_name = "orderTopic"

# Delete the topic
cache.get_messaging_service().delete_topic(topic_name)

# Callback will be triggered if registered

  • Scala

// Mention the topic to be deleted
val topicName = "orderTopic"

// Delete the topic
cache.getMessagingService.deleteTopic(topicName)

// Callback will be triggered if registered
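
The three effects of deletion described above (the topic is unregistered, its messages are removed, and the deletion callback fires if one is registered) can be traced in a small in-memory model. ToyTopic and ToyMessagingService are hypothetical classes written for this sketch and do not reflect how NCache implements deletion. Silently ignoring a delete for an unknown topic is also an assumption of the model.

```python
from typing import Callable, Dict, List, Optional


class ToyTopic:
    """Hypothetical in-memory topic; not an NCache type."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.messages: List[str] = []
        self.on_topic_deleted: Optional[Callable[[str], None]] = None


class ToyMessagingService:
    """Toy registry of topics keyed by name."""

    def __init__(self) -> None:
        self._topics: Dict[str, ToyTopic] = {}

    def create_topic(self, name: str) -> ToyTopic:
        return self._topics.setdefault(name, ToyTopic(name))

    def get_topic(self, name: str) -> Optional[ToyTopic]:
        return self._topics.get(name)

    def delete_topic(self, name: str) -> None:
        topic = self._topics.pop(name, None)  # unregister the topic
        if topic is None:
            return  # unknown topic: ignored in this toy model
        topic.messages.clear()  # remove all messages held by the topic
        if topic.on_topic_deleted is not None:
            topic.on_topic_deleted(name)  # fire the callback only if registered


deleted: List[str] = []
service = ToyMessagingService()
topic = service.create_topic("orderTopic")
topic.messages.append("order #1")
topic.on_topic_deleted = deleted.append  # register a deletion callback
service.delete_topic("orderTopic")
```

After the call, the registry no longer knows orderTopic, the message list is empty, and the callback has recorded the deleted topic's name.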

Delete Topic Asynchronously

The DeleteTopicAsync method deletes the topic asynchronously. It returns a Task to the caller, so the application can carry on with other work without waiting for the topic to be deleted. The following example shows the asynchronous deletion of the topic orderTopic.

  • .NET/.NET Core

// Define the topic to be deleted
string topicName = "orderTopic";

// Delete the topic "orderTopic"
Task task = cache.MessagingService.DeleteTopicAsync(topicName);

// Use task to perform further operations according to business logic
// Callback will be triggered if registered

  • Java

// Mention the topic to be deleted
String topicName = "orderTopic";

// Delete the topic asynchronously
TimeScheduler.Task task = (TimeScheduler.Task) cache.getMessagingService().deleteTopicAsync(topicName);

// Use tasks to perform further operations according to business logic
// Callback will be triggered if registered

  • Scala

// Mention the topic to be deleted
val topicName = "orderTopic"

// Delete the topic asynchronously
val task = cache.getMessagingService.deleteTopicAsync(topicName)

// Use tasks to perform further operations according to business logic
// Callback will be triggered if registered

  • Python

import asyncio

# Mention the topic to be deleted
topic_name = "orderTopic"

# Delete the topic asynchronously
async def delete_async():
    task = cache.get_messaging_service().delete_topic_async(topic_name)
    result = await task

asyncio.run(delete_async())

# Use tasks to perform further operations according to business logic
# Callback will be triggered if registered
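
The point of the asynchronous variant is that other work can proceed while the deletion is in flight. The sketch below shows that pattern with Python's asyncio. fake_delete_topic_async is a hypothetical stand-in that only sleeps briefly, so nothing here talks to a cache. With a real client the awaited object would come from the delete call shown above.

```python
import asyncio


async def fake_delete_topic_async(topic_name: str, log: list) -> str:
    """Hypothetical stand-in for an asynchronous delete call."""
    await asyncio.sleep(0.01)  # simulate the round trip to the cache server
    log.append(f"deleted {topic_name}")
    return topic_name


async def main() -> list:
    log: list = []
    # Start the deletion without waiting for it to finish.
    task = asyncio.create_task(fake_delete_topic_async("orderTopic", log))
    # Other work proceeds while the deletion is in flight.
    log.append("doing other work")
    # Wait for the deletion only when its outcome is needed.
    await task
    return log


events = asyncio.run(main())
```

The recorded events show the other work being logged before the deletion completes, which is the ordering the asynchronous call makes possible.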

Properties of ITopic Interface

Member | Type | Description
MessageDeliveryFailure | MessageDeliveryFailureCallback | Event on the topic through which the publisher receives all failed messages, i.e., messages that were not delivered to any subscriber or that expired or were evicted before delivery.
Name | string | Name of the topic specified during topic creation.
OnTopicDeleted | TopicDeletedCallback | Event to handle topic deletion by publisher and subscriber.
ExpirationTime | TimeSpan | If message-level expiration is not provided, this topic-level expiration applies to messages in the topic by default. The value is TimeSpan.MaxValue by default.
IsClosed | bool | Indicates whether the topic is disposed; check it before performing any operation.
Dispose | IDisposable | Removes the registered topic's subscription from the cache server.
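
The ExpirationTime entry describes a fallback rule: a message-level expiration wins when it is provided, otherwise the topic-level value applies. The sketch below expresses that rule as a plain function. effective_expiration is a hypothetical helper written for illustration, and timedelta.max stands in for TimeSpan.MaxValue. The sketch says nothing about how NCache applies the rule internally.

```python
from datetime import timedelta
from typing import Optional


def effective_expiration(
    message_expiration: Optional[timedelta],
    topic_expiration: timedelta = timedelta.max,
) -> timedelta:
    """Pick the expiration that applies to a single message.

    timedelta.max plays the role of TimeSpan.MaxValue in this sketch.
    """
    if message_expiration is not None:
        return message_expiration  # message-level setting takes precedence
    return topic_expiration  # otherwise fall back to the topic-level value


per_message = effective_expiration(timedelta(seconds=30), timedelta(minutes=5))
from_topic = effective_expiration(None, timedelta(minutes=5))
unbounded = effective_expiration(None)
```

In the last call neither level sets an expiration, so the message keeps the maximum value, matching the default stated in the table.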

Additional Resources

NCache provides a sample application for Pub/Sub on GitHub.

See Also

Publish Messages to a Topic
Subscribe to a Topic
Event Notifications in Cache
Continuous Query

Copyright © 2017 Alachisoft