Messaging and Streams

Many server applications today need to coordinate work with other applications and share messages and data in an asynchronous manner. And, often this is done in a high transaction environment. For high transaction environments, NCache is an ideal candidate for these asynchronous event-driven communications.

NCache provides a rich set of messaging and streams features to cater to the needs of these applications including Pub/Sub Messaging, Continuous Queries, Item Level Events, and Cache Level Events. Each is described below.

Pub/Sub Messaging, Continuous Query, and Events
 

Pub/Sub Messaging

NCache provides a powerful Pub/Sub Messaging platform for applications where the Publisher applications can send messages without knowing who the Subscribers are. The messages can be categorized into Topics and then subscribers can subscribe to specific topics and receive these messages asynchronously.

  • - Publisher: publishes the message to be consumed by multiple subscribers. The publisher does not know who the subscribers are and how many.
  • - Subscriber: registers itself to receive messages against a specific Topic. The subscriber does not know who the publisher is and how many other subscribers are there.
  • - Topic: all messages are published and received against Topics. Publishers publish messages against it and subscribers receive messages from it. You can have multiple Topics in NCache.
  • - Message: this is the actual message sent by Publishers and consumed by Subscribers.

You can easily use NCache Pub/Sub messaging for distributed applications once you understand the components of Pub/Sub messaging and their working.

Pub/Sub Messaging with NCache

Below is a source code example on how a Publisher publishes a message to a topic and then a subscriber subscribes to the topic.

ITopic topic = _cache.MessagingService.CreateTopic(topicName)

// Creating new message containing a “payload” 
var message = new Message(payload);

// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);

// Publishing the above created message.
topic.Publish(message, DeliveryOption.All, true);

// Get the already created topic to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topic);

// Create and register subscribers for the topic created
// MessageRecieved callback is registered
orderTopic.CreateSubscription(MessageReceived);
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);

// Creating new message containing a “payload”
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);

// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));

// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);

// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);

// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
    @Override
     public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
     }
});
 

Continuous Query (CQ)

Continuous Query is a powerful feature of NCache that allows you to monitor changes to a “data set” in the distributed cache based on SQL-like query. You create a Continuous Query (CQ) with SQL-like criteria against the cache and register it with NCache. NCache then starts monitoring any changes to this “data set” in the cache including:

  • - Add: if an item is added to the cache that matches the CQ criteria, the client is notified along with this object.
  • - Update: if an item is updated in the cache that matches the CQ criteria, the client is notified along with this object.
  • - Remove: if an item is removed from the cache that matches the CQ criteria, the client is notified along with this object.

Here is how you can create a Continuous Query with specific criteria and register against the server.

// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";

var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");

// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);

// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);

// Register continuousQuery on server 
cache.MessagingService.RegisterCQ(cQuery);
String cacheName = "demoCache";

// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);

// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";

var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Country", "USA");

// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);

// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
	@Override
	public void onQueryDataModified(String key, CQEventArg args) {
		switch (args.getEventType())
		{
			case ItemAdded:
				// 'key' has been added to the cache
				break;
		}
	}
  }, EnumSet.allOf(EventType.class), EventDataFilter.None);

// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);

For further details, you can see how to use continuous query in cache docs.

 

Event Notifications

Clients register with NCache to receive event notifications they are interested in by providing a callback function. NCache provides filters with events that are used to specify the amount of information returned with the event. These filters are None(default), Metadata and DataWithMetadata. The following types of events are supported:

  • - Item Level Events: the client registered to be notified whenever a specific cached item (based on a key) is updated or removed from the cache and provides a callback for it. When this item is updated or removed, the client is notified.
  • - Cache Level Events: Cache level events are the general events that can be useful when all the client applications need to be notified about any modification in the cache by any connected client application.

The code given below shows registering item level events.

public static void RegisterItemLevelEvents()
{
	// Get cache
	string cacheName = "demoCache";
	var cache = CacheManager.GetCache(cacheName); 

	// Register Item level events
	string key = "Product:1001";
	cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
	switch (cacheEventArgs.EventType)
	{
	   case EventType.ItemAdded:
		   // 'key' has been added to the cache
		   break;
	   case EventType.ItemUpdated:
		   // 'key' has been updated in the cache
		   break;
	   case EventType.ItemRemoved:
		   // 'key' has been removed from the cache
		   break;
	}
}
private static void RegisterItemLevelEvents() throws Exception {
        String cacheName = "demoCache";

        // Initialize an instance of the cache to begin performing operations:
        Cache cache = CacheManager.getCache(cacheName);
        String key = "Product:1001";
        cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
    }
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
	@Override
	public void onCacheDataModified(String key, CacheEventArg eventArgs) {
		switch (eventArgs.getEventType())
		{
			case ItemAdded:
				// 'key' has been added to the cache
				break;
			case ItemUpdated:
				// 'key' has been updated in the cache
				break;
			case ItemRemoved:
				// 'key' has been removed from the cache
				break;
		}
	}
	@Override
	public void onCacheCleared(String cacheName) {
		//cacheName cleared
	}
}

For cache level events, you can use the same callback as used in the item level events.

public static void RegisterCacheLevelEvents()
{
	// Get cache
	string cacheName = "demoCache";
	var cache = CacheManager.GetCache(cacheName);

	// Register cache level events
	cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata); 
}
private static void RegisterCacheLevelEvents() throws Exception {
	String cacheName = "demoCache";

	// Initialize an instance of the cache to begin performing operations:
	Cache cache = CacheManager.getCache(cacheName);

	CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
 

Why NCache?

For .NET and Java applications, NCache is the ideal choice for doing high transaction Pub/Sub Messaging, Continuous Query, and Events. Here is why.

  • Extremely Fast & Scalable: NCache is extremely fast due to being totally in-memory and gives you sub-millisecond response times. And, it is a distributed cache that lets you add servers to the cache cluster to achieve linear scalability and handle extreme transaction loads.
  • High Availability through Data Replication: NCache intelligently replicates data in the cache without compromising performance. So, you won't lose any data (events and messages) even if a cache server goes down.
  • .NET & Java Apps Message each other: NCache lets .NET and Java applications to share a common Messaging and Streams platform and send messages to each other by either serializing the messages in JSON or sending JSON documents as messages.

With this, you can use NCache as a powerful yet simple messaging platform. Read more about NCache from the links below and download a fully working 30-Day trial.

What to Do Next?

© Copyright Alachisoft 2002 - . All rights reserved. NCache is a registered trademark of Diyatech Corp.