使用连续查询
Assuming that you have indexed the required searchable attributes, you are now required to implement Continuous Query in your application. Keeping in mind the purpose of Continuous Queries, the first thing you need to do is to define all the callbacks that need to be executed once the result set of your query is in any way changed. Then, we need to register the Continuous Query with the cache server.
如果您的所有应用程序都不需要跟踪任何查询结果集,那么您不仅应该取消注册通知,还应该从缓存中取消注册查询。
先决条件
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 需要首先配置可搜索对象及其属性的索引,如中所述 配置查询索引 在管理员指南中。
- 缓存应该有一些与配置属性相关的数据。
- 有关 API 详细信息,请参阅: 缓存, 事件数据过滤器, 事件类型, 执行阅读器, 注册CQ, 取消注册CQ, 取消注册通知, 连续查询, 注册通知
, 查询数据通知回调, 缓存读取器, 字段计数, 阅读, 插页, CQ事件参数, 取得价值.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 需要首先配置可搜索对象及其属性的索引,如中所述 配置查询索引 在管理员指南中。
- 缓存应该有一些与配置属性相关的数据。
- 有关 API 详细信息,请参阅: 缓存, 事件数据过滤器, 事件类型, 执行阅读器, 注册CQ, 取消注册CQ, 移除数据修改监听器, 缓存项, 缓存项版本, 插, CQ事件参数, 获取事件类型, 读, 的getValue, 连续查询, 添加数据修改监听器, 获取字段计数, 缓存读取器.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 需要首先配置可搜索对象及其属性的索引,如中所述 配置查询索引 在管理员指南中。
- 缓存应该有一些与配置属性相关的数据。
- 有关 API 详细信息,请参阅: 缓存, 事件数据过滤器, 事件类型, 执行阅读器, 注册CQ, 取消注册CQ, 移除数据修改监听器, 缓存项, 插, 获取事件类型, 的getValue, 读, 获取字段计数, 连续查询, 添加数据修改监听器.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 需要首先配置可搜索对象及其属性的索引,如中所述 配置查询索引 在管理员指南中。
- 缓存应该有一些与配置属性相关的数据。
- 有关 API 详细信息,请参阅: 获取事件类型, 执行阅读器, 注册_cq,
取消注册cq, 删除数据修改监听器, 缓存项, 插, 获取字段计数, 添加数据修改监听器, 连续查询, 读, 获取值, 事件类型, CQ事件参数.
第 1 步:注册事件回调
假设您已经索引了所需的可搜索属性,您可以在应用程序中实现连续查询。 这要求您定义在修改查询的结果集后需要执行的所有回调。 然后,我们需要将 Continuous Query 注册到缓存服务器。
可以为多个事件注册回调。
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to resut set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the resut set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from resut set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
// This is an async method
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
备注
为确保操作是故障安全的,建议处理应用程序中的任何潜在异常,如中所述 处理故障.
第 2 步:注册查询和通知
注册回调后,创建一个连续查询,它指定将触发事件的结果集的条件。 此查询将针对服务器注册。
一旦创建了连续查询,预定义的回调就会注册到查询中。 回调是根据注册的 EventType
和 EventDataFilter
.
现在可以使用以下方法在服务器上注册连续查询 RegisterCQ
。 您可以在应用程序中多次使用此方法来接收查询数据集更改的通知。
缓存事件通知中的任何修改都将根据事件类型触发。 对于查询缓存数据, ExecuteReader
执行查询,然后在客户端逐块读取生成的结果集。
您可以通过修改缓存数据来触发事件,从而影响结果集。 该代码示例更新现有缓存项,以便将其添加到查询结果集中,从而触发 ItemAdded
事件。
警告
如果服务器和客户端之间的连接中断,则客户端将不会接收在此持续时间内触发的任何事件。
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
第 3 步:从连续查询中取消注册通知
当应用程序不再需要通知时,可以从 Continuous Query 中取消注册通知。 如果已使用以下方法注册了多个事件类型,您可以取消注册特定事件类型的通知: UnRegisterNotification
方法。
例如,如果 ItemAdded
和 ItemRemoved
事件类型已注册,但您的业务逻辑不再需要事件 ItemAdded
,您可以专门取消注册通知 ItemAdded
事件。
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
第 4 步:从服务器注销连续查询
一旦应用程序不再有兴趣接收查询结果集中更改的通知,则应从服务器取消注册已注册的连续查询。
UnregisterCQ
将以下对象作为参数 ContinuousQuery
注销在此调用后不再触发的回调。
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
更多资讯
NCache 提供了一个用于连续查询的示例应用程序 GitHub上.
参见
.NET: Alachisoft.NCache.运行时.事件 命名空间。
Java的: COM。alachisoft.ncache。事件 命名空间。
节点.js: 连续查询 类。
Python: ncache.runtime.caching.events 类。