Using Continuous Query [Deprecated]
Assuming that you have indexed the required searchable attributes, you are now required to implement the Continuous Query in your application. Keeping in mind the purpose of Continuous Queries, the first thing you need to do is to define all the callbacks that need to be executed once the result set of your query is in any way changed. Then, we need to register the Continuous Query with the cache server.
If all your applications don't require tracking of any query result set, then you should not only unregister
notifications but also unregister the query from your cache.
Prerequisites
- To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the given page on Client-Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: ICache, EventDataFilter, EventType, ExecuteReader, RegisterCQ, UnRegisterCQ, UnRegisterNotification, ContinuousQuery, RegisterNotification, QueryDataNotificationCallback, ICacheReader, FieldCount, Read, Insert, CQEventArg, GetValue.
- To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the given page on Client-Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, CacheItemVersion, insert, CQEventArg, getEventType, read, getValue, ContinuousQuery, addDataModificationListener, getFieldCount, CacheReader.
- To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the given page on Client-Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: get_event_type, execute_reader, register_cq,
un_register_cq, remove_data_modification_listener, CacheItem, insert, get_field_count, add_data_modification_listener, ContinuousQuery, read, get_value, EventType, CQEventArg.
- To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the given page on Client-Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, insert, getEventType, getValue, read, getFieldCount, ContinuousQuery, addDataModificationListener.
- Create a new Console Application.
- Make sure that the data being added is serializable.
- Add NCache References by locating
%NCHOME%\NCache\bin\assembly\4.0
and adding Alachisoft.NCache.Web
and Alachisoft.NCache.Runtime
as appropriate.
- Include the
Alachisoft.NCache.Runtime.Events
and Alachisoft.NCache.Web.Caching
namespace in your application.
Step 1: Register Query and Notifications
First you need to create a Continuous Query, which specifies the criteria for the result set of which the events will be fired. This query will be registered against the server.
Once Continuous Query has been created, the pre-defined callbacks are registered with the query. The callbacks are registered according to EventType
and EventDataFilter
.
The Continuous Query can now be registered on the server using RegisterCQ
. You can use this method multiple times in your application to receive notifications for a change in the dataset of your query.
Any modifications in cache event notifications will be triggered according to the event type. For querying cached data, the ExecuteReader
executes the query and the result set generated is then read on the client-side, chunk by chunk.
You can trigger events by modifying cache data such that it affects the result set.
The code sample updates an existing cache item such that it is added to the query result set, thereby firing an ItemAdded
event.
Warning
If the connection breaks between a server and a client, any events fired within this duration will not be received by the client.
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
var cQuery = new ContinuousQuery(queryCommand);
cQuery.RegisterNotification(OnChangeInQueryResultSet, EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
cache.MessagingService.RegisterCQ(cQuery);
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
QueryCommand queryCommand = new QueryCommand(query);
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
else:
print("Query result is None")
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
version = cache.insert(key, cache_item)
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
}
}
else
{
}
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
let version = await this.cache.insert(key, cacheItem);
public class CQ
{
static void Main(string[] args)
{
Cache cache = NCache.InitializeCache("mycache");
string queryString = "Select Product WHERE this.supplier=?";
Hashtable values = new Hashtable();
values.Add("supplier", "Carlos Diaz");
ContinuousQuery continuousQuery = new ContinuousQuery(queryString, values);
continuousQuery.RegisterNotification(new QueryDataNotificationCallback(ItemAddedCallBack), EventType.ItemAdded, EventDataFilter.None);
continuousQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.None);
continuousQuery.RegisterClearNotification(new ContinuousQueryClearCallback(CacheClear));
cache.RegisterCQ(continuousQuery);
}
}
Step 2: Register Callback for Events
Assuming that you have indexed the required searchable attributes, you can implement the Continuous Query in your application. This requires you to define all the callbacks that need to be executed once the result set of your query is modified. A callback can be registered for multiple events as shown below.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the result set of continuous query");
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from result set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
print(key + " removed from cache")
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
break;
case ncache.ItemUpdated:
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
}
break;
case ncache.ItemRemoved:
break;
}
}
static void ItemAddedCallBack(string key, CQEventArg arg)
{
EventCacheItem item = arg.Item;
}
static void QueryItemCallBack(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemRemoved:
break;
case EventType.ItemUpdated:
break;
}
}
static void CacheClear()
{
Cache cache = null;
cache.Clear();
}
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
Step 3: Unregister Notifications from Continuous Query
Notifications can be unregistered from Continuous Query when they are no longer required in the application. You can unregister notifications for a specific event type if multiple event types have been registered using the UnRegisterNotification
method. For example, if ItemAdded
and ItemRemoved
event types were registered but your business logic no longer requires events for ItemAdded
, you can specifically unregister notifications for ItemAdded
events, while still keeping the add and remove notifications registered.
When a client unregisters a notification from a Continuous Query, it only impacts that specific client. Other clients with the same notifications remain unaffected, allowing independent management of subscriptions. However, this method cannot unregister all CQ notifications through this method. For that, a separate method UnRegisterCQ is used. In the following example, an event notification for an item added notification is unregistered from a Continuous Query.
cQuery.UnRegisterNotification((OnChangeInQueryResultSet), EventType.ItemAdded);
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
Step 4: Unregister Continuous Query from Server
Once the application is no longer interested in receiving notifications for changes in a query result set, the registered Continuous Query should be unregistered from the server using the UnRegisterCQ method. Using this method, the result set isn’t immediately removed from the server. If it is the last client, the result set will be removed from the cache. Otherwise, the result set remains in the cache, but that specific client will stop receiving notifications for it. Essentially, it depends on the number of clients registered for that result set.
The UnregisterCQ
method takes a ContinuousQuery
object as an argument to unregister the callbacks that are no longer fired after this call. The following code example demonstrates how to unregister a Continuous Query from the cache server.
cache.MessagingService.UnRegisterCQ(cQuery);
cache.getMessagingService().unRegisterCQ(cQuery);
cache.get_messaging_service().un_register_cq(c_query)
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
Additional Resources
NCache provides a sample application for Continuous Queries on GitHub.
See Also
.NET: Alachisoft.NCache.Runtime.Events namespace.
Java: com.alachisoft.ncache.events namespace.
Python: ncache.runtime.caching.events class.
Node.js: ContinuousQuery class.