Todos os dias, geramos e consumimos uma quantidade cada vez maior de dados. Para as empresas, a velocidade do processamento de dados afeta as decisões que tomam. À medida que cresce a demanda por processamento eficiente de dados, nosso software precisa acompanhar essa demanda. Longas pesquisas não são simplesmente uma alternativa. Vejamos um NCache recurso, Consulta Contínua, projetado para suportar processamento de dados em tempo real.
Consulta Contínua
Com consulta contínua, NCache fornece um mecanismo para monitorar um conjunto de dados observáveis dentro de uma janela de tempo específica. Por aqui, NCache nos notifica sobre todas as alterações que ocorrem nesse conjunto de dados enquanto ele está armazenado em cache. A Consulta Contínua funciona como um mecanismo para monitorar mudanças, não para alterar os dados da aplicação.
Graças à sua arquitetura distribuída, NCache oferece escalabilidade, alta disponibilidade e eficiência de armazenamento. Com sua arquitetura de cluster ponto a ponto com autocorreção, NCache lida com grandes quantidades de dados recebidos para processamento de dados em tempo real.
Como implementar uma consulta contínua
Com consultas contínuas, NCache monitora o resultado de uma consulta. E, diferentemente dos bancos de dados relacionais, para pesquisar itens usando uma consulta semelhante a SQL, NCache requer índices. Caso contrário, seria necessário verificar todo o cache para encontrar os itens que desejamos monitorar. Isso fará NCache lento.
Uma maneira fácil de indexar as entradas que queremos pesquisar é adicionar atributos como ConsultaIndexável e ConsultaIndexada para nossas aulas e propriedades. Por exemplo, vamos monitorar a contagem de peças com falha criadas por um determinado modelo de máquina após algum trabalho de manutenção. Para pesquisar nossas medições em cache por modelo de máquina, vamos anotar nosso Measurement
classe assim:
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Acme.Monitoring.CacheItems; public class Measurement { [QueryIndexed] public string MachineModel { get; set; } public int LastFailedPartCount { get; set; } public DateTimeOffset At { get; set; } } |
Etapa 1: registrar consulta e notificações
A seguir, vamos instalar o Alachisoft.NCache.SDK
Pacote NuGet para definir uma consulta para monitorar e registrar uma notificação para alterações em nossas entradas de cache.
Por exemplo, dentro de um ASP.NET Core serviço hospedado ou qualquer outro processador em segundo plano, vamos escrever uma consulta e uma notificação para monitorar as medições de todas as máquinas com um determinado modelo. Algo assim:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 1. Define a continuous query var queryText = "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?"; // Inside our query text, we need to fully qualify our objects var queryCommand = new QueryCommand(queryText); queryCommand.Parameters.Add("MachineModel", "ACME-001"); var query = new ContinuousQuery(queryCommand); // 2. Register the notification query.RegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated, datafilter: EventDataFilter.DataWithMetadata); // 3. Register the continuous query ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.RegisterCQ(query); |
Observemos que escrevemos uma consulta contínua usando um texto de consulta, comando de consulta e parâmetro semelhantes a SQL. Precisávamos qualificar totalmente o nome do nosso objeto dentro do texto da consulta.
No texto da consulta, usamos o $VALUE$
projeção para recuperar o objeto real armazenado dentro do cache, não apenas uma de suas propriedades. NCache suporta outros projeções para recuperar tags, grupos e o resultado de funções de projeção como SUM
, MIN
e MAX
.
Em seguida, registramos uma notificação passando por um retorno de chamada, tipo de evento e filtro de dados. Observe que registramos o mesmo retorno de chamada para dois tipos de eventos. Com o parâmetro de tipo de evento, especificamos o evento que queremos monitorar. NCache suporta três tipos de eventos: ItemAdded
, ItemUpdated
e ItemRemoved
. E, com o parâmetro data filter, especificamos as informações que queremos dentro de nosso callback assim que um evento for disparado. NCache suporta três filtros de dados:
1. Nenhum: retorna apenas a chave da entrada adicionada, atualizada ou removida.
2. Metadados: retorna a chave afetada e os metadados, como nome do grupo, prioridade do item e versão do item.
3.DadosComMetadados: retorna a entrada do cache e os metadados associados.
No nosso exemplo, toda vez que adicionamos ou atualizamos uma medição para uma máquina com o modelo “ACME-001”, NCache chama o OnQueryResultSetChanged
método, passando a entrada real adicionada ou atualizada. Nós costumavamos ItemAdded
e EventType.ItemUpdated
como o tipo de evento e DataWithMetadata
como filtro de dados.
A DataWithMetadata
é útil para evitar a busca de itens por chave novamente dentro do retorno de chamada. Mas vamos usá-lo com cuidado porque é uma viagem de rede cara.
Etapa 2: registrar retornos de chamada para eventos
Um dos parâmetros que passamos ao registrar uma notificação é um ligue de volta. Esta é a ação que queremos realizar quando um item que satisfaça nossa consulta de “monitoramento” for afetado pelo tipo de evento que registramos.
Para continuar com nosso exemplo, este é um retorno de chamada para ouvir ItemAdded
e ItemUpdated
eventos:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public void OnQueryResultSetChanged(string key, CQEventArg arg) { switch (arg.EventType) { case EventType.ItemAdded: // A new measurement for an ACME-001 machine was added // Do something here... Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query"); break; case EventType.ItemUpdated: // A measurement for an ACME-001 machine was changed Console.WriteLine($"Item with key '{key}' has been updated in the result set of the continuous query"); // Since we passed DataWithMetadata, we have access to the cache entry itself if (arg.Item != null) { var old = arg.OldItem.GetValue<Measurement>() var updated = arg.Item.GetValue<Measurement>(); // Do something here with old and updated // Send a notification, push a message into a topic... Console.WriteLine($"Updated product '{key}' has '{updatedMeasurement.FailPartCount}'"); } break; } } |
Com esse retorno de chamada implementado, podemos começar a ingerir medições de nossas máquinas em nosso cache. NCache monitorará os itens do cache e nos informará quando uma medição com o modelo “ACME-001” for adicionada ou atualizada. Então, podemos enviar um e-mail ou gerar um alerta se a contagem de falhas de qualquer máquina exceder um limite.
Etapa 3: cancelar registro de consultas e notificações
Monitorar consultas e notificar clientes tem alguns custos. Mas isso não afeta os clientes de cache, pois é executado de forma assíncrona. Para melhor utilizar as Consultas Contínuas, vamos cancelar registro de notificações e consultas quando não precisamos mais monitorar o resultado da consulta, como este.
1 2 3 4 5 6 |
query.UnRegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated); ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.UnRegisterCQ(query); |
Conclusão
É assim que se implementa a Consulta Contínua em aplicativos .NET. Com uma consulta contínua, monitoramos o resultado de uma consulta para adições, alterações e exclusões durante um período de tempo. Aqui, escrevemos um exemplo para observar a contagem de peças de máquinas com falha. Mas podemos usar a Consulta Contínua para gerenciamento de riscos, detecção de fraudes, análise de logs e outros cenários em tempo real.
Para saber mais sobre Consulta Contínua, verifique o Visão geral da consulta contínua guia.