連続クエリの使用
必要な検索可能属性のインデックスを作成したと仮定すると、アプリケーションに継続的クエリを実装する必要があります。継続的クエリの目的を念頭に置いて、最初に行う必要があるのは、クエリの結果セットが何らかの方法で変更されたときに実行する必要があるすべてのコールバックを定義することです。次に、継続クエリをキャッシュ サーバーに登録する必要があります。
すべてのアプリケーションでクエリ結果セットの追跡が必要ない場合は、通知の登録を解除するだけでなく、キャッシュからクエリの登録を解除する必要があります。
前提条件
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- 検索可能なオブジェクトとその属性のインデックス作成は、で説明されているように最初に構成する必要があります。 クエリインデックスの構成 管理者ガイド。
- キャッシュには、構成された属性に関連するデータが含まれている必要があります。
- APIの詳細については、以下を参照してください。 Iキャッシュ, イベントデータフィルター, イベントタイプ, 実行リーダー, CQ を登録する, CQ の登録を解除する, 登録解除通知, 連続クエリ, 登録通知
, クエリデータ通知コールバック, ICacheReader, フィールド数, 読む, インセット, CQイベント引数, GetValue.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- 検索可能なオブジェクトとその属性のインデックス作成は、で説明されているように最初に構成する必要があります。 クエリインデックスの構成 管理者ガイド。
- キャッシュには、構成された属性に関連するデータが含まれている必要があります。
- APIの詳細については、以下を参照してください。 キャッシュ, イベントデータフィルター, イベントタイプ, 実行リーダー, 登録CQ, CQ の登録を解除する, RemoveDataModificationListener, キャッシュ項目, キャッシュアイテムのバージョン, insert, CQイベント引数, getEventType, read, getValue, 連続クエリ, addDataModificationListener, getフィールドカウント, キャッシュリーダー.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- 検索可能なオブジェクトとその属性のインデックス作成は、で説明されているように最初に構成する必要があります。 クエリインデックスの構成 管理者ガイド。
- キャッシュには、構成された属性に関連するデータが含まれている必要があります。
- APIの詳細については、以下を参照してください。 キャッシュ, イベントデータフィルター, イベントタイプ, 実行リーダー, 登録CQ, CQ の登録を解除する, RemoveDataModificationListener, キャッシュ項目, insert, getEventType, getValue, read, getフィールドカウント, 連続クエリ, addDataModificationListener.
- すべてを使用するために必要な標準的な前提条件について学習するには NCache クライアント側の機能については、次のページを参照してください。 クライアント側 API の前提条件.
- 検索可能なオブジェクトとその属性のインデックス作成は、で説明されているように最初に構成する必要があります。 クエリインデックスの構成 管理者ガイド。
- キャッシュには、構成された属性に関連するデータが含まれている必要があります。
- APIの詳細については、以下を参照してください。 get_event_type, 実行リーダー, レジスターcq,
un_register_cq, 削除_データ_変更_リスナー, キャッシュ項目, insert, get_field_count, add_data_modification_listener, 連続クエリ, read, 値の取得, イベントタイプ, CQイベント引数.
ステップ1:イベントのコールバックを登録する
必要な検索可能な属性にインデックスを付けたと仮定すると、アプリケーションに連続クエリを実装できます。 これには、クエリの結果セットが変更された後に実行する必要があるすべてのコールバックを定義する必要があります。 次に、ContinuousQueryをキャッシュサーバーに登録する必要があります。
コールバックは、複数のイベントに登録できます。
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to resut set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the resut set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from resut set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
// This is an async method
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
Note
操作がフェイルセーフであることを保証するために、で説明されているように、アプリケーション内の潜在的な例外を処理することをお勧めします。 失敗の処理.
ステップ2:クエリと通知を登録する
コールバックが登録されたら、イベントが発生する結果セットの基準を指定する連続クエリを作成します。 このクエリはサーバーに対して登録されます。
連続クエリが作成されると、事前定義されたコールバックがクエリに登録されます。 コールバックはに従って登録されます EventType
および EventDataFilter
.
連続クエリは、を使用してサーバーに登録できるようになりました RegisterCQ
。 アプリケーション内でこのメソッドを複数回使用すると、クエリのデータセットの変更に関する通知を受け取ることができます。
キャッシュイベント通知の変更は、イベントタイプに応じてトリガーされます。 キャッシュされたデータをクエリするには、 ExecuteReader
クエリを実行すると、生成された結果セットがクライアント側でチャンクごとに読み取られます。
結果セットに影響を与えるようにキャッシュ データを変更することで、イベントをトリガーできます。 コード サンプルでは、既存のキャッシュ アイテムがクエリ結果セットに追加されるように更新し、それによって ItemAdded
イベント。
警告
サーバーとクライアント間の接続が切断された場合、この期間内に発生したイベントはクライアントによって受信されません。
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
ステップ3:継続的なクエリから通知の登録を解除する
通知は、アプリケーションで不要になった場合、継続的クエリから登録を解除できます。 複数のイベント タイプが登録されている場合は、特定のイベント タイプの通知を登録解除できます。 UnRegisterNotification
方法。
たとえば、 ItemAdded
および ItemRemoved
イベントタイプは登録されましたが、ビジネスロジックで次のイベントは不要になりました ItemAdded
、具体的に通知の登録を解除できます。 ItemAdded
イベント。
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
ステップ4:サーバーから継続クエリの登録を解除する
アプリケーションがクエリ結果セットの変更に関する通知を受信する必要がなくなったら、登録された継続的クエリをサーバーから登録解除する必要があります。
UnregisterCQ
引数としてオブジェクトを受け取ります ContinuousQuery
この呼び出し後に起動されなくなったコールバックを登録解除します。
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
その他のリソース
NCache の連続クエリのサンプルアプリケーションを提供します GitHubの.
も参照してください
。ネット: Alachisoft.NCache.ランタイム.イベント 名前空間
Java: comの。alachisoft.ncache.イベント 名前空間
Node.js: 連続クエリ とに提供されます。
Python: ncache.runtime.caching.events とに提供されます。