• Webinars
  • Docs
  • Download
  • Blogs
  • Contact Us
Try Free
Show / Hide Table of Contents

How to use Continuous Query in Cache

Note

This feature is only available in NCache Enterprise Edition.

Assuming that you have indexed the required searchable attributes, you are now required to implement continuous query in your application. Keeping in mind the purpose of continuous queries, the first thing you need to do is to define all the callbacks that need to be executed once the result set of your query is in any way changed. Then, we need to register the continuous query with the cache server.

If all your applications don't require tracking of any query result set, then you should not only unregister notifications but also unregister the query from your cache.

Prerequisites

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
  • To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
  • Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
  • Cache should have some data related to configured attributes.
  • For API details, refer to: ICache, EventDataFilter, EventType, ExecuteReader, RegisterCQ, UnRegisterCQ, UnRegisterNotification, ContinuousQuery, RegisterNotification , QueryDataNotificationCallback, ICacheReader, FieldCount, Read, Insert, CQEventArg, GetValue.
  • To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
  • Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
  • Cache should have some data related to configured attributes.
  • For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, CacheItemVersion, insert, CQEventArg, getEventType, read, getValue, ContinuousQuery, addDataModificationListener, getFieldCount, CacheReader.
  • To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
  • Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
  • Cache should have some data related to configured attributes.
  • For API details refer to: Cache, ContinuousQuery, EventType, EventDataFilter, CacheItem, getValue, read, getFieldCount.
  • To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
  • Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
  • Cache should have some data related to configured attributes.
  • For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, insert, getEventType, getValue, read, getFieldCount, ContinuousQuery, addDataModificationListener.
  • To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
  • Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
  • Cache should have some data related to configured attributes.
  • For API details refer to: get_event_type, execute_reader, register_cq, un_register_cq, remove_data_modification_listener, CacheItem, insert, get_field_count, add_data_modification_listener, ContinuousQuery, read, get_value, EventType, CQEventArg.

Step 1: Register Callback for Events

Assuming that you have indexed the required searchable attributes, you can implement Continuous Query in your application. This requires you to define all the callbacks that need to be executed once the result set of your query is modified. Then, we need to register the Continuous Query with the cache server.

A callback can be registered for multiple events.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
static void QueryItemCallBack(string key, CQEventArg arg)
{
    switch (arg.EventType)
    {
        case EventType.ItemAdded:
            // "key" has been added to cache
            break;

        case EventType.ItemUpdated:
            // "key" has been updated in cache
            // Get updated Product object
            if (arg.Item != null)
            {
                Product updatedProduct = arg.Item.GetValue<Product>();
                // Perform operations
            }
            break;

        case EventType.ItemRemoved:
            // "key" has been removed from cache
            break;
    }
}
public void queryItemCallback (String key, CQEventArg arg)
{
    switch (arg.getEventType())
    {
        case ItemAdded:
            // Key has been added to the cache
        break;
        case ItemUpdated:
            // Key has been updated in the cache
            // Get updated product object

            if (arg.getItem() != null)
            {
                Product updatedProduct = arg.getItem().getValue(Product.class);
                // Perform operations accordingly
            }
        break;
        case ItemRemoved:
            // Key has been removed from the cache
        break;
    }
}
class CQListener extends QueryDataModificationListener {
  override def onQueryDataModified(key: String, eventArgs: CQEventArg): Unit = {
    eventArgs.getEventType match {
      case EventType.ItemAdded =>
      // Key has been added to the cache

      case EventType.ItemUpdated =>
        // 'key' has been updated in the cache
        // get the updated product
        if (eventArgs.getItem != null) {
          val updateProduct = eventArgs.getItem.getValue(classOf[Product])
          // perform operations
        }

      case EventType.ItemRemoved =>
      // 'key' has been removed from the cache
    }
  }
}
queryItemCallback(key, arg)
{
    switch (arg.getEventType())
    {
        case ncache.ItemAdded:
            // Key has been added to the cache
        break;
        case ncache.ItemUpdated:
            // Key has been updated in the cache
            // Get updated product object

            if (arg.getItem() != null)
            {
                let updatedProduct = arg.getItem().getValue(Product);
                // Perform operations accordingly
            }
        break;
        case ncache.ItemRemoved:
            // key has been removed from the cache
        break;
    }      
}
def query_item_callback(key: str, arg: ncache.CQEventArg):
    if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
        # Key has been added to the cache
        print(key + " added to cache")
    if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
        # Key has been updated in the cache
        # Get updated product object
        if arg.get_item() is not None:
            updated_product = arg.get_item().get_value(Product)
            # Perform operations accordingly
    if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
        # key has been removed from the cache
        print(key + " removed from cache")

Step 2: Register Query and Notifications

  1. After the callbacks are registered, create a Continuous Query, which specifies the criteria for the result set of which the events will be fired. This query will be registered against the server.

  2. Once Continuous Query has been created, the pre-defined callbacks are registered with the query. The callbacks are registered according to EventType and EventDataFilter.

  3. The continuous query can now be registered on the server using RegisterCQ(). You can use this method multiple times in your application to receive the notifications for a change in the dataset of your query.

  4. Any modifications in cache event notifications will be triggered according to the event type. For querying cached data, ExecuteReader executes the query and the result set generated is then read at client side, chunk by chunk.

  5. You can trigger events by modifying cache data such that it affects the result set. The code sample updates an existing cache item such that is added to the query result set, thereby firing an ItemAdded event.

Warning

If the connection breaks between a server and client, any events fired within this duration will not be received by the client.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
try
{
    // Precondition: Cache is already connected

    // Query for required operation
    string query = "SELECT $VALUE$ FROM FQN.Product WHERE Category = ?";

    var queryCommand = new QueryCommand(query);
    queryCommand.Parameters.Add("Category", "Beverages");

    // Create Continuous Query
    var cQuery = new ContinuousQuery(queryCommand);

    // Item add notification 
    // EventDataFilter.None returns the cache keys added
    cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);

    // Item update notification 
    // EventDataFilter.DataWithMetadata returns cache keys + modified item + metadata on updation
    cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemUpdated, EventDataFilter.DataWithMetadata);

    // Item Remove notification 
    // EventDataFilter.Metadata returns cache keys + item metadata on updation
    cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemRemoved, EventDataFilter.DataWithMetadata);

    // Register continuousQuery on server 
    cache.MessagingService.RegisterCQ(cQuery);

    // Query Cached Data
    ICacheReader reader = cache.SearchService.ExecuteReader(queryCommand);

    // If resultset is not empty
    if (reader.FieldCount > 0)
    {
        while (reader.Read())
        {
            Product result = reader.GetValue<Product>(1);
            // Perform operations
        }
    }
    else
    {
        // Null query result set returned
    }

    // Update Product Data in Cache to trigger callback
    var updatedProduct = new Product()
    { 
        ProductID = 1001,
        ProductName = "Coffee",
        Category = "Beverages" // Complies with criteria
    };

    string key = $"Product:(updatedProduct.ProductID)";

    cache.Insert(key, updatedProduct);
    // This will add the item to the result set 
    // as it matches query criteria
}
catch (OperationFailedException ex)
{
    if (ex.ErrorCode == NCacheErrorCodes.INCORRECT_FORMAT)
    {
        // Make sure that the query format is correct
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any generic exception like ArgumentException, ArgumentNullException
}
 try
{
    // Precondition: Cache is already connected
​
    // Query for required operation
    String query = "SELECT Values FROM FQN.Product WHERE Category = ?";

    QueryCommand queryCommand = new QueryCommand(query);
    queryCommand.getParameters().put("Category", "Beverages");
​
    // Create continuous query
    com.alachisoft.ncache.client.ContinuousQuery continuousQuery = new com.alachisoft.ncache.client.ContinuousQuery(queryCommand);
​
    EventListener listener = new EventListener();

    EnumSet<com.alachisoft.ncache.runtime.events.EventType> eventType = EnumSet.of(com.alachisoft.ncache.runtime.events.EventType.ItemAdded, com.alachisoft.ncache.runtime.events.EventType.ItemRemoved, com.alachisoft.ncache.runtime.events.EventType.ItemUpdated);
​
​
    // Item remove notification
    // EventDataFilter.Metadata returns cache keys + item metadata on updation
    continuousQuery.addDataModificationListener(listener, eventType, EventDataFilter.None);
​
    // Register continuous query on server
    cache.getMessagingService().registerCQ(continuousQuery);
​
    CacheReader reader = cache.getSearchService().executeReader(queryCommand);
​
    if (reader.getFieldCount() > 0)
    {
        while (reader.read())
        {
            Product result = reader.getValue(1, Product.class);
            // Perform operations
        }
    }
    else
    {
        // Null query result set returned
    }
​
    // Update Product data in cache to trigger callback
    Product updatedProduct = new Product();
    updatedProduct.setProductID(1001);
    updatedProduct.setProductName("Tea");

    String key = "Product:" + updatedProduct.getProductID();

    CacheItem cacheItem = new CacheItem(updatedProduct);
​
    // Trigger add notifications
    CacheItemVersion version = cache.insert(key, cacheItem);
    // This will add item to the result set as it matches query criteria
}
catch (OperationFailedException ex)
{
    if (ex.getErrorCode() == NCacheErrorCodes.INCORRECT_FORMAT)
    {
         // Make sure that the query format is correct
    }
    else
    {
        // Exception can occur due to:
        // Connection Failures
        // Operation Timeout
        // Operation performed during state transfer
    }
}
catch (Exception ex)
{
    // Any generic exception like IllegalArgumentException or NullPointerException
}
try {
    // Precondition: Cache is already connected

    // Query for required operation
    val query = "SELECT Values FROM FQN.Product WHERE Category = ?"

    val queryCommand = new QueryCommand(query)
    queryCommand.setParameters(Map("Category" -> "Beverages"))

    // Create continuous query
    val continuousQuery = new ContinuousQuery(queryCommand)

    val listener = new CQListener()

    val eventType = List(EventType.ItemAdded, EventType.ItemRemoved, EventType.ItemUpdated)

    // Item remove notification
    // EventDataFilter.Metadata returns cache keys + item metadata on updation
    continuousQuery.addDataModificationListener(listener, eventType, EventDataFilter.None)

    // Register continuous query on server
    cache.getMessagingService.registerCQ(continuousQuery)

    val reader = cache.getSearchService.executeReader(queryCommand)

    if (reader.getFieldCount > 0)
    {
      while (reader.read)
      {
        val result = reader.getValue(1, classOf[Product])
        // Perform operations
      }
    }
    else
    {
      // Null query result set returned
    }
    // Update Product data in cache to trigger callback
    val updatedProduct = Product()
    updatedProduct.setProductId(1001)
    updatedProduct.setProductName("Tea")

    val key = "Product:" + updatedProduct.getProductId

    val cacheItem = new CacheItem(updatedProduct)

    // Trigger add notifications
    val version = cache.insert(key, cacheItem)
    // This will add item to the result set as it matches query criteria
}
catch {
    case exception: Exception => {
      // Handle any errors
    }
}
// This is an async method
try
{
    // Precondition: Cache is already connected
​
    // Query for required operation
    let query = "SELECT Values FROM FQN.Product WHERE Category = ?";

    var queryCommand = new QueryCommand(query);
    queryCommand.getParameters().put("Category", "Beverages");
​
    // Create continuous query
    let continuousQuery = new ncache.ContinuousQuery(queryCommand);
​
    let listener = new ncache.EventFilter();
    var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
​
    // Item remove notification
    // EventDataFilter.Metadata returns cache keys + item metadata on updation
    continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
​
    // Register continuous query on server
    await this.cache.getMessagingService().registerCQ(continuousQuery);
​
    let reader = await this.cache.getSearchService().executeReader(queryCommand);
​
    if (reader.getFieldCount() > 0)
    {
        while (reader.read())
        {
            let result = reader.getValue(1, Product);
            // Perform operations
        }
    }
    else
    {
        // Null query result set returned
    }
​
    // Update Product data in cache to trigger callback
    let updatedProduct = new Product();
    updatedProduct.setProductID(1001);
    updatedProduct.setProductName("Tea");

    let key = "Product:" + updatedProduct.getProductID();

    let cacheItem = new ncache.CacheItem(updatedProduct);
​
    // Trigger add notifications
    let version = await this.cache.insert(key, cacheItem);
    // This will add item to the result set as it matches query criteria
} 
catch (error)
{
    // Handle any errors
}
try:
    # Precondition: Cache is already connected

    # Query for required operation
    query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"

    query_command = ncache.QueryCommand(query)
    query_command.set_parameters({"Category": "Beverages"})

    # Create continuous query
    continuous_query = ncache.ContinuousQuery(query_command)

    event_type = [ncache.EventType.ITEM_REMOVED]

    # Item remove notification
    # EventDataFilter.Metadata returns cache keys + item metadata on updation
    continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)

    # Register continuous query on server
    cache.get_messaging_service().register_cq(continuous_query)

    reader = cache.get_search_service().execute_reader(query_command)

    if reader.get_field_count() > 0:
        while reader.read():
            result = reader.get_value(Product, 1)
            # Perform operations
    else:
        # None query result set returned
        print("Query result is None")

    # Update Product data in cache to trigger callback
    updated_product = Product()
    updated_product.set_product_id(1001)
    updated_product.set_product_name("Tea")

    key = "Product:" + updated_product.get_product_id()

    cache_item = ncache.CacheItem(updated_product)

    # Trigger add notifications
    version = cache.insert(key, cache_item)
    # This will add item to the result set as it matches query criteria
except Exception as exp:
    # Handle errors

Step 3: Unregister Notifications from Continuous Query

Notifications can be unregistered from Continuous Query when they are no longer required in application. You can unregister notifications for a specific event type if multiple event types have been registered using UnRegisterNotification method.

For example, if ItemAdded and ItemRemoved event types were registered but your business logic no longer requires events for ItemAdded, you specifically unregister notifications for ItemAdded events.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
// Unregister notifications for ItemAdded events only
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
continuousQuery.removeDataModificationListener(new QueryModificationListener listener, EnumSet<EventType.ADDED> eventType);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(CQListener(), List(EventType.ItemAdded))
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)

Step 4: Unregister Continuous Query from Server

Once the application is no more interested in receiving notifications for changes in a query result set, the registered continuous query should be unregistered from server.

UnregisterCQ takes as argument an object of ContinuousQuery to unregister the callbacks which are no more fired after this call.

  • .NET/.NET Core
  • Java
  • Scala
  • Node.js
  • Python
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
 // Unregister cq from server
cache.getMessagingService().unRegisterCQ(continuousQuery);
// Unregister cq from server
cache.getMessagingService.unRegisterCQ(cQuery)
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)

Additional Resources

NCache provides a sample application for Continuous Queries on GitHub.

See Also

SQL Reference for NCache
Event Notifications in Cache
Pub/Sub Messaging

Back to top Copyright © 2017 Alachisoft