如今,许多服务器应用程序需要与其他应用程序协调工作并以异步方式共享消息和数据。 而且,这通常是在高交易环境中完成的。 对于高交易环境, NCache 是这些的理想人选 异步事件驱动通信.
NCache 提供了一组丰富的消息传递和流功能来满足这些应用程序的需求,包括 发布/订阅消息, 连续查询, 项目级别事件及 缓存级别事件. 下面分别介绍。
NCache 提供了强大的 发布/订阅消息 发布者应用程序可以在不知道订阅者是谁的情况下发送消息的应用程序平台。 消息可以分为 Topics 然后订阅者可以订阅特定主题并异步接收这些消息。
您可以轻松地 使用 NCache 分布式应用程序的 Pub/Sub 消息传递 一旦您了解了 Pub/Sub 消息传递的组件及其工作原理。
下面是一个源代码示例 Publisher 如何发布消息 一个话题,然后一个 订阅者订阅主题.
ITopic topic = _cache.MessagingService.CreateTopic(topicName)
// Creating new message containing a “payload”
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publishing the above created message.
topic.Publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topic);
// Create and register subscribers for the topic created
// MessageRecieved callback is registered
orderTopic.CreateSubscription(MessageReceived);
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);
// Creating new message containing a “payload”
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
}
});
连续查询 是一个强大的功能 NCache 它允许您基于类似 SQL 的查询来监视分布式缓存中“数据集”的更改。 您可以使用类似于 SQL 的条件针对缓存创建连续查询 (CQ) 并将其注册到 NCache. NCache 然后 开始监视此“数据集”的任何更改 缓存中包括:
以下是如何创建具有特定条件的连续查询并在服务器上注册的方法。
// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg args) {
switch (args.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
}
}
}, EnumSet.allOf(EventType.class), EventDataFilter.None);
// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);
有关更多详细信息,您可以查看 如何在缓存文档中使用连续查询.
客户注册 NCache 接收 事件通知 他们感兴趣的是提供回调函数。 NCache 为过滤器提供事件,这些事件用于指定随事件返回的信息量。 这些过滤器是 无(默认), 元数据 和 数据与元数据。 支持以下类型的事件:
下面给出的代码显示了注册项目级别事件。
public static void RegisterItemLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register Item level events
string key = "Product:1001";
cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
switch (cacheEventArgs.EventType)
{
case EventType.ItemAdded:
// 'key' has been added to the cache
break;
case EventType.ItemUpdated:
// 'key' has been updated in the cache
break;
case EventType.ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
private static void RegisterItemLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
String key = "Product:1001";
cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
@Override
public void onCacheDataModified(String key, CacheEventArg eventArgs) {
switch (eventArgs.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
case ItemUpdated:
// 'key' has been updated in the cache
break;
case ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
@Override
public void onCacheCleared(String cacheName) {
//cacheName cleared
}
}
对于缓存级别事件,您可以使用与项目级别事件中使用的相同的回调。
public static void RegisterCacheLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register cache level events
cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
private static void RegisterCacheLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
针对 .NET 和 Java 应用程序, NCache 是执行高事务 Pub/Sub 消息、连续查询和事件的理想选择。 这就是为什么。
有了这个,你可以使用 NCache 作为一个强大而简单的消息传递平台。 阅读更多关于 NCache 从下面的链接下载完整的 30 天试用版。