A medida que los datos se vuelven más críticos para las empresas, la necesidad de procesamiento de datos en tiempo real (particularmente cuando los datos llegan en fragmentos o flujos) y el análisis aumenta a un ritmo sin precedentes. Las aplicaciones que dependen del procesamiento de datos en tiempo real requieren un método confiable para recibir notificaciones en tiempo real cuando se producen cambios en los datos sin causar sobrecargas de rendimiento que resultan del sondeo manual, específicamente cuando se trabaja con grandes conjuntos de datos. Para abordar estos problemas, NCache proporciona su Consulta continua característica: donde puede crear consultas que filtran datos según sus criterios definidos.
Sin la molestia del sondeo manual, CQ permite a los desarrolladores realizar un seguimiento de los cambios de datos en tiempo real y recibir alertas. Por ejemplo, en una aplicación de comercio de acciones en tiempo real, los comerciantes necesitan actualizaciones constantes sobre los cambios en los precios de las acciones. Sin un sistema de monitoreo en tiempo real adecuado, tendrían que verificar manualmente los precios de las acciones, lo que resultaría en pérdida de tiempo y oportunidades. Los comerciantes pueden, por lo tanto, crear consultas que filtren los precios de las acciones para recibir actualizaciones en tiempo real cada vez que cambien los datos filtrados. Esto les da la ventaja de consultar el caché solo para el conjunto de datos de su elección (mediante el uso de Consultas OQL similares a SQL) dentro del clúster de caché distribuida en lugar de agotar la caché. Este blog lo guiará a través del uso de Consulta continua para monitorear los cambios de datos en NCache usando Python.
Seguimiento de Cambios a través de Consulta Continua en NCache
Con NCacheCon la función Consulta continua, puede definir un conjunto de datos específico en la red de caché distribuida y realizar un seguimiento de los cambios mediante consultas OQL. Cuando cambios ocurren dentro del conjunto de datos (debido a operaciones de escritura como operaciones de agregar, actualizar o eliminar), las aplicaciones registradas se actualizan a través de Eventos de nivel de caché, que evita la superposición de aplicaciones mediante el filtrado de datos mediante consultas OQL. Al proporcionar un método para rastrear y compartir datos entre aplicaciones a través de eventos, CQ permite a los desarrolladores especificar su lógica comercial.
Al usar CQ en Python, los usuarios pueden aprovechar la NCache Biblioteca de cliente de Python para registrarse en eventos de CQ. También puede especificar la cantidad de datos devueltos tras la ejecución del evento, es decir, Ninguna, metadatoso DatosConMetadatos. Sin embargo, es esencial comprender que la Consulta continua solo actúa como un mecanismo para monitorear los cambios en lugar de alterar los datos de la aplicación.
Configuración de consulta continua en NCache
Los pasos que se mencionan a continuación explican el proceso de registro y cancelación del registro de devoluciones de llamada, consultas y notificaciones para recibir notificaciones sobre el conjunto de datos definido por usted.
Paso 1: Registre la devolución de llamada para eventos
Primero, debe registrar una devolución de llamada para eventos de nivel de caché, puede ser Agregar, Actualizar, Eliminar o una combinación, para que estas devoluciones de llamada se ejecuten siempre que se modifique la consulta. El siguiente código de ejemplo muestra cómo registrar una devolución de llamada para un evento ITEM_ADDED.
1 2 3 4 |
def query_item_callback(key: str, arg: ncache.CQEventArg): if arg.get_event_type() is ncache.EventType.ITEM_ADDED: # Key has been added to the cache print(key + " added to cache") |
También puede registrar eventos de devolución de llamada para elementos actualizados y eventos eliminados.
- NCache Detalles
- Registro de devolución de llamada para eventos
- Registro de devolución de llamada para eventos
Paso 2: Registro de consultas y notificaciones
Los procesos clave para rastrear los cambios de datos en NCache están registrando devoluciones de llamada y creando una consulta continua que especifica los criterios del conjunto de resultados. Después de la creación de la consulta, las devoluciones de llamada predefinidas se registran en función de Tipo de evento y Filtro de datos de evento. A continuación, el ejecutar_lector se utiliza para consultar los datos almacenados en caché después de que el CQ se haya registrado en el servidor mediante registro_cq. Para desencadenar eventos, los datos de caché se pueden modificar para afectar el conjunto de resultados. El código de ejemplo siguiente muestra este escenario.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# Precondition: Cache is already connected # Query for required operation query = "SELECT $Value$ FROM FQN.Product WHERE category = ?" query_command = ncache.QueryCommand(query) query_command.set_parameters({"Category": "Beverages"}) # Create continuous query continuous_query = ncache.ContinuousQuery(query_command) event_type = [ncache.EventType.ITEM_REMOVED] # Item remove notification # EventDataFilter.Metadata returns cache keys + item metadata on updation continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE) # Register continuous query on server cache.get_messaging_service().register_cq(continuous_query) reader = cache.get_search_service().execute_reader(query_command) if reader.get_field_count() > 0: while reader.read(): result = reader.get_value(Product, 1) # Perform operations else: # None query result set returned print("Query result is None") # Update Product data in cache to trigger callback updated_product = Product() updated_product.set_product_id(1001) updated_product.set_product_name("Tea") key = "Product:" + updated_product.get_product_id() cache_item = ncache.CacheItem(updated_product) # Trigger add notifications version = cache.insert(key, cache_item) # This will add item to the result set as it matches query criteria |
Paso 3: anular el registro de notificaciones de consulta continua
Para dejar de recibir notificaciones de Consulta continua en su aplicación, tiene la opción de cancelar el registro mediante el remove_data_modification_listener método. Sin embargo, esto es específico para los tipos de eventos como Agregar y Eliminar. Incluso puede anular el registro de las notificaciones de elementos agregados si, anteriormente se registró para los eventos de elementos agregados y eliminados, pero ahora solo requiere notificaciones para el elemento eliminado.
1 2 3 |
# Unregister notifications for ItemAdded events only event_type = [ncache.EventType.ITEM_ADDED] c_query.remove_data_modification_listener(cq_event_listener, event_type) |
Paso 4: anular el registro de la consulta continua del servidor
Para evitar la utilización de recursos cuando esta consulta ya no sea necesaria, debe anular el registro de Consulta continua del servidor. Al hacerlo, ya no recibirá notificaciones de cambios en el conjunto de resultados de la consulta. El siguiente ejemplo de código muestra cómo anular el registro de CQ del servidor de caché utilizando el un_register_cq método.
1 2 |
# Unregister cq from server cache.get_messaging_service().un_register_cq(c_query) |
Conclusión
Con NCacheCon la función Consulta continua, puede monitorear los cambios en un conjunto de datos específico dentro de un clúster de caché distribuida. Garantiza que la aplicación solo recupere los datos necesarios, lo que mejora la eficiencia y minimiza el procesamiento de datos extraños a través de consultas OQL y eventos de nivel de caché. ¿Una manera tan fácil y flexible de monitorear los cambios? ¡Comience su prueba gratis hoy!