Using Continuous Query
Once you have indexed the required searchable attributes, you can implement the Continuous Query in your application. Since the purpose of a Continuous Query is to track changes in a query result set, first define the callbacks to execute whenever that result set changes. Then register the Continuous Query with the cache server.
If none of your applications needs to track a query result set any longer, unregister not only the notifications but also the query itself from your cache.
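The lifecycle described above (define callbacks, register the query, unregister it once tracking is no longer needed) can be pictured with a toy in-memory model. This is only a sketch under stated assumptions: `ToyCache` and `ToyContinuousQuery` are hypothetical stand-ins written for illustration and are not part of the NCache API.

```python
from typing import Callable, Dict, List


class ToyContinuousQuery:
    """Hypothetical stand-in: a match criterion plus the callbacks to run."""

    def __init__(self, criteria: Callable[[dict], bool]) -> None:
        self.criteria = criteria
        self.callbacks: List[Callable[[str], None]] = []


class ToyCache:
    """Hypothetical in-memory cache that notifies registered queries."""

    def __init__(self) -> None:
        self.items: Dict[str, dict] = {}
        self.registered: List[ToyContinuousQuery] = []

    def register_cq(self, query: ToyContinuousQuery) -> None:
        self.registered.append(query)

    def un_register_cq(self, query: ToyContinuousQuery) -> None:
        self.registered.remove(query)

    def insert(self, key: str, value: dict) -> None:
        self.items[key] = value
        # Notify every registered query whose criteria the new value meets.
        for query in self.registered:
            if query.criteria(value):
                for callback in query.callbacks:
                    callback(key)


notified: List[str] = []

# 1. Define the callback, 2. attach it to a query, 3. register the query.
query = ToyContinuousQuery(lambda product: product["Category"] == "Beverages")
query.callbacks.append(notified.append)
cache = ToyCache()
cache.register_cq(query)

cache.insert("Product:1001", {"Category": "Beverages"})  # matches: callback runs
cache.insert("Product:2001", {"Category": "Dairy"})      # no match: no callback

# 4. Unregister once tracking is no longer needed.
cache.un_register_cq(query)
cache.insert("Product:1002", {"Category": "Beverages"})  # no longer tracked

print(notified)  # ['Product:1001']
```

Only the first insert reaches the callback: the second does not meet the criteria, and the third happens after the query has been unregistered.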
Prerequisites
- To learn about the standard prerequisites required to work with all NCache client-side features, refer to Client-Side API Prerequisites.
- Indexing for searchable objects and their attributes needs to be configured first, as explained in Configuring Query Indexes in the Administrator's Guide.
- The cache should contain some data related to the configured attributes.
- For .NET API details, refer to: ICache, EventDataFilter, EventType, ExecuteReader, RegisterCQ, UnRegisterCQ, UnRegisterNotification, ContinuousQuery, RegisterNotification, QueryDataNotificationCallback, ICacheReader, FieldCount, Read, Insert, CQEventArg, GetValue.
- For Java API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, CacheItemVersion, insert, CQEventArg, getEventType, read, getValue, ContinuousQuery, addDataModificationListener, getFieldCount, CacheReader.
- For Node.js API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, insert, getEventType, getValue, read, getFieldCount, ContinuousQuery, addDataModificationListener.
- For Python API details, refer to: get_event_type, execute_reader, register_cq, un_register_cq, remove_data_modification_listener, CacheItem, insert, get_field_count, add_data_modification_listener, ContinuousQuery, read, get_value, EventType, CQEventArg.
Step 1: Register Query and Notifications
First, create a Continuous Query that specifies the criteria for the result set whose changes will fire events. This query will be registered with the server.
Once the Continuous Query has been created, the pre-defined callbacks are registered with the query according to EventType and EventDataFilter.
The Continuous Query can now be registered on the server using RegisterCQ. You can use this method multiple times in your application to receive notifications for changes in the result set of your query.
Whenever cache data is modified, event notifications are triggered according to the registered event type. To query the cached data, ExecuteReader executes the query, and the generated result set is then read on the client side, chunk by chunk.
You can trigger events by modifying cache data such that it affects the result set.
The code sample updates an existing cache item so that it is added to the query result set, thereby firing an ItemAdded event.
Warning
If the connection breaks between a server and a client, any events fired within this duration will not be received by the client.
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the keys along with the cached data and its metadata
cQuery.RegisterNotification(OnChangeInQueryResultSet, EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE Category = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("Category", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
    @Override
    public void onQueryDataModified(String key, CQEventArg arg) {
        // Delegate to the callback that handles the notifications,
        // for instance the 'onChangeInQueryResultSet' method shown in Step 2
        onChangeInQueryResultSet(key, arg);
    }
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Query for required operation
let query = "SELECT $VALUE$ FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item add, update, and remove notifications
// EventDataFilter.None returns only the cache keys, without data or metadata
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
    while (reader.read())
    {
        let result = reader.getValue(1, Product);
        // Perform operations
    }
}
else
{
    // Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $VALUE$ FROM FQN.Product WHERE Category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_ADDED, ncache.EventType.ITEM_UPDATED, ncache.EventType.ITEM_REMOVED]
# Item add, update, and remove notifications
# EventDataFilter.NONE returns only the cache keys, without data or metadata
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
    while reader.read():
        result = reader.get_value(Product, 1)
        # Perform operations
else:
    # None query result set returned
    print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + str(updated_product.get_product_id())
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
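The rule behind the samples above, that a change fires an event according to how it affects the query result set, can be sketched as a small classification function. This is an illustrative model only: `classify_change` and `is_beverage` are hypothetical helpers, not NCache calls. The ItemAdded branch follows the behavior described in this step; the ItemUpdated and ItemRemoved branches mirror it and are an assumption of this sketch.

```python
from typing import Callable, Optional


def classify_change(
    old_value: Optional[dict],
    new_value: Optional[dict],
    matches: Callable[[dict], bool],
) -> Optional[str]:
    """Return the event a change would raise for one query, or None."""
    was_in = old_value is not None and matches(old_value)
    is_in = new_value is not None and matches(new_value)
    if is_in and not was_in:
        return "ItemAdded"      # item entered the result set
    if is_in and was_in:
        return "ItemUpdated"    # item changed but stayed in the result set
    if was_in and not is_in:
        return "ItemRemoved"    # item left the result set
    return None                 # the result set is unaffected


def is_beverage(product: dict) -> bool:
    """Hypothetical query criteria: Category = 'Beverages'."""
    return product["Category"] == "Beverages"


tea_before = {"ProductName": "Tea", "Category": "Uncategorized"}
tea_after = {"ProductName": "Tea", "Category": "Beverages"}

print(classify_change(tea_before, tea_after, is_beverage))   # ItemAdded
print(classify_change(tea_after, tea_after, is_beverage))    # ItemUpdated
print(classify_change(tea_after, None, is_beverage))         # ItemRemoved
print(classify_change(tea_before, tea_before, is_beverage))  # None
```

The first call corresponds to the case in the samples: an existing item is updated so that it now meets the criteria, which counts as an addition to the result set rather than an update.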
Step 2: Register Callback for Events
Define the callbacks that need to be executed once the result set of your query is modified. A single callback can be registered for multiple events, as shown below.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
    switch (arg.EventType)
    {
        case EventType.ItemAdded:
            Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query");
            break;
        case EventType.ItemUpdated:
            Console.WriteLine($"Item with key '{key}' has been updated in the result set of continuous query");
            // Get updated Product object
            // Item can be used if EventDataFilter is DataWithMetadata or Metadata
            if (arg.Item != null)
            {
                Product updatedProduct = arg.Item.GetValue<Product>();
                Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
            }
            break;
        case EventType.ItemRemoved:
            Console.WriteLine($"Item with key '{key}' has been removed from result set of continuous query");
            break;
    }
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
    switch (arg.getEventType()) {
        case ItemAdded:
            System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
            break;
        case ItemUpdated:
            System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
            // Get updated Product object
            // Item can be used if EventDataFilter is DataWithMetadata or Metadata
            if (arg.getItem() != null) {
                Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
                System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
            }
            break;
        case ItemRemoved:
            System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
            break;
    }
}
// This is an async method
queryItemCallback(key, arg)
{
    switch (arg.getEventType())
    {
        case ncache.EventType.ItemAdded:
            // Key has been added to the query result set
            break;
        case ncache.EventType.ItemUpdated:
            // Key has been updated in the query result set
            // Get updated product object
            if (arg.getItem() != null)
            {
                let updatedProduct = arg.getItem().getValue(Product);
                // Perform operations accordingly
            }
            break;
        case ncache.EventType.ItemRemoved:
            // Key has been removed from the query result set
            break;
    }
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
    if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
        # Key has been added to the cache
        print(key + " added to cache")
    if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
        # Key has been updated in the cache
        # Get updated product object
        if arg.get_item() is not None:
            updated_product = arg.get_item().get_value(Product)
            # Perform operations accordingly
    if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
        # Key has been removed from the cache
        print(key + " removed from cache")
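The idea of one callback serving several event types can also be modeled with combinable flags, similar in spirit to the `EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved` expression in the .NET sample. The following is a minimal sketch, assuming a hypothetical `EventType` flag enum and a hypothetical `subscribe` helper; neither is the NCache client API.

```python
from enum import Flag, auto
from typing import Callable, List, Tuple


class EventType(Flag):
    """Hypothetical mirror of the three event types used in this section."""
    ITEM_ADDED = auto()
    ITEM_UPDATED = auto()
    ITEM_REMOVED = auto()


def subscribe(
    callback: Callable[[str, EventType], None], wanted: EventType
) -> Callable[[str, EventType], None]:
    """Wrap a callback so it only runs for the event types in `wanted`."""
    def deliver(key: str, event_type: EventType) -> None:
        if event_type & wanted:
            callback(key, event_type)
    return deliver


received: List[Tuple[str, str]] = []


def on_change(key: str, event_type: EventType) -> None:
    # One callback receives every subscribed event type.
    received.append((key, event_type.name))


deliver = subscribe(on_change, EventType.ITEM_ADDED | EventType.ITEM_REMOVED)
deliver("Product:1001", EventType.ITEM_ADDED)
deliver("Product:1001", EventType.ITEM_UPDATED)  # not subscribed: ignored
deliver("Product:1001", EventType.ITEM_REMOVED)

print(received)
```

Combining the flags keeps the registration to a single call while the callback itself decides what to do per event type, as the handlers above do with a switch or a chain of `if` statements.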
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Step 3: Unregister Notifications from Continuous Query
Notifications can be unregistered from a Continuous Query when the application no longer requires them. If multiple event types have been registered, you can unregister notifications for a specific event type using the UnRegisterNotification method. For example, if the ItemAdded and ItemRemoved event types were registered but your business logic no longer requires ItemAdded events, you can unregister notifications for ItemAdded alone while keeping the ItemRemoved notifications registered.
When a client unregisters a notification from a Continuous Query, only that client is affected. Other clients with the same notifications remain unaffected, allowing independent management of subscriptions. However, this method cannot unregister all notifications of a Continuous Query at once; for that, the separate UnRegisterCQ method is used. In the following example, the notification for ItemAdded events is unregistered from a Continuous Query.
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(OnChangeInQueryResultSet, EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
    @Override
    public void onQueryDataModified(String key, CQEventArg arg) {
        // Same callback that was registered in Step 1
        onChangeInQueryResultSet(key, arg);
    }
}, EnumSet.of(EventType.ItemAdded));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
continuous_query.remove_data_modification_listener(cq_event_listener, event_type)
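The per-client behavior described in this step can be illustrated with a small bookkeeping model. This is a sketch under stated assumptions: the `subscriptions` table and the `remove_listener` and `recipients` helpers are hypothetical and only mimic the described semantics; they are not NCache calls.

```python
from typing import Dict, List, Set

# Client id -> event types that client still wants for one continuous query.
subscriptions: Dict[str, Set[str]] = {
    "client-A": {"ItemAdded", "ItemRemoved"},
    "client-B": {"ItemAdded", "ItemRemoved"},
}


def remove_listener(client_id: str, event_types: Set[str]) -> None:
    """Drop the given event types for one client only."""
    subscriptions[client_id] -= event_types


def recipients(event_type: str) -> List[str]:
    """Clients that would still be notified for this event type."""
    return sorted(
        client for client, wanted in subscriptions.items() if event_type in wanted
    )


# client-A no longer needs ItemAdded events.
remove_listener("client-A", {"ItemAdded"})

print(recipients("ItemAdded"))    # ['client-B']
print(recipients("ItemRemoved"))  # ['client-A', 'client-B']
```

After the call, client-A still receives ItemRemoved events, and client-B keeps both of its subscriptions.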
Step 4: Unregister Continuous Query from Server
Once the application no longer needs notifications for changes in a query result set, unregister the registered Continuous Query from the server using the UnRegisterCQ method. Calling this method does not necessarily remove the result set from the server right away; the outcome depends on how many clients are registered for that result set. If the calling client is the last one, the result set is removed from the cache. Otherwise, the result set remains in the cache, and only the calling client stops receiving notifications for it.
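The dependence on the number of registered clients amounts to reference counting. The following toy model shows the idea; `ToyServer` is a hypothetical class written for this illustration and says nothing about how the cache server is actually implemented.

```python
from typing import Dict, Set


class ToyServer:
    """Hypothetical server that keeps one result set per query text."""

    def __init__(self) -> None:
        # Query text -> ids of the clients registered for its result set.
        self.clients_by_query: Dict[str, Set[str]] = {}

    def register_cq(self, query: str, client_id: str) -> None:
        self.clients_by_query.setdefault(query, set()).add(client_id)

    def un_register_cq(self, query: str, client_id: str) -> None:
        clients = self.clients_by_query[query]
        clients.discard(client_id)
        if not clients:
            # The last interested client has left: drop the result set.
            del self.clients_by_query[query]

    def has_result_set(self, query: str) -> bool:
        return query in self.clients_by_query


server = ToyServer()
query = "SELECT $VALUE$ FROM FQN.Product WHERE Category = ?"
server.register_cq(query, "client-A")
server.register_cq(query, "client-B")

server.un_register_cq(query, "client-A")
print(server.has_result_set(query))  # True: client-B is still registered

server.un_register_cq(query, "client-B")
print(server.has_result_set(query))  # False: the last client unregistered
```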
The UnRegisterCQ method takes a ContinuousQuery object as its argument; the callbacks registered with that query are no longer fired after this call. The following code example demonstrates how to unregister a Continuous Query from the cache server.
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(continuous_query)
Additional Resources
NCache provides a sample application for Continuous Queries on GitHub.
See Also
.NET: Alachisoft.NCache.Runtime.Events namespace.
Java: com.alachisoft.ncache.events namespace.
Node.js: ContinuousQuery class.
Python: ncache.runtime.caching.events class.