Uso de consulta continua
Suponiendo que haya indexado los atributos de búsqueda requeridos, ahora debe implementar la consulta continua en su aplicación. Teniendo en cuenta el propósito de las consultas continuas, lo primero que debe hacer es definir todas las devoluciones de llamada que deben ejecutarse una vez que el conjunto de resultados de su consulta haya cambiado de alguna manera. Luego, debemos registrar la consulta continua en el servidor de caché.
Si todas sus aplicaciones no requieren el seguimiento de ningún conjunto de resultados de consultas, entonces no solo debe anular el registro de las notificaciones, sino también anular el registro de la consulta de su caché.
Requisitos previos
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- La indexación de objetos que se pueden buscar y sus atributos deben configurarse primero como se explica en Configuración de índices de consulta en la Guía del administrador.
- El caché debe tener algunos datos relacionados con los atributos configurados.
- Para obtener detalles de la API, consulte: Dolor, Filtro de datos de evento, Tipo de evento, Ejecutar Lector, RegistrarseCQ, Cancelar registroCQ, Notificación de cancelación del registro, Consulta continua, RegistroNotificación
, QueryDataNotificationCallback, Lector de ICache, Recuento de campo, Leer, recuadro, CQEventArg, Getvalue.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- La indexación de objetos que se pueden buscar y sus atributos deben configurarse primero como se explica en Configuración de índices de consulta en la Guía del administrador.
- El caché debe tener algunos datos relacionados con los atributos configurados.
- Para obtener detalles de la API, consulte: cache, Filtro de datos de evento, Tipo de evento, ejecutarLector, registrarseCQ, cancelar el registro CQ, eliminarDataModificationListener, Artículo de caché, Versión de elemento de caché, insertar, CQEventArg, obtener tipo de evento, leer, getValue, Consulta continua, agregarDataModificationListener, getFieldCount, Lector de caché.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- La indexación de objetos que se pueden buscar y sus atributos deben configurarse primero como se explica en Configuración de índices de consulta en la Guía del administrador.
- El caché debe tener algunos datos relacionados con los atributos configurados.
- Para obtener detalles de la API, consulte: cache, Filtro de datos de evento, Tipo de evento, ejecutarLector, registrarseCQ, cancelar el registro CQ, eliminarDataModificationListener, Artículo de caché, insertar, obtener tipo de evento, getValue, leer, getFieldCount, Consulta continua, agregarDataModificationListener.
- Para obtener información sobre los requisitos previos estándar necesarios para trabajar con todos NCache características del lado del cliente, consulte la página proporcionada en Requisitos previos de la API del lado del cliente.
- La indexación de objetos que se pueden buscar y sus atributos deben configurarse primero como se explica en Configuración de índices de consulta en la Guía del administrador.
- El caché debe tener algunos datos relacionados con los atributos configurados.
- Para obtener detalles de la API, consulte: get_event_type, ejecutar_lector, registro_cq,
un_register_cq, remove_data_modification_listener, Artículo de caché, insertar, get_field_count, add_data_modification_listener, Consulta continua, leer, obtener_valor, Tipo de evento, CQEventArg.
Paso 1: Registre la devolución de llamada para eventos
Suponiendo que haya indexado los atributos de búsqueda requeridos, puede implementar Consulta continua en su aplicación. Esto requiere que defina todas las devoluciones de llamada que deben ejecutarse una vez que se modifica el conjunto de resultados de su consulta. Luego, debemos registrar la consulta continua con el servidor de caché.
Se puede registrar una devolución de llamada para múltiples eventos.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to resut set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the resut set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from resut set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
// This is an async method
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
Note
Para garantizar que la operación sea a prueba de fallas, se recomienda manejar cualquier posible excepción dentro de su aplicación, como se explica en Manejo de fallas.
Paso 2: Registro de consultas y notificaciones
Después de registrar las devoluciones de llamada, cree una Consulta continua, que especifica los criterios para el conjunto de resultados del cual se activarán los eventos. Esta consulta se registrará en el servidor.
Una vez que se ha creado la consulta continua, las devoluciones de llamada predefinidas se registran con la consulta. Las devoluciones de llamada se registran de acuerdo con EventType
y EventDataFilter
.
La consulta continua ahora se puede registrar en el servidor usando RegisterCQ
. Puede utilizar este método varias veces en su aplicación para recibir notificaciones de un cambio en el conjunto de datos de su consulta.
Cualquier modificación en las notificaciones de eventos de caché se activará según el tipo de evento. Para consultar datos almacenados en caché, ExecuteReader
ejecuta la consulta y el conjunto de resultados generado se lee en el lado del cliente, fragmento por fragmento.
Puede desencadenar eventos modificando los datos de la memoria caché para que afecten al conjunto de resultados. El ejemplo de código actualiza un elemento de caché existente de modo que se agrega al conjunto de resultados de la consulta, activando así un ItemAdded
evento.
advertencia
Si la conexión se interrumpe entre un servidor y un cliente, el cliente no recibirá ningún evento activado dentro de esta duración.
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
Paso 3: anular el registro de notificaciones de consulta continua
Las notificaciones se pueden dar de baja de Consulta Continua cuando ya no sean necesarias en la aplicación. Puede cancelar el registro de notificaciones para un tipo de evento específico si se han registrado varios tipos de eventos usando el UnRegisterNotification
método.
Por ejemplo, si ItemAdded
y ItemRemoved
se registraron tipos de eventos pero su lógica comercial ya no requiere eventos para ItemAdded
, puede anular específicamente el registro de notificaciones para ItemAdded
eventos.
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
Paso 4: anular el registro de la consulta continua del servidor
Una vez que la aplicación ya no esté interesada en recibir notificaciones de cambios en un conjunto de resultados de consulta, la consulta continua registrada debe cancelarse del registro del servidor.
UnregisterCQ
toma como argumento un objeto de ContinuousQuery
para cancelar el registro de las devoluciones de llamada que ya no se activan después de esta llamada.
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
Recursos adicionales
NCache proporciona una aplicación de muestra para consultas continuas en GitHub.
Vea también
.NETO: Alachisoft.NCache.Eventos.en.tiempo.de.ejecución espacio de nombres
Java: com.alachisoft.ncache.eventos espacio de nombres
Nodo.js: Consulta continua clase.
Pitón: ncache.eventos.de.almacenamiento.en.caché en tiempo de ejecución clase.