Pub/Sub Topics
A Topic contains the message itself, along with additional subscriber and publisher information. The ITopic/Topic interface in Pub/Sub facilitates creating a Topic, set Topic priority while creation, get a Topic, delete a Topic simply, and asynchronously.
Prerequisites
Before using the NCache client-side APIs, ensure that the following prerequisites are fulfilled:
- Add the following Maven dependencies for your Java client application in
pom.xml file:
<dependency>
<groupId>com.alachisoft.ncache</groupId>
<!--for NCache Enterprise-->
<artifactId>ncache-client</artifactId>
<version>x.x.x</version>
</dependency>
- Install either of the following NuGet packages in your .NET client application:
- Enterprise:
Install-Package Alachisoft.NCache.SDK -Version 4.9.1.0
- Create a new Console Application.
- Make sure that the data being added is serializable.
- Add NCache References by locating
%NCHOME%\NCache\bin\assembly\4.0 and adding Alachisoft.NCache.Web and Alachisoft.NCache.Runtime as appropriate.
- Include the
Alachisoft.NCache.Web.Caching and Alachisoft.NCache.Runtime.Caching namespaces in your application.
- To learn more about the NCache Legacy API, please download the NCache 4.9 documents available as a .zip file on the Alachisoft Website.
How to Create a Pub/Sub Topic in NCache
The CreateTopic method creates a Topic in the cache with a specified name. If the Topic already exists, an instance of the Topic is returned as ITopic. Whenever a message is published on a Topic, it is delivered based on subscriber preferences who are registered on that Topic. The following example creates a Topic orderTopic.
try
{
// Precondition: Cache is already connected
// Mention the name of the Topic
string topicName = "orderTopic";
// Create the Topic
ITopic topic = cache.MessagingService.CreateTopic(topicName);
}
catch (OperationFailedException ex)
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
try
{
// Precondition: Cache is already connected
// Mention the name of the Topic
String topicName = "orderTopic";
// Create the Topic
Topic topic = cache.getMessagingService().createTopic(topicName);
}
catch (OperationFailedException ex)
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
catch(Exception ex)
{
// Any generic exception like IllegalArgumentException or NullPointerException
}
try:
# Precondition: Cache is already connected
# Mention the name of the Topic
topic_name = "orderTopic"
# Create the Topic
topic = cache.get_messaging_service().create_topic(topic_name)
except Exception as ex:
# Exception can occur due to:
# - Connection failures
# - Operation timeout
# - Invalid arguments (e.g., null/empty topic name)
print("Operation failed: " + str(ex))
try
{
// This is an async method
// Precondition: Cache is already connected
// Mention the name of the Topic
let topicName = "orderTopic";
// Create the Topic
let messagingService = await cache.getMessagingService();
let topic = await messagingService.createTopic(topicName);
}
catch (error) {
// Handle any errors
}
try
{
// Using NCache Enterprise 4.9.1
// Precondition: Cache is already connected
// Mention the name of the Topic
string topicName = "orderTopic";
// Create the Topic
cache.MessagingService.CreateTopic(topicName);
}
catch (OperationFailedException ex)
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Create Topic with Priority
Note
This feature is only available in NCache 5.2 and onwards.
NCache Pub/Sub allows you to set Topic priority to prioritize the delivery of critical messages. Priority is specified using the TopicPriority property as Low, Normal, and High. In case eviction is enabled, the Topic with the lowest priority are evicted first, and the ones with the highest priority are evicted last. Topics created with no priority specified will be created with the default priority, i.e., Normal. The following example creates a Topic, orderTopic, with priority as high to avoid early eviction (if eviction is enabled).
Important
Priority of a Topic can be specified at the time of Topic creation only and cannot be modified afterwards.
// Precondition: Cache is already connected
// Mention the name of the Topic
string topicName = "orderTopic";
// Create the Topic with priority
ITopic topic = cache.MessagingService.CreateTopic(topicName, Alachisoft.NCache.Runtime.Messaging.TopicPriority.High);
// Precondition: Cache is already connected
// Mention the name of the Topic
String topicName = "orderTopic";
// Create the Topic with priority
Topic topic = cache.getMessagingService().createTopic(topicName, TopicPriority.High);
# Precondition: Cache is already connected
# Mention the name of the Topic
topic_name = "orderTopic"
# Create the Topic
topic = cache.get_messaging_service().create_topic(topic_name, ncache.TopicPriority.HIGH)
// Precondition: Cache is already connected
// This is an async method
// Mention the name of the Topic
let topicName = "orderTopicPriority";
// Create Topic with priority
let messagingService = await cache.getMessagingService();
let topic = await messagingService.createTopic(topicName, ncache.TopicPriority.High);
Get Topic
The GetTopic method fetches an instance of the specified Topic from the cache. If the Topic exists, it is returned, otherwise an exception is thrown. The following example gets an existing Topic orderTopic from the cache.
// Precondition: Cache is already connected
// Mention the name of the Topic
string topicName = "orderTopic";
// Get the Topic from the cache
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Verify successful Topic retrieval
if (orderTopic != null)
{
// orderTopic will be used for receiving and/or publishing messages
}
else
{
// No Topic exists
}
// Precondition: Cache is already connected
// Mention name of the Topic
String topicName = "orderTopic";
// Get the Topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
// Verify successful Topic retrieval
if (orderTopic != null)
{
// orderTopic will be used for receiving and/or publishing messages
} else
{
// No Topic exists
}
# Precondition: Cache is already connected
# Mention name of the Topic
topic_name = "orderTopic"
# Get the Topic from the cache
order_topic = cache.get_messaging_service().get_topic(topic_name)
# Verify successful Topic retrieval
if order_topic is not None:
# order_topic will be used for receiving and/or publishing messages
else:
# No Topic exists
// Precondition: Cache is already connected
// This is an async method
// Mention the name of the Topic
let topicName = "orderTopic";
// Get the Topic from the cache
let messagingService = await cache.getMessagingService();
let orderTopic = await messagingService.getTopic(topicName);
// Verify successful Topic retrieval
if (!(orderTopic == null))
{
// orderTopic will be used for receiving and/or publishing messages
}
else
{
// No Topic exists
}
// Using NCache Enterprise 4.9.1
// Precondition: Cache is already connected
// Mention the name of the Topic
string topicName = "orderTopic";
// Get the Topic from the cache
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Verify successful Topic retrieval
if (orderTopic != null)
{
// orderTopic will be used for receiving and/or publishing messages
}
else
{
// No Topic exists
}
Delete Topic
The DeleteTopic method unregisters the Topic from the cache and removes all messages associated with that Topic. If registered, a Topic deletion callback OnTopicDeleted will be triggered upon this method call. The following example deletes the existing Topic orderTopic from the cache and removes all messages associated with that Topic. If the OnTopicDeleted callback is registered, it will be triggered upon this method call.
// Precondition: Cache is already connected
// Define the Topic to be deleted
string topicName = "orderTopic";
// Delete the Topic "orderTopic"
cache.MessagingService.DeleteTopic(topicName);
// Callback will be triggered if registered
// Precondition: Cache is already connected
// Mention the Topic to be deleted
String topicName = "orderTopic";
// Delete the Topic
cache.getMessagingService().deleteTopic(topicName);
// Callback will be triggered if registered
# Precondition: Cache is already connected
# Define the Topic to be deleted
topic_name = "orderTopic"
# Delete the Topic "orderTopic"
cache.get_messaging_service().delete_topic(topic_name)
# Callback will be triggered if registered
// Precondition: Cache is already connected
// Mention the Topic to be deleted
let topicName = "orderTopic";
// Delete the Topic
let messagingService = await cache.getMessagingService();
await messagingService.deleteTopic(topicName);
// Callback will be triggered if registered
// Using NCache Enterprise 4.9.1
// Precondition: Cache is already connected
// Define the Topic to be deleted
string topicName = "orderTopic";
// Delete the Topic "orderTopic"
cache.MessagingService.DeleteTopic(topicName);
// Callback will be triggered if registered
Delete Topic Asynchronously
The DeleteTopicAsync method deletes the Topic asynchronously. Whenever a Topic is deleted asynchronously, a Task is returned to the user for performing further tasks without waiting for the Topic to be deleted. The following example shows the asynchronous deletion of the Topic orderTopic.
// Precondition: Cache is already connected
// Define the Topic to be deleted
string topicName = "orderTopic";
// Delete the Topic "orderTopic"
Task task = cache.MessagingService.DeleteTopicAsync(topicName);
// Use task to perform further operations according to business logic
// Callback will be triggered if registered
// Precondition: Cache is already connected
// Mention the Topic to be deleted
String topicName = "orderTopic";
// Delete the Topic asynchronously
TimeScheduler.Task task = (TimeScheduler.Task) cache.getMessagingService().deleteTopicAsync(topicName);
// Use tasks to perform further operations according to business logic
// Callback will be triggered if registered
# Precondition: Cache is already connected
# Mention the Topic to be deleted
topic_name = "orderTopic"
# Delete the Topic asynchronously
task = await cache.get_messaging_service().delete_topic_async(topic_name)
# Use tasks to perform further operations according to business logic
# Callback will be triggered if registered
Properties of ITopic Interface
| Members |
Type |
Description |
MessageDeliveryFailure |
MessageDeliveryFailureCallback |
Event on the Topic so that publisher receives all failed messages that are not delivered to any subscriber or maybe messages are expired or evicted before delivery. |
Name |
string |
Name of the Topic specified during Topic creation. |
OnTopicDeleted |
TopicDeletedCallback |
Event to handle Topic deletion by publisher and subscriber. |
ExpirationTime |
TimeSpan |
If message level expiration is not provided, then this Topic level expiration expires messages in the Topic by default. The value is TimeSpan.MaxValue by default. |
IsClosed |
bool |
Check whether Topic is disposed, before performing any operation. |
Dispose |
IDisposable |
Removes registered Topic's subscription from cache server. |
Additional Resources
NCache provides a sample application for Pub/Sub on GitHub.
See Also
.NET: Alachisoft.NCache.Runtime.Caching namespace.
Java: com.alachisoft.ncache.runtime.caching namespace.
Python: ncache.client.services class.
Node.js: Topic class.