For high transaction applications with complex architectures, continuously interchanging data causes a non-uniform load and delay in throughput. Data refining is a big challenge when it comes to large and complex business applications. To cater to this, stream processing is used to define a particular data flow by creating data streams. A typical stream application consists of several producers generating new events and a set of consumers processing these events.
One such popular application is Pub/Sub, where publishers input messages and subscribers receive the messages they subscribe to. However, in stream processing, Pub/Sub faces a few limitations like once the subscriber gets the message, the application no longer retains the message. So later, if another subscriber wants the messages from the publisher, the previous messages do not exist. Moreover, for incoming stream data, data filtration takes place at the client end (subscriber), and not at the server, making the application architecture complex.
To overcome these limitations of Pub/Sub, NCache comes with an efficient mechanism of processing data on the server side using Continuous Queries. Continuous Queries allow applications to be notified of all changes occurring in the data residing in the cache, meeting a certain criteria. This blog helps you understand the advantages of using Continuous Queries in stream processing with the help of a solution created for stream processing on GitHub.
Using Continuous Query for Stream Processing
The solution explains an e-commerce application, that processes thousands of customers each day for their online purchases. If you look at the diagram below, the customers of all kinds and categories are added in the application. To make the processing of customers efficient, the unfiltered customers are categorized and filtered as “Gold”, “Silver” and “Bronze” customers based on the number of their orders, using continuous queries.
Continuous query allows applications to receive notifications when a data meeting certain criteria changes inside the cache and the criteria is specified using SQL commands. For example, if an application wants to tag customers with the higher number of orders as ‘Gold Customers’, all it needs to do is register an SQL command criteria hence providing a callback. This callback is fired on any change occurring in the result set fulfilling the criteria. Once the callback is invoked, the application can categorize these customers as “Gold Customers” using Tags.

Figure 1: Stream Processing using Continuous Query
Similarly, the application can create multiple categories by registering multiple CQs, each with its own criteria and a callback. This way the application only gets the filtered data it is interested in. The filtered data can then further be analyzed according to the business requirements such as providing discounts to the high-end customers based upon the customer’s category.
Events are triggered if any of the following data modification actions take place in the cache:
- Add: Adding a new item to the cache that fulfills the query criteria
- Update: Updating an existing item in the query result set.
- Remove: Removing item from cache or updating any existing cached item such that it causes item removal from query result set.
Let us walk through a quick code example of using stream processing in the cache with continuous query. In this example a continuous query is executed on the data where the orders that are greater than 10 are added to the “Gold Customers” category. Moreover, an event is triggered on every item added to the query result set.
1 2 3 4 5 6 7 8 9 10 11 |
string query = SELECT $VALUE$ FROM Models.Customer WHERE OrdersCount >= ?; var queryCommand = new QueryCommand (query); queryCommand.Parameters.Add("OrdersCount", 10); var contQuery = new ContinuousQuery (queryCommand); // EventDataFilter.None returns the cache keys added cQuery.RegisterNotification (new QueryDataNotificationCallback (QueryItemCallBackForGoldCustomers), EventType.ItemAdded, EventDataFilter.None); cache.MessagingService.RegisterCQ(contQuery); // Register callback for event notifications in the result set |
NCache Details Continuous Query and Pub/Sub
Continuous Query keeps the data that Pub/Sub does not
Now the data filtered through continuous query (for customers with orders >10) is tagged as “Gold Customers” and is updated in the cache. Look at the code below to see how it’s done.
1 2 3 4 5 6 7 8 9 10 |
// A callback for previously executed query private void QueryItemCallBackForGoldCustomers (string key, CQEventArg arg) { var cacheItem = _cache.GetCacheItem(key); cacheItem.Expiration = new Expiration(ExpirationType.None); Tag[] tags = new Tag[1]; tags[0] = new Tag("GoldCustomers"); cacheItem.Tags = tags; cache.Insert(key, cacheItem); } |
Continuous query keeps the data preserved in the cache even after processing. This way, it solves the problem faced with Pub/Sub for continuously emerging data that is multiple applications publishing the data into NCache messaging layer. Hence, multiple subscribers receive the data and have no reliable data storage as the messages are removed from the message bus once received. The data is stored either by the application or by adding a new data source which is a far more complex scenario. Continuous query on the other hand, makes sure that there is no data loss, thus, saving you all the extra effort of manually persisting your data.
NCache Details Download NCache Edition Comparison
Continuous Query Enables Application Decoupling through Powerful Filtration
Large complex applications can have various groupings based on their architectures, for example, with 10 applications running, two of them could be dealing with the Gold customers’ dataset whereas the other two could be dealing with the Silver customers’ dataset. In such cases you would want a separate business logic for each dataset where data is filtered according to each application’s needs for stream processing. Hence, such large complex applications need decoupling as dependency of applications on each other causes huge performance bottlenecks as well as increased application complexity.
Continuous query very efficiently filters your applications’ data with the help of fairly sophisticated SQL statements, thus no application overlaps with the other applications. This decoupling becomes of great use in a microservices architecture where each service is running on a separate application stack. Every microservice gets and processes its own data without causing any dependency. This level of data filtration and application decoupling cannot be achieved using Pub/Sub.
Figure 2 shows various client applications dealing with their respective datasets in decoupled architecture using NCache continuous queries.

Figure 2: Decoupling between Applications
Fetching Data Using Tags
Tags in NCache are added qualifiers for the data that are used to categorize the data based on them. For large data sets such as the scenario mentioned, tags are really helpful for fetching relevant data instead of searching the whole cache for the data. If a customer falls under the category of “Gold Customers”, a tag is added with it for quick retrieval. Based on these categories, the customers can be provided added benefits such as discounts, coupons etc. NCache provides various flexible ways for fetching data using tags, explained thoroughly in the documentation.
Now let us look at the code example of tags associated with the “Gold Customers”. These customers can be offered coupons or premium services.
1 2 3 4 5 6 7 8 9 10 11 |
string key = $"Customers:{customer.CustomerID}"; var cacheItem = new CacheItem (customer); Tag[] tags = new Tag[2]; tags[0] = new Tag ("Gold Customers");] cacheItem.Tags = tags; CacheItemVersion version = cache.Insert(key, cacheItem); // Retrieve the cache items with the tag for processing ICollection retrievedKeys = cache.SearchService.GetKeysByTag(tags[0]); |
Expiring Cache Data
NCache allows cache data expiration that invalidates the data after a specific interval and then removes it from the cache at a clean interval.
NCache provides two types of expirations:
In case of customers, expiration is added to the items which do not fall under any of the three categories i.e. Gold, Silver or Bronze. All the other customers with orders less than 4 are added with an expiration time interval and are removed from the cache for no further analysis. However, the expiration of any customer belonging to either of the categories is set to None to persist the data in the cache unless manually removed. This is how you can add 15 seconds expiration to a customer with less than 4 orders.
1 2 3 4 5 |
var cacheItem = new CacheItem(customers[0]); // Set Expiration TimeSpan cacheItem.Expiration = new Expiration(ExpirationType.Sliding, new TimeSpan(0, 0, 15)); cache.Insert("CustomerID:" + customers[0].Id, cacheItem); |
Why to Use NCache?
NCache is 100% .NET/.NET Core, in-memory distributed caching solution and has been leading in the market for a long time. It is extremely fast and linearly scalable, and it efficiently removes the performance bottlenecks for your application by caching the data. It saves you the network cost by reducing expensive network trips. NCache provides you with a rich set of features such as Continuous Query that makes the data analysis very fast and efficient along with other features to facilitate smooth flow of your application.