Many server applications today need to coordinate work with other applications and share messages and data asynchronously, often in a high-transaction environment. For such environments, NCache is an ideal candidate for asynchronous, event-driven communication.
NCache provides a rich set of messaging and streams features for these applications, including Pub/Sub Messaging, Continuous Queries, Item Level Events, and Cache Level Events. Each is described below.
NCache provides a powerful Pub/Sub Messaging platform in which Publisher applications send messages without knowing who the Subscribers are. Messages are categorized into Topics, and subscribers subscribe to specific topics and receive those messages asynchronously.
You can easily use NCache Pub/Sub messaging for distributed applications once you understand its components and how they work.
Below is a source code example showing how a publisher publishes a message to a topic and how a subscriber then subscribes to that topic.
// .NET (C#)
string topicName = "orderUpdates";
ITopic topic = cache.MessagingService.CreateTopic(topicName);
// Create a new message containing a "payload"
// (Customer is an application-defined class, as in the Java sample)
var payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish the message created above
topic.Publish(message, DeliveryOption.All, true);
// Get the already created topic by name to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
// Create and register a subscription for the topic
// The MessageReceived callback is registered
orderTopic.CreateSubscription(MessageReceived);
// Java
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);
// Create a new message containing a "payload"
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
}
});
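To make the publisher/subscriber decoupling concrete without a running cache, here is a minimal in-memory sketch in Java. It is not the NCache API: `SketchBroker`, `SketchTopic`, and `PubSubSketch` are hypothetical names invented for this illustration. Delivery is synchronous, and message expiration and delivery options are deliberately left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory topic; illustrates the idea only, not NCache itself.
final class SketchTopic {
    private final String name;
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

    SketchTopic(String name) { this.name = name; }

    String name() { return name; }

    // A subscriber registers a callback and never learns who publishes.
    void subscribe(Consumer<Object> callback) { subscribers.add(callback); }

    // A publisher hands over a payload and never learns who subscribes.
    // Returns how many subscribers the payload was delivered to.
    int publish(Object payload) {
        int delivered = 0;
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(payload);
            delivered++;
        }
        return delivered;
    }
}

// Hypothetical broker that maps topic names to topics.
final class SketchBroker {
    private final Map<String, SketchTopic> topics = new ConcurrentHashMap<>();

    // Creates the topic if it does not exist yet, otherwise returns the existing one.
    SketchTopic createTopic(String name) {
        return topics.computeIfAbsent(name, SketchTopic::new);
    }

    // Returns null when no topic with this name has been created.
    SketchTopic getTopic(String name) { return topics.get(name); }
}

class PubSubSketch {
    public static void main(String[] args) {
        SketchBroker broker = new SketchBroker();
        SketchTopic topic = broker.createTopic("orderUpdates");

        // Subscriber side: looks the topic up by name only.
        broker.getTopic("orderUpdates")
              .subscribe(payload -> System.out.println("received: " + payload));

        // Publisher side: publishes without any reference to the subscriber.
        int delivered = topic.publish("order 42 shipped");
        System.out.println("delivered to " + delivered + " subscriber(s)");
    }
}
```

The only thing the two sides share is the topic name, which is the property that lets publishers and subscribers be deployed and scaled independently.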
Continuous Query is a powerful feature of NCache that lets you monitor changes to a “data set” in the distributed cache based on an SQL-like query. You create a Continuous Query (CQ) with SQL-like criteria against the cache and register it with NCache. NCache then monitors this “data set” and notifies you when items matching the criteria are added, updated, or removed.
Here is how you can create a Continuous Query with specific criteria and register it with the server.
// .NET (C#)
// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Java
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg args) {
switch (args.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
}
}
}, EnumSet.of(EventType.ItemAdded), EventDataFilter.None);
// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);
For further details, see the NCache documentation on how to use Continuous Query in the cache.
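The idea behind a continuous query can be sketched in plain Java as a predicate that is re-evaluated on every write. This is only an illustrative model, not the NCache implementation: `ContinuousQuerySketch`, its `Change` enum, and the rule that an update which stops matching is reported as a removal are all assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of a continuous query over a key-value store.
final class ContinuousQuerySketch<V> {
    enum Change { ADDED, UPDATED, REMOVED }

    interface Listener { void onChange(String key, Change change); }

    private final Map<String, V> store = new HashMap<>();
    private final Predicate<V> criteria;
    private final Listener listener;

    ContinuousQuerySketch(Predicate<V> criteria, Listener listener) {
        this.criteria = criteria;
        this.listener = listener;
    }

    // Writes a value and reports how the query's result set changed, if at all.
    void put(String key, V value) {
        V old = store.put(key, value);
        boolean matchedBefore = old != null && criteria.test(old);
        boolean matchesNow = criteria.test(value);
        if (!matchedBefore && matchesNow) {
            listener.onChange(key, Change.ADDED);      // entered the result set
        } else if (matchedBefore && matchesNow) {
            listener.onChange(key, Change.UPDATED);    // changed inside the result set
        } else if (matchedBefore) {
            listener.onChange(key, Change.REMOVED);    // no longer matches the criteria
        }
        // Items that never matched produce no notification.
    }

    // Removes a value and notifies only if it was part of the result set.
    void remove(String key) {
        V old = store.remove(key);
        if (old != null && criteria.test(old)) {
            listener.onChange(key, Change.REMOVED);
        }
    }
}

class ContinuousQueryDemo {
    public static void main(String[] args) {
        // The cached value is just a country string to keep the sketch small.
        ContinuousQuerySketch<String> usCustomers = new ContinuousQuerySketch<String>(
            country -> country.equals("USA"),
            (key, change) -> System.out.println(key + " " + change));

        usCustomers.put("Customer:1", "USA"); // enters the result set
        usCustomers.put("Customer:1", "UK");  // leaves the result set
        usCustomers.put("Customer:2", "UK");  // never matched, no notification
    }
}
```

The point of the model is that the client is notified only about changes to the result set of the query, not about every write to the cache.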
Clients register with NCache to receive the event notifications they are interested in by providing a callback function. NCache provides event filters that specify how much information is returned with each event: None (the default), Metadata, and DataWithMetadata. Events can be registered at two levels: item level, for a specific key, and cache level, for the whole cache.
The code below shows how to register item level events.
// .NET (C#)
public static void RegisterItemLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register Item level events
string key = "Product:1001";
cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
switch (cacheEventArgs.EventType)
{
case EventType.ItemAdded:
// 'key' has been added to the cache
break;
case EventType.ItemUpdated:
// 'key' has been updated in the cache
break;
case EventType.ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
// Java
private static void registerItemLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
String key = "Product:1001";
cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
@Override
public void onCacheDataModified(String key, CacheEventArg eventArgs) {
switch (eventArgs.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
case ItemUpdated:
// 'key' has been updated in the cache
break;
case ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
@Override
public void onCacheCleared(String cacheName) {
// 'cacheName' has been cleared
}
}
For cache level events, you can reuse the same callback used for item level events; the registration simply omits the key.
// .NET (C#)
public static void RegisterCacheLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register cache level events
cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
// Java
private static void registerCacheLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
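The difference between the two registration levels can be modeled in a few lines of plain Java. The sketch below is an assumption-laden illustration rather than NCache code: `EventRegistrySketch`, `Kind`, and `raise` are hypothetical names, and the order in which listeners are called (cache level first) is an arbitrary choice for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry showing item level vs. cache level event dispatch.
final class EventRegistrySketch {
    enum Kind { ITEM_ADDED, ITEM_UPDATED, ITEM_REMOVED }

    interface Listener { void onEvent(String key, Kind kind); }

    // A listener together with the event kinds it asked for.
    private static final class Registration {
        final Listener listener;
        final EnumSet<Kind> kinds;

        Registration(Listener listener, EnumSet<Kind> kinds) {
            this.listener = listener;
            this.kinds = EnumSet.copyOf(kinds);
        }
    }

    private final Map<String, List<Registration>> itemLevel = new HashMap<>();
    private final List<Registration> cacheLevel = new ArrayList<>();

    // Item level: the listener only hears about one specific key.
    void addItemListener(String key, Listener listener, EnumSet<Kind> kinds) {
        itemLevel.computeIfAbsent(key, k -> new ArrayList<>())
                 .add(new Registration(listener, kinds));
    }

    // Cache level: the listener hears about every key.
    void addCacheListener(Listener listener, EnumSet<Kind> kinds) {
        cacheLevel.add(new Registration(listener, kinds));
    }

    // Delivers an event to cache level listeners, then to listeners on that key,
    // skipping any registration that did not ask for this kind of event.
    void raise(String key, Kind kind) {
        List<Registration> targets = new ArrayList<>(cacheLevel);
        targets.addAll(itemLevel.getOrDefault(key, Collections.<Registration>emptyList()));
        for (Registration registration : targets) {
            if (registration.kinds.contains(kind)) {
                registration.listener.onEvent(key, kind);
            }
        }
    }
}

class EventRegistryDemo {
    public static void main(String[] args) {
        EventRegistrySketch registry = new EventRegistrySketch();
        registry.addItemListener("Product:1001",
            (key, kind) -> System.out.println("item level: " + key + " " + kind),
            EnumSet.of(EventRegistrySketch.Kind.ITEM_UPDATED, EventRegistrySketch.Kind.ITEM_REMOVED));
        registry.addCacheListener(
            (key, kind) -> System.out.println("cache level: " + key + " " + kind),
            EnumSet.allOf(EventRegistrySketch.Kind.class));

        registry.raise("Product:1001", EventRegistrySketch.Kind.ITEM_UPDATED); // both listeners
        registry.raise("Product:2002", EventRegistrySketch.Kind.ITEM_ADDED);   // cache level only
    }
}
```

In this model the same listener type serves both levels, which mirrors how the samples above reuse one callback for item level and cache level registration.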
For .NET and Java applications, NCache is the ideal choice for high-transaction Pub/Sub Messaging, Continuous Query, and Events.
With these features, you can use NCache as a powerful yet simple messaging platform. You can read more about NCache and download a fully working 30-day trial.