消息传递和流

如今,许多服务器应用程序需要与其他应用程序协调工作并以异步方式共享消息和数据。 而且,这通常是在高交易环境中完成的。 对于高交易环境, NCache 是这些的理想人选 异步事件驱动通信.

NCache 提供了一组丰富的消息传递和流功能来满足这些应用程序的需求,包括 发布/订阅消息, 连续查询, 项目级别事件缓存级别事件. 下面分别介绍。

Pub/Sub 消息传递、连续查询和事件
 

发布/订阅消息

NCache 提供了强大的 发布/订阅消息 发布者应用程序可以在不知道订阅者是谁的情况下发送消息的应用程序平台。 消息可以分为 Topics 然后订阅者可以订阅特定主题并异步接收这些消息。

  • - 出版商: 发布消息以供多个订阅者使用。 发布者不知道订阅者是谁以及有多少。
  • - 订户: 注册自身以接收针对特定主题的消息。 订阅者不知道发布者是谁以及还有多少其他订阅者。
  • - 主题: 所有消息都是针对主题发布和接收的。 发布者针对它发布消息,订阅者从它接收消息。 您可以有多个主题 NCache.
  • - 信息: 这是发布者发送并由订阅者消费的实际消息。

您可以轻松地 使用 NCache 分布式应用程序的 Pub/Sub 消息传递 一旦您了解了 Pub/Sub 消息传递的组件及其工作原理。

发布/订阅消息 NCache

下面是一个源代码示例 Publisher 如何发布消息 一个话题,然后一个 订阅者订阅主题.

ITopic topic = _cache.MessagingService.CreateTopic(topicName)

// Creating new message containing a “payload” 
var message = new Message(payload);

// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);

// Publishing the above created message.
topic.Publish(message, DeliveryOption.All, true);

// Get the already created topic to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topic);

// Create and register subscribers for the topic created
// MessageRecieved callback is registered
orderTopic.CreateSubscription(MessageReceived);
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);

// Creating new message containing a “payload”
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);

// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));

// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);

// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);

// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
    @Override
     public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
     }
});
 

连续查询 (CQ)

连续查询 是一个强大的功能 NCache 它允许您基于类似 SQL 的查询来监视分布式缓存中“数据集”的更改。 您可以使用类似于 SQL 的条件针对缓存创建连续查询 (CQ) 并将其注册到 NCache. NCache 然后 开始监视此“数据集”的任何更改 缓存中包括:

  • - 添加: 如果将符合 CQ 标准的项目添加到缓存中,则客户端会与该对象一起收到通知。
  • - 更新: 如果缓存中更新了符合 CQ 标准的项目,则客户端会与该对象一起收到通知。
  • - 移除: 如果从缓存中删除了符合 CQ 标准的项目,则客户端会与该对象一起收到通知。

以下是如何创建具有特定条件的连续查询并在服务器上注册的方法。

// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";

var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");

// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);

// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);

// Register continuousQuery on server 
cache.MessagingService.RegisterCQ(cQuery);
String cacheName = "demoCache";

// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);

// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";

var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Country", "USA");

// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);

// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
	@Override
	public void onQueryDataModified(String key, CQEventArg args) {
		switch (args.getEventType())
		{
			case ItemAdded:
				// 'key' has been added to the cache
				break;
		}
	}
  }, EnumSet.allOf(EventType.class), EventDataFilter.None);

// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);

有关更多详细信息,您可以查看 如何在缓存文档中使用连续查询.

 

活动通知

客户注册 NCache 接收 事件通知 他们感兴趣的是提供回调函数。 NCache 为过滤器提供事件,这些事件用于指定随事件返回的信息量。 这些过滤器是 无(默认), 元数据数据与元数据。 支持以下类型的事件:

  • - 项目级别事件: 客户端注册为在特定缓存项(基于键)更新或从缓存中删除时收到通知,并为其提供回调。 当此项目被更新或删除时,客户端会收到通知。
  • - 缓存级别事件: 当所有客户端应用程序需要被任何连接的客户端应用程序通知缓存中的任何修改时,缓存级别事件是非常有用的通用事件。

下面给出的代码显示了注册项目级别事件。

public static void RegisterItemLevelEvents()
{
	// Get cache
	string cacheName = "demoCache";
	var cache = CacheManager.GetCache(cacheName); 

	// Register Item level events
	string key = "Product:1001";
	cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
	switch (cacheEventArgs.EventType)
	{
	   case EventType.ItemAdded:
		   // 'key' has been added to the cache
		   break;
	   case EventType.ItemUpdated:
		   // 'key' has been updated in the cache
		   break;
	   case EventType.ItemRemoved:
		   // 'key' has been removed from the cache
		   break;
	}
}
private static void RegisterItemLevelEvents() throws Exception {
        String cacheName = "demoCache";

        // Initialize an instance of the cache to begin performing operations:
        Cache cache = CacheManager.getCache(cacheName);
        String key = "Product:1001";
        cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
    }
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
	@Override
	public void onCacheDataModified(String key, CacheEventArg eventArgs) {
		switch (eventArgs.getEventType())
		{
			case ItemAdded:
				// 'key' has been added to the cache
				break;
			case ItemUpdated:
				// 'key' has been updated in the cache
				break;
			case ItemRemoved:
				// 'key' has been removed from the cache
				break;
		}
	}
	@Override
	public void onCacheCleared(String cacheName) {
		//cacheName cleared
	}
}

对于缓存级别事件,您可以使用与项目级别事件中使用的相同的回调。

public static void RegisterCacheLevelEvents()
{
	// Get cache
	string cacheName = "demoCache";
	var cache = CacheManager.GetCache(cacheName);

	// Register cache level events
	cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata); 
}
private static void RegisterCacheLevelEvents() throws Exception {
	String cacheName = "demoCache";

	// Initialize an instance of the cache to begin performing operations:
	Cache cache = CacheManager.getCache(cacheName);

	CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
 

为什么 NCache?

针对 .NET 和 Java 应用程序, NCache 是执行高事务 Pub/Sub 消息、连续查询和事件的理想选择。 这就是为什么。

  • 极快且可扩展: NCache 由于完全在内存中,因此速度非常快,并为您提供亚毫秒级的响应时间。 而且,它是一个分布式缓存,可以让您将服务器添加到缓存集群中 实现线性可扩展性并处理极端事务负载.
  • 通过数据复制实现高可用性: NCache 在不影响性能的情况下智能地复制缓存中的数据。 因此,即使缓存服务器出现故障,您也不会丢失任何数据(事件和消息)。
  • .NET 和 Java 应用程序相互发送消息: NCache 允许 .NET 和 Java 应用程序共享通用的消息传递和流平台,并通过以 JSON 格式序列化消息或将 JSON 文档作为消息发送来相互发送消息。

有了这个,你可以使用 NCache 作为一个强大而简单的消息传递平台。 阅读更多关于 NCache 从下面的链接下载完整的 30 天试用版。

接下来做什么?

注册每月电子邮件通讯以获取最新更新。

联系我们

联系电话
©版权所有 Alachisoft 2002 - 版权所有。 NCache 是 Diyatech Corp. 的注册商标。