How to use Continuous Query in Cache
Note
This feature is only available in NCache Enterprise Edition.
Assuming that you have indexed the required searchable attributes, you can now implement Continuous Query in your application. First, define all the callbacks that need to be executed whenever the result set of your query changes. Then, register the Continuous Query with the cache server.
If none of your applications needs to track a query result set any longer, you should not only unregister the notifications but also unregister the query itself from your cache.
Prerequisites
- To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the given page on Client Side API Prerequisites.
- Indexing for searchable objects and their attributes needs to be configured first, as explained in Configuring Query Indexes in the Administrator's Guide.
- The cache should contain some data related to the configured attributes.
- For API details, refer to: ICache, EventDataFilter, EventType, ExecuteReader, RegisterCQ, UnRegisterCQ, UnRegisterNotification, ContinuousQuery, RegisterNotification, QueryDataNotificationCallback, ICacheReader, FieldCount, Read, Insert, CQEventArg, GetValue.
- For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, CacheItemVersion, insert, CQEventArg, getEventType, read, getValue, ContinuousQuery, addDataModificationListener, getFieldCount, CacheReader.
- For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, insert, getEventType, getValue, read, getFieldCount, ContinuousQuery, addDataModificationListener.
- For API details, refer to: get_event_type, execute_reader, register_cq, un_register_cq, remove_data_modification_listener, CacheItem, insert, get_field_count, add_data_modification_listener, ContinuousQuery, read, get_value, EventType, CQEventArg.
Step 1: Register Callback for Events
Assuming that you have indexed the required searchable attributes, you can implement Continuous Query in your application. This requires you to define all the callbacks that need to be executed once the result set of your query is modified. Then, you need to register the Continuous Query with the cache server.
A single callback can be registered for multiple event types.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
    switch (arg.EventType)
    {
        case EventType.ItemAdded:
            Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query");
            break;
        case EventType.ItemUpdated:
            Console.WriteLine($"Item with key '{key}' has been updated in the result set of continuous query");
            // Get updated Product object
            // Item can be used if EventDataFilter is DataWithMetadata or Metadata
            if (arg.Item != null)
            {
                Product updatedProduct = arg.Item.GetValue<Product>();
                Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
            }
            break;
        case EventType.ItemRemoved:
            Console.WriteLine($"Item with key '{key}' has been removed from result set of continuous query");
            break;
    }
}
// Precondition: Cache is already connected
public void queryItemCallback(String key, CQEventArg arg)
{
    switch (arg.getEventType())
    {
        case ItemAdded:
            // Key has been added to the cache
            break;
        case ItemUpdated:
            // Key has been updated in the cache
            // Get updated product object
            if (arg.getItem() != null)
            {
                Product updatedProduct = arg.getItem().getValue(Product.class);
                // Perform operations accordingly
            }
            break;
        case ItemRemoved:
            // Key has been removed from the cache
            break;
    }
}
// Exceptions to handle around the cache operations that use this callback:
// - OperationFailedException with error code NCacheErrorCodes.INCORRECT_FORMAT:
//   make sure that the query format is correct
// - Any other OperationFailedException can occur due to:
//   connection failures, operation timeout, or an operation performed during state transfer
// - Any generic exception like IllegalArgumentException or NullPointerException
try {
    // Precondition: Cache is already connected
    class CQListener extends QueryDataModificationListener {
        override def onQueryDataModified(key: String, eventArgs: CQEventArg): Unit = {
            eventArgs.getEventType match {
                case EventType.ItemAdded =>
                    // 'key' has been added to the cache
                case EventType.ItemUpdated =>
                    // 'key' has been updated in the cache
                    // Get the updated product
                    if (eventArgs.getItem != null) {
                        val updatedProduct = eventArgs.getItem.getValue(classOf[Product])
                        // Perform operations
                    }
                case EventType.ItemRemoved =>
                    // 'key' has been removed from the cache
            }
        }
    }
}
catch {
    case exception: Exception => {
        // Handle any errors
    }
}
// This is an async method
function queryItemCallback(key, arg)
{
    try
    {
        switch (arg.getEventType())
        {
            case ncache.EventType.ItemAdded:
                // Key has been added to the cache
                break;
            case ncache.EventType.ItemUpdated:
                // Key has been updated in the cache
                // Get updated product object
                if (arg.getItem() != null)
                {
                    let updatedProduct = arg.getItem().getValue(Product);
                    // Perform operations accordingly
                }
                break;
            case ncache.EventType.ItemRemoved:
                // Key has been removed from the cache
                break;
        }
    }
    catch (error)
    {
        // Handle any errors
    }
}
try:
    # Precondition: Cache is already connected
    def query_item_callback(key: str, arg: ncache.CQEventArg):
        if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
            # Key has been added to the cache
            print(key + " added to cache")
        if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
            # Key has been updated in the cache
            # Get updated product object
            if arg.get_item() is not None:
                updated_product = arg.get_item().get_value(Product)
                # Perform operations accordingly
        if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
            # Key has been removed from the cache
            print(key + " removed from cache")
except Exception as exp:
    # Handle errors
    pass
Note
To ensure the operation is fail safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Step 2: Register Query and Notifications
After the callbacks are defined, create a Continuous Query that specifies the criteria for the result set whose changes will fire events. This query will be registered against the server.
Once the Continuous Query has been created, the pre-defined callbacks are registered with the query. The callbacks are registered according to EventType and EventDataFilter.
The Continuous Query can now be registered on the server using RegisterCQ(). You can use this method multiple times in your application to receive notifications for changes in the dataset of your query.
Any modification in the cache that affects the result set triggers event notifications according to the registered event types. For querying cached data, ExecuteReader executes the query, and the generated result set is then read at the client side, chunk by chunk.
You can trigger events by modifying cache data such that it affects the result set. The code sample updates an existing cache item such that it is added to the query result set, thereby firing an ItemAdded event.
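The relationship between cache modifications and the events they fire can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: ToyContinuousQuery, on_cache_write, and on_cache_remove are hypothetical names that are not part of the NCache API, and the rule that an update which stops matching the criteria fires ItemRemoved is assumed here by symmetry with the ItemAdded case described above.

```python
from enum import Enum


class EventType(Enum):
    ITEM_ADDED = "ItemAdded"
    ITEM_UPDATED = "ItemUpdated"
    ITEM_REMOVED = "ItemRemoved"


class ToyContinuousQuery:
    """Hypothetical in-memory model of a continuous query (not the NCache API)."""

    def __init__(self, predicate, callback):
        self._predicate = predicate   # decides whether a value matches the query criteria
        self._callback = callback     # invoked as callback(key, event_type)
        self._result_keys = set()     # keys currently in the result set

    def on_cache_write(self, key, value):
        """Called for every insert or update of a cache item."""
        was_in = key in self._result_keys
        is_in = self._predicate(value)
        if is_in and not was_in:
            self._result_keys.add(key)
            self._callback(key, EventType.ITEM_ADDED)
        elif is_in and was_in:
            self._callback(key, EventType.ITEM_UPDATED)
        elif was_in and not is_in:
            self._result_keys.discard(key)
            self._callback(key, EventType.ITEM_REMOVED)
        # Items that never matched the criteria fire no event.

    def on_cache_remove(self, key):
        """Called when an item is removed from the cache."""
        if key in self._result_keys:
            self._result_keys.discard(key)
            self._callback(key, EventType.ITEM_REMOVED)


events = []
cq = ToyContinuousQuery(
    predicate=lambda product: product["Category"] == "Beverages",
    callback=lambda key, event_type: events.append((key, event_type)),
)

# Does not match the criteria: no event is fired.
cq.on_cache_write("Product:1001", {"ProductName": "Tea", "Category": "Snacks"})
# The update makes the item match: it enters the result set.
cq.on_cache_write("Product:1001", {"ProductName": "Tea", "Category": "Beverages"})
# The item changes while still matching.
cq.on_cache_write("Product:1001", {"ProductName": "Green Tea", "Category": "Beverages"})
# The item is removed from the cache.
cq.on_cache_remove("Product:1001")

for key, event_type in events:
    print(key, event_type.value)
```

The point of the model is that events are defined relative to the result set, not to the cache as a whole: the first write changes the cache but fires nothing because the item does not satisfy the query criteria.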
Warning
If the connection breaks between a server and client, any events fired within this duration will not be received by the client.
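Because events fired during a disconnection are not delivered, an application may want to reconcile its local view once the connection is restored. One possible approach, which is an assumption of this sketch and not something this guide prescribes, is to re-run the query and compare the fresh result set with the last one the application knew about. The helper diff_result_sets below is hypothetical and works on plain dictionaries of key to value; obtaining those snapshots from the cache is left to the application.

```python
def diff_result_sets(before, after):
    """Compare two result-set snapshots (dicts of key -> value).

    Returns three sorted lists of keys: added, updated, removed.
    """
    added = sorted(after.keys() - before.keys())
    removed = sorted(before.keys() - after.keys())
    updated = sorted(
        key for key in before.keys() & after.keys() if before[key] != after[key]
    )
    return added, updated, removed


# Snapshot held by the application before the connection broke
before = {"Product:1001": "Tea", "Product:1002": "Coffee", "Product:1003": "Juice"}
# Snapshot obtained by re-running the query after reconnecting
after = {"Product:1001": "Green Tea", "Product:1003": "Juice", "Product:1004": "Soda"}

added, updated, removed = diff_result_sets(before, after)
print("added:", added)
print("updated:", updated)
print("removed:", removed)
```

This only recovers the net difference between the two snapshots. Intermediate changes that happened and were reverted while the client was disconnected cannot be reconstructed this way.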
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, and remove notifications
// EventDataFilter.DataWithMetadata returns the cache keys along with the item data and metadata
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for required operation
String query = "SELECT Values FROM FQN.Product WHERE Category = ?";
QueryCommand queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
com.alachisoft.ncache.client.ContinuousQuery continuousQuery = new com.alachisoft.ncache.client.ContinuousQuery(queryCommand);
EventListener listener = new EventListener();
EnumSet<com.alachisoft.ncache.runtime.events.EventType> eventType = EnumSet.of(com.alachisoft.ncache.runtime.events.EventType.ItemAdded, com.alachisoft.ncache.runtime.events.EventType.ItemRemoved, com.alachisoft.ncache.runtime.events.EventType.ItemUpdated);
// Item add, remove, and update notifications
// EventDataFilter.None is used here; EventDataFilter.Metadata returns cache keys + item metadata on update
continuousQuery.addDataModificationListener(listener, eventType, EventDataFilter.None);
// Register continuous query on server
cache.getMessagingService().registerCQ(continuousQuery);
CacheReader reader = cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
    while (reader.read())
    {
        Product result = reader.getValue(1, Product.class);
        // Perform operations
    }
}
else
{
    // Null query result set returned
}
// Update Product data in cache to trigger callback
Product updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
String key = "Product:" + updatedProduct.getProductID();
CacheItem cacheItem = new CacheItem(updatedProduct);
// Trigger add notifications
CacheItemVersion version = cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
// Query for required operation
val query = "SELECT Values FROM FQN.Product WHERE Category = ?"
val queryCommand = new QueryCommand(query)
queryCommand.setParameters(Map("Category" -> "Beverages"))
// Create continuous query
val continuousQuery = new ContinuousQuery(queryCommand)
val listener = new CQListener()
val eventType = List(EventType.ItemAdded, EventType.ItemRemoved, EventType.ItemUpdated)
// Item add, remove, and update notifications
// EventDataFilter.None is used here; EventDataFilter.Metadata returns cache keys + item metadata on update
continuousQuery.addDataModificationListener(listener, eventType, EventDataFilter.None)
// Register continuous query on server
cache.getMessagingService.registerCQ(continuousQuery)
val reader = cache.getSearchService.executeReader(queryCommand)
if (reader.getFieldCount > 0)
{
    while (reader.read)
    {
        val result = reader.getValue(1, classOf[Product])
        // Perform operations
    }
}
else
{
    // Null query result set returned
}
// Update Product data in cache to trigger callback
val updatedProduct = Product()
updatedProduct.setProductId(1001)
updatedProduct.setProductName("Tea")
val key = "Product:" + updatedProduct.getProductId
val cacheItem = new CacheItem(updatedProduct)
// Trigger add notifications
val version = cache.insert(key, cacheItem)
// This will add item to the result set as it matches query criteria
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
let queryCommand = new ncache.QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item add, remove, and update notifications
// EventDataFilter.None is used here; EventDataFilter.Metadata returns cache keys + item metadata on update
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
    while (reader.read())
    {
        let result = reader.getValue(1, Product);
        // Perform operations
    }
}
else
{
    // Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE Category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_ADDED, ncache.EventType.ITEM_UPDATED, ncache.EventType.ITEM_REMOVED]
# Item add, update, and remove notifications
# EventDataFilter.NONE is used here; EventDataFilter.Metadata returns cache keys + item metadata on update
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
    while reader.read():
        result = reader.get_value(Product, 1)
        # Perform operations
else:
    # None query result set returned
    print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + str(updated_product.get_product_id())
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
Step 3: Unregister Notifications from Continuous Query
Notifications can be unregistered from a Continuous Query when they are no longer required in the application. If multiple event types have been registered, you can unregister notifications for a specific event type using the UnRegisterNotification method.
For example, if ItemAdded and ItemRemoved event types were registered but your business logic no longer requires events for ItemAdded, you can specifically unregister notifications for ItemAdded events.
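The effect of unregistering a single event type can be sketched with a small listener registry. This is an illustration only: ListenerRegistry and its methods are hypothetical and do not reflect how NCache tracks listeners internally. It shows that removing ItemAdded for a callback leaves its ItemRemoved registration in place.

```python
from enum import Enum


class EventType(Enum):
    ITEM_ADDED = "ItemAdded"
    ITEM_UPDATED = "ItemUpdated"
    ITEM_REMOVED = "ItemRemoved"


class ListenerRegistry:
    """Hypothetical registry mapping each callback to the event types it listens for."""

    def __init__(self):
        self._listeners = {}  # callback -> set of EventType

    def add(self, callback, event_types):
        self._listeners.setdefault(callback, set()).update(event_types)

    def remove(self, callback, event_types):
        remaining = self._listeners.get(callback, set()) - set(event_types)
        if remaining:
            self._listeners[callback] = remaining
        else:
            # No event types left: drop the callback entirely.
            self._listeners.pop(callback, None)

    def dispatch(self, key, event_type):
        for callback, types in list(self._listeners.items()):
            if event_type in types:
                callback(key, event_type)


received = []


def on_change(key, event_type):
    received.append((key, event_type))


registry = ListenerRegistry()
registry.add(on_change, [EventType.ITEM_ADDED, EventType.ITEM_REMOVED])

registry.dispatch("Product:1001", EventType.ITEM_ADDED)    # delivered
registry.remove(on_change, [EventType.ITEM_ADDED])         # unregister ItemAdded only
registry.dispatch("Product:1002", EventType.ITEM_ADDED)    # no longer delivered
registry.dispatch("Product:1001", EventType.ITEM_REMOVED)  # still delivered

for key, event_type in received:
    print(key, event_type.value)
```

In this sketch the callback is matched by identity, so the same callback object that was registered must be passed when removing it. Whether a given NCache client matches listeners the same way is not stated in this guide, so reusing the original listener instance is the safer choice.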
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
EnumSet<EventType> eventType = EnumSet.of(EventType.ItemAdded);
continuousQuery.removeDataModificationListener(listener, eventType);
// Unregister notifications for ItemAdded events only
continuousQuery.removeDataModificationListener(listener, List(EventType.ItemAdded))
// Unregister notifications for ItemAdded events only
let eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
continuous_query.remove_data_modification_listener(cq_event_listener, event_type)
Step 4: Unregister Continuous Query from Server
Once the application is no longer interested in receiving notifications for changes in a query result set, the registered Continuous Query should be unregistered from the server.
UnRegisterCQ takes a ContinuousQuery object as its argument. After this call, the callbacks registered with that query are no longer fired.
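To make sure the query is always unregistered, even when the code using it raises an exception, the register and unregister calls can be paired in a context manager. The following is a minimal sketch under stated assumptions: registered_query is a hypothetical helper, and StubMessagingService is a stand-in used only so the example runs on its own. Its register_cq and un_register_cq methods mirror the names used in the Python samples of this guide.

```python
from contextlib import contextmanager


class StubMessagingService:
    """Stand-in for a messaging service; it only records registered queries."""

    def __init__(self):
        self.registered = []

    def register_cq(self, continuous_query):
        self.registered.append(continuous_query)

    def un_register_cq(self, continuous_query):
        self.registered.remove(continuous_query)


@contextmanager
def registered_query(messaging_service, continuous_query):
    """Register the query on entry and always unregister it on exit."""
    messaging_service.register_cq(continuous_query)
    try:
        yield continuous_query
    finally:
        messaging_service.un_register_cq(continuous_query)


service = StubMessagingService()

with registered_query(service, "beverages-query"):
    # Notifications would be received while this block runs.
    during = list(service.registered)

print("registered during block:", during)
print("registered after block:", service.registered)
```

The try/finally inside the helper is the important part: the unregister call runs whether the block completes normally or fails.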
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(continuousQuery);
// Unregister cq from server
cache.getMessagingService.unRegisterCQ(continuousQuery)
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(continuous_query)
Additional Resources
NCache provides a sample application for Continuous Queries on GitHub.
See Also
SQL Reference for NCache
Event Notifications in Cache
Pub/Sub Messaging