Utilizzo della query continua
Supponendo che tu abbia indicizzato gli attributi ricercabili richiesti, ora ti viene richiesto di implementare la query continua nella tua applicazione. Tenendo presente lo scopo delle query continue, la prima cosa che devi fare è definire tutti i callback che devono essere eseguiti una volta modificato in qualche modo il set di risultati della query. Quindi, dobbiamo registrare la query continua con il server cache.
Se tutte le tue applicazioni non richiedono il monitoraggio di alcun set di risultati di query, non solo dovresti annullare la registrazione delle notifiche ma anche annullare la registrazione della query dalla tua cache.
Prerequisiti
- Per conoscere i prerequisiti standard richiesti per lavorare con all NCache Per le funzionalità lato client fare riferimento alla pagina specificata Prerequisiti dell'API lato client.
- L'indicizzazione degli oggetti ricercabili e dei relativi attributi deve essere prima configurata come spiegato in Configurazione degli indici di query nella Guida dell'amministratore.
- La cache dovrebbe avere alcuni dati relativi agli attributi configurati.
- Per i dettagli dell'API, fare riferimento a: ICache, EventDataFilter, Tipo di evento, Esegui Lettore, RegistratiCQ, Annullare la registrazione CQ, Notifica di annullamento della registrazione, Query continua, Notifica di registrazione
, QueryDataNotification Callback, ICacheReader, Conteggio dei campi, Leggi, inserire, CQEventArg, Ottieni valore.
- Per conoscere i prerequisiti standard richiesti per lavorare con all NCache Per le funzionalità lato client fare riferimento alla pagina specificata Prerequisiti dell'API lato client.
- L'indicizzazione degli oggetti ricercabili e dei relativi attributi deve essere prima configurata come spiegato in Configurazione degli indici di query nella Guida dell'amministratore.
- La cache dovrebbe avere alcuni dati relativi agli attributi configurati.
- Per i dettagli dell'API, fare riferimento a: Cache, EventDataFilter, Tipo di evento, eseguiReader, registratiCQ, unRegisterCQ, rimuoviDataModificationListener, CacheItem, CacheItemVersion, insert, CQEventArg, getEventType, read, getValore, Query continua, addDataModificationListener, getCampoConteggio, Lettore di cache.
- Per conoscere i prerequisiti standard richiesti per lavorare con all NCache Per le funzionalità lato client fare riferimento alla pagina specificata Prerequisiti dell'API lato client.
- L'indicizzazione degli oggetti ricercabili e dei relativi attributi deve essere prima configurata come spiegato in Configurazione degli indici di query nella Guida dell'amministratore.
- La cache dovrebbe avere alcuni dati relativi agli attributi configurati.
- Per i dettagli dell'API, fare riferimento a: Cache, EventDataFilter, Tipo di evento, eseguiReader, registratiCQ, unRegisterCQ, rimuoviDataModificationListener, CacheItem, insert, getEventType, getValore, read, getCampoConteggio, Query continua, addDataModificationListener.
- Per conoscere i prerequisiti standard richiesti per lavorare con all NCache Per le funzionalità lato client fare riferimento alla pagina specificata Prerequisiti dell'API lato client.
- L'indicizzazione degli oggetti ricercabili e dei relativi attributi deve essere prima configurata come spiegato in Configurazione degli indici di query nella Guida dell'amministratore.
- La cache dovrebbe avere alcuni dati relativi agli attributi configurati.
- Per i dettagli dell'API fare riferimento a: get_event_type, esegui_lettore, registro_cq,
un_register_cq, rimuovi_data_modification_listener, CacheItem, insert, get_field_count, add_data_modification_listener, Query continua, read, ottieni_valore, Tipo di evento, CQEventArg.
Passaggio 1: registra la richiamata per gli eventi
Supponendo di aver indicizzato gli attributi ricercabili richiesti, puoi implementare la query continua nella tua applicazione. Ciò richiede di definire tutti i callback che devono essere eseguiti una volta modificato il set di risultati della query. Quindi, dobbiamo registrare la query continua con il server cache.
È possibile registrare una richiamata per più eventi.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to resut set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the resut set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from resut set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
// This is an async method
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
Note:
Per garantire che l'operazione sia a prova di errore, si consiglia di gestire eventuali potenziali eccezioni all'interno dell'applicazione, come spiegato in Gestione dei guasti.
Passaggio 2: registra query e notifiche
Dopo aver registrato i callback, creare una query continua, che specifica i criteri per il set di risultati di cui verranno attivati gli eventi. Questa query verrà registrata sul server.
Una volta creata la query continua, le richiamate predefinite vengono registrate con la query. Le richiamate vengono registrate in base a EventType
ed EventDataFilter
.
La query continua può ora essere registrata sul server utilizzando RegisterCQ
. Puoi utilizzare questo metodo più volte nella tua applicazione per ricevere notifiche per una modifica nel set di dati della tua query.
Eventuali modifiche alle notifiche degli eventi della cache verranno attivate in base al tipo di evento. Per eseguire query sui dati memorizzati nella cache, ExecuteReader
esegue la query e il set di risultati generato viene quindi letto sul lato client, pezzo per pezzo.
È possibile attivare eventi modificando i dati della cache in modo che influenzino il set di risultati. L'esempio di codice aggiorna un elemento della cache esistente in modo che venga aggiunto al set di risultati della query, attivando così un ItemAdded
evento.
avvertimento
Se la connessione tra un server e un client si interrompe, tutti gli eventi attivati entro questo periodo non verranno ricevuti dal client.
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
Passaggio 3: annullare la registrazione delle notifiche dalla query continua
È possibile annullare la registrazione delle notifiche da Continuous Query quando non sono più necessarie nell'applicazione. È possibile annullare la registrazione delle notifiche per un tipo di evento specifico se sono stati registrati più tipi di eventi utilizzando UnRegisterNotification
metodo.
Per esempio, se ItemAdded
ed ItemRemoved
i tipi di evento sono stati registrati ma la tua logica aziendale non richiede più eventi per ItemAdded
, puoi specificamente annullare la registrazione delle notifiche per ItemAdded
eventi.
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
Passaggio 4: annullare la registrazione della query continua dal server
Una volta che l'applicazione non è più interessata a ricevere notifiche per le modifiche nel set di risultati di una query, è necessario annullare la registrazione della query continua registrata dal server.
UnregisterCQ
prende come argomento un oggetto di ContinuousQuery
per annullare la registrazione delle richiamate che non vengono più attivate dopo questa chiamata.
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
Risorse addizionali
NCache fornisce un'applicazione di esempio per le query continue su GitHub.
Vedere anche
.NETTO: Alachisoft.NCache.Eventi.di.runtime spazio dei nomi.
Giava: com.alachisoft.ncache.eventi spazio dei nomi.
Node.js: Query continua classe.
Pitone: ncache.runtime.caching.events classe.