Kontinuierliche Abfrage verwenden
Vorausgesetzt, Sie haben die erforderlichen durchsuchbaren Attribute indiziert, müssen Sie nun Continuous Query in Ihrer Anwendung implementieren. Unter Berücksichtigung des Zwecks kontinuierlicher Abfragen müssen Sie zunächst alle Rückrufe definieren, die ausgeführt werden müssen, sobald sich die Ergebnismenge Ihrer Abfrage in irgendeiner Weise ändert. Dann müssen wir die kontinuierliche Abfrage beim Cache-Server registrieren.
Wenn alle Ihre Anwendungen keine Nachverfolgung von Abfrageergebnissen erfordern, sollten Sie nicht nur die Benachrichtigungen abmelden, sondern auch die Abfrage aus Ihrem Cache abmelden.
Voraussetzungen:
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Die Indizierung für durchsuchbare Objekte und ihre Attribute muss zuerst konfiguriert werden, wie in erläutert Konfigurieren von Abfrageindizes im Administratorhandbuch.
- Der Cache sollte einige Daten enthalten, die sich auf konfigurierte Attribute beziehen.
- Einzelheiten zur API finden Sie unter: ICache, Ereignisdatenfilter, Ereignistyp, ExecuteReader, RegistrierenCQ, UnRegisterCQ, UnRegisterNotification, ContinuousQuery, Registrierungsbenachrichtigung
, QueryDataNotificationCallback, ICacheReader, FieldCount, Lesen Sie mehr, Insert, CQEventArg, Getvalue.
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Die Indizierung für durchsuchbare Objekte und ihre Attribute muss zuerst konfiguriert werden, wie in erläutert Konfigurieren von Abfrageindizes im Administratorhandbuch.
- Der Cache sollte einige Daten enthalten, die sich auf konfigurierte Attribute beziehen.
- Einzelheiten zur API finden Sie unter: Cache-Speicher, Ereignisdatenfilter, Ereignistyp, AusführenLeser, registrierenCQ, unRegisterCQ, entfernteatamodificificationListener, CacheItem, CacheItemVersion, einfügen, CQEventArg, GetEventType, lesen, Wert erhalten, ContinuousQuery, addDataModificationListener, getFieldCount, CacheReader.
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Die Indizierung für durchsuchbare Objekte und ihre Attribute muss zuerst konfiguriert werden, wie in erläutert Konfigurieren von Abfrageindizes im Administratorhandbuch.
- Der Cache sollte einige Daten enthalten, die sich auf konfigurierte Attribute beziehen.
- Einzelheiten zur API finden Sie unter: Cache-Speicher, Ereignisdatenfilter, Ereignistyp, AusführenLeser, registrierenCQ, unRegisterCQ, entfernteatamodificificationListener, CacheItem, einfügen, GetEventType, Wert erhalten, lesen, getFieldCount, ContinuousQuery, addDataModificationListener.
- Lernen Sie die Standardvoraussetzungen kennen, die für die Arbeit mit allen erforderlich sind NCache Weitere Informationen zu clientseitigen Funktionen finden Sie auf der angegebenen Seite Clientseitige API-Voraussetzungen.
- Die Indizierung für durchsuchbare Objekte und ihre Attribute muss zuerst konfiguriert werden, wie in erläutert Konfigurieren von Abfrageindizes im Administratorhandbuch.
- Der Cache sollte einige Daten enthalten, die sich auf konfigurierte Attribute beziehen.
- Einzelheiten zur API finden Sie unter: get_event_type, ausführen_reader, register_cq,
un_register_cq, Remove_data_modification_listener, CacheItem, einfügen, get_field_count, add_data_modification_listener, ContinuousQuery, lesen, Wert erhalten, Ereignistyp, CQEventArg.
Schritt 1: Registrieren Sie den Rückruf für Ereignisse
Angenommen, Sie haben die erforderlichen durchsuchbaren Attribute indiziert, können Sie Continuous Query in Ihrer Anwendung implementieren. Dazu müssen Sie alle Rückrufe definieren, die ausgeführt werden müssen, sobald die Ergebnismenge Ihrer Abfrage geändert wird. Dann müssen wir die kontinuierliche Abfrage beim Cache-Server registrieren.
Ein Rückruf kann für mehrere Ereignisse registriert werden.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to resut set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the resut set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from resut set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
// This is an async method
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
Note
Um sicherzustellen, dass der Vorgang ausfallsicher ist, wird empfohlen, alle potenziellen Ausnahmen in Ihrer Anwendung zu behandeln, wie in erläutert Umgang mit Fehlern.
Schritt 2: Abfrage und Benachrichtigungen registrieren
Nachdem die Rückrufe registriert sind, erstellen Sie eine fortlaufende Abfrage, die die Kriterien für die Ergebnismenge angibt, deren Ereignisse ausgelöst werden. Diese Abfrage wird beim Server registriert.
Sobald Continuous Query erstellt wurde, werden die vordefinierten Rückrufe mit der Abfrage registriert. Die Rückrufe werden gem EventType
und EventDataFilter
.
Die Dauerabfrage kann nun mit auf dem Server registriert werden RegisterCQ
. Sie können diese Methode in Ihrer Anwendung mehrmals verwenden, um Benachrichtigungen über eine Änderung im Datensatz Ihrer Abfrage zu erhalten.
Alle Änderungen in Cache-Ereignisbenachrichtigungen werden entsprechend dem Ereignistyp ausgelöst. Zum Abfragen von zwischengespeicherten Daten, ExecuteReader
führt die Abfrage aus und die generierte Ergebnismenge wird dann auf der Clientseite Stück für Stück gelesen.
Sie können Ereignisse auslösen, indem Sie Cache-Daten so ändern, dass sie sich auf den Ergebnissatz auswirken. Das Codebeispiel aktualisiert ein vorhandenes Cache-Element, sodass es dem Abfrageergebnissatz hinzugefügt wird, wodurch ein ausgelöst wird ItemAdded
Ereignis.
Warnung
Wenn die Verbindung zwischen einem Server und einem Client unterbrochen wird, werden alle innerhalb dieser Dauer ausgelösten Ereignisse nicht vom Client empfangen.
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
Schritt 3: Benachrichtigungen von Continuous Query abmelden
Benachrichtigungen können von Continuous Query abgemeldet werden, wenn sie in der Anwendung nicht mehr benötigt werden. Sie können die Registrierung von Benachrichtigungen für einen bestimmten Ereignistyp aufheben, wenn mehrere Ereignistypen mithilfe von registriert wurden UnRegisterNotification
Methode.
Zum Beispiel, wenn ItemAdded
und ItemRemoved
Ereignistypen wurden registriert, aber Ihre Geschäftslogik erfordert keine Ereignisse mehr ItemAdded
, können Sie Benachrichtigungen gezielt abmelden ItemAdded
Veranstaltungen.
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
Schritt 4: Continuous Query vom Server abmelden
Sobald die Anwendung kein Interesse mehr daran hat, Benachrichtigungen über Änderungen in einem Abfrageergebnissatz zu erhalten, sollte die registrierte kontinuierliche Abfrage vom Server abgemeldet werden.
UnregisterCQ
nimmt als Argument ein Objekt von ContinuousQuery
um die Rückrufe abzumelden, die nach diesem Anruf nicht mehr ausgelöst werden.
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
Weitere Informationen
NCache bietet eine Beispielanwendung für Continuous Queries an GitHub.
Siehe auch
.NETZ: Alachisoft.NCache.Laufzeitereignisse Namespace.
Java: com.alachisoft.ncache.Veranstaltungen Namespace.
Node.js: ContinuousQuery Klasse.
Python: ncache.runtime.caching.events Klasse.