Cada día generamos y consumimos una cantidad cada vez mayor de datos. Para las empresas, la velocidad del procesamiento de datos afecta las decisiones que toman. A medida que crece la demanda de un procesamiento de datos eficiente, nuestro software debe mantenerse al día con esa demanda. Las encuestas largas no son simplemente una alternativa. Veamos uno NCache característica, Consulta continua, diseñada para admitir el procesamiento de datos en tiempo real.
Consulta continua
Con consulta continua, NCache proporciona un mecanismo para monitorear un conjunto de datos observables dentro de una ventana de tiempo específica. Por aquí, NCache nos notifica sobre todos los cambios que ocurren en ese conjunto de datos mientras está almacenado en caché. La consulta continua funciona como un mecanismo para monitorear los cambios, no para alterar los datos de la aplicación.
Gracias a su arquitectura distribuida, NCache Ofrece escalabilidad, alta disponibilidad y eficiencia de almacenamiento. Con su arquitectura de clustering peer-to-peer autorreparable, NCache maneja grandes cantidades de datos entrantes para el procesamiento de datos en tiempo real.
Cómo implementar una consulta continua
Con consultas continuas, NCache monitorea el resultado de una consulta. Y, a diferencia de las bases de datos relacionales, para buscar elementos mediante una consulta tipo SQL, NCache requiere índices. De lo contrario, tendría que escanear todo el caché para encontrar los elementos que queremos monitorear. Esto lo hara NCache lento.
Una forma sencilla de indexar las entradas que queremos buscar es agregando atributos como consulta indexable y consulta indexada a nuestras clases y propiedades. Por ejemplo, monitoreemos el recuento de piezas fallidas creadas por un modelo de máquina determinado después de algunos trabajos de mantenimiento. Para buscar nuestras mediciones almacenadas en caché por modelo de máquina, anotemos nuestro Measurement
clase como esta:
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Acme.Monitoring.CacheItems; public class Measurement { [QueryIndexed] public string MachineModel { get; set; } public int LastFailedPartCount { get; set; } public DateTimeOffset At { get; set; } } |
Paso 1: Registro de consultas y notificaciones
A continuación, instalemos el Alachisoft.NCache.SDK
Paquete NuGet para definir una consulta para monitorear y registrar una notificación de cambios en nuestras entradas de caché.
Por ejemplo, dentro de un ASP.NET Core servicio alojado o cualquier otro procesador en segundo plano, escribamos una consulta y una notificación para monitorear las medidas de todas las máquinas con un modelo determinado. Algo como esto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 1. Define a continuous query var queryText = "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?"; // Inside our query text, we need to fully qualify our objects var queryCommand = new QueryCommand(queryText); queryCommand.Parameters.Add("MachineModel", "ACME-001"); var query = new ContinuousQuery(queryCommand); // 2. Register the notification query.RegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated, datafilter: EventDataFilter.DataWithMetadata); // 3. Register the continuous query ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.RegisterCQ(query); |
Observemos que escribimos una consulta continua utilizando un texto de consulta, un comando de consulta y un parámetro similares a SQL. Necesitábamos calificar completamente el nombre de nuestro objeto dentro del texto de la consulta.
En el texto de consulta, utilizamos el $VALUE$
proyección para recuperar el objeto real almacenado dentro del caché, no solo una de sus propiedades. NCache apoya a otros proyecciones para recuperar etiquetas, grupos y el resultado de funciones de proyección como SUM
, MIN
y MAX
.
Luego, registramos una notificación pasando una devolución de llamada, un tipo de evento y un filtro de datos. Observe que registramos la misma devolución de llamada para dos tipos de eventos. Con el parámetro de tipo de evento, especificamos el evento que queremos monitorear. NCache admite tres tipos de eventos: ItemAdded
, ItemUpdated
y ItemRemoved
. Y, con el parámetro de filtro de datos, especificamos la información que queremos dentro de nuestra devolución de llamada una vez que se activa un evento. NCache apoya a tres filtros de datos:
1. Ninguno: Solo devuelve la clave de la entrada agregada, actualizada o eliminada.
2. Metadatos: Devuelve la clave afectada y los metadatos, como el nombre del grupo, la prioridad del artículo y la versión del artículo.
3.DatosConMetadatos: Devuelve la entrada de la caché y los metadatos asociados.
En nuestro ejemplo, cada vez que agregamos o actualizamos una medida para una máquina con el modelo “ACME-001”, NCache llama al OnQueryResultSetChanged
método, pasando la entrada real agregada o actualizada. Nosotros usamos ItemAdded
y EventType.ItemUpdated
como el tipo de evento y DataWithMetadata
como filtro de datos.
El DataWithMetadata
Es útil para evitar recuperar elementos por clave nuevamente dentro de la devolución de llamada. Pero usémoslo con cuidado porque es un viaje de red costoso.
Paso 2: registrar devoluciones de llamadas para eventos
Uno de los parámetros que pasamos al registrar una notificación es un llamar de vuelta. Esta es la acción que queremos realizar una vez que un elemento que satisface nuestra consulta de "monitoreo" se vea afectado por el tipo de evento que registramos.
Para continuar con nuestro ejemplo, esta es una devolución de llamada para escuchar ItemAdded
y ItemUpdated
eventos:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public void OnQueryResultSetChanged(string key, CQEventArg arg) { switch (arg.EventType) { case EventType.ItemAdded: // A new measurement for an ACME-001 machine was added // Do something here... Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query"); break; case EventType.ItemUpdated: // A measurement for an ACME-001 machine was changed Console.WriteLine($"Item with key '{key}' has been updated in the result set of the continuous query"); // Since we passed DataWithMetadata, we have access to the cache entry itself if (arg.Item != null) { var old = arg.OldItem.GetValue<Measurement>() var updated = arg.Item.GetValue<Measurement>(); // Do something here with old and updated // Send a notification, push a message into a topic... Console.WriteLine($"Updated product '{key}' has '{updatedMeasurement.FailPartCount}'"); } break; } } |
Con esta devolución de llamada implementada, podemos comenzar a incorporar mediciones de nuestras máquinas a nuestra memoria caché. NCache monitoreará los elementos de la caché y nos informará cuando se agregue o actualice una medición con el modelo “ACME-001”. Luego, podemos enviar un correo electrónico o generar una alerta si el recuento de fallas de cualquier máquina excede un umbral.
Paso 3: Dar de baja consultas y notificaciones
Monitorear consultas y notificar a los clientes tiene algunos costos. Pero no afecta a los clientes de caché ya que se ejecuta de forma asincrónica. Para utilizar mejor las consultas continuas, notificaciones de baja y consultas cuando ya no necesitamos monitorear el resultado de nuestra consulta, como esta.
1 2 3 4 5 6 |
query.UnRegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated); ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.UnRegisterCQ(query); |
Conclusión
Así es como se implementa la consulta continua dentro de las aplicaciones .NET. Con una consulta continua, monitoreamos el resultado de una consulta para detectar adiciones, cambios y eliminaciones durante un período de tiempo. Aquí, escribimos un ejemplo para observar el recuento de piezas fallidas de las máquinas. Pero podemos utilizar la consulta continua para la gestión de riesgos, la detección de fraudes, el análisis de registros y otros escenarios en tiempo real.
Para obtener más información sobre la consulta continua, consulte la Descripción general de consultas continuas guía.