Ogni giorno generiamo e consumiamo una quantità crescente di dati. Per le aziende, la velocità di elaborazione dei dati influisce sulle decisioni che prendono. Man mano che cresce la richiesta di un’elaborazione efficiente dei dati, il nostro software deve tenere il passo con tale richiesta. Il sondaggio lungo non è semplicemente un'alternativa. Diamo un'occhiata a uno NCache funzionalità, Continuous Query, progettata per supportare l'elaborazione dei dati in tempo reale.
Interrogazione continua
Con la query continua, NCache fornisce un meccanismo per monitorare un set di dati osservabili all'interno di una finestra temporale specifica. Da questa parte, NCache ci avvisa di tutte le modifiche che si verificano nel set di dati mentre è memorizzato nella cache. La query continua funziona come un meccanismo per monitorare le modifiche, non per alterare i dati dell'applicazione.
Grazie alla sua architettura distribuita, NCache offre scalabilità, disponibilità elevata ed efficienza di archiviazione. Grazie alla sua architettura di clustering peer-to-peer autoriparante, NCache gestisce grandi quantità di dati in entrata per l'elaborazione dei dati in tempo reale.
Come implementare una query continua
Con query continue, NCache monitora il risultato di una query. E, a differenza dei database relazionali, per cercare elementi utilizzando una query di tipo SQL, NCache richiede indici. Altrimenti, dovremmo scansionare l'intera cache per trovare gli elementi che vogliamo monitorare. Lo farà NCache lento.
Un modo semplice per indicizzare le voci che vogliamo cercare è aggiungere attributi come QueryIndicizzabile ed QueryIndicizzato alle nostre classi e proprietà. Ad esempio, monitoriamo il conteggio delle parti guaste create da un determinato modello di macchina dopo alcuni lavori di manutenzione. Per cercare le nostre misurazioni memorizzate nella cache per modello di macchina, annotiamo il nostro Measurement
classe così:
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Acme.Monitoring.CacheItems; public class Measurement { [QueryIndexed] public string MachineModel { get; set; } public int LastFailedPartCount { get; set; } public DateTimeOffset At { get; set; } } |
Passaggio 1: registra query e notifiche
Successivamente, installiamo il file Alachisoft.NCache.SDK
Pacchetto NuGet per definire una query per monitorare e registrare una notifica per le modifiche nelle voci della cache.
Ad esempio, all'interno di un ASP.NET Core servizio ospitato o qualsiasi altro processore in background, scriviamo una query e una notifica per monitorare le misurazioni di tutte le macchine con un determinato modello. Qualcosa come questo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 1. Define a continuous query var queryText = "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?"; // Inside our query text, we need to fully qualify our objects var queryCommand = new QueryCommand(queryText); queryCommand.Parameters.Add("MachineModel", "ACME-001"); var query = new ContinuousQuery(queryCommand); // 2. Register the notification query.RegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated, datafilter: EventDataFilter.DataWithMetadata); // 3. Register the continuous query ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.RegisterCQ(query); |
Notiamo che abbiamo scritto una query continua utilizzando un testo di query, un comando di query e un parametro di tipo SQL. Avevamo bisogno di qualificare completamente il nome del nostro oggetto all'interno del testo della query.
Nel testo della query abbiamo utilizzato il file $VALUE$
proiezione per recuperare l'oggetto reale memorizzato nella cache, non solo una delle sue proprietà. NCache ne supporta altri proiezioni per recuperare tag, gruppi e il risultato di funzioni di proiezione come SUM
, MIN
e MAX
.
Quindi, abbiamo registrato una notifica che passa una richiamata, un tipo di evento e un filtro dati. Nota che abbiamo registrato la stessa richiamata per due tipi di evento. Con il parametro del tipo di evento specifichiamo l'evento che vogliamo monitorare. NCache supporta tre tipi di eventi: ItemAdded
, ItemUpdated
e ItemRemoved
. E, con il parametro data filter, specifichiamo le informazioni che vogliamo all'interno della nostra callback una volta attivato un evento. NCache supporta tre filtri dati:
1. Nessuno: Restituisce solo la chiave della voce aggiunta, aggiornata o rimossa.
2. Metadati: restituisce la chiave interessata e i metadati, ad esempio il nome del gruppo, la priorità dell'elemento e la versione dell'elemento.
3.DatiConMetadati: Restituisce la voce della cache e i metadati associati.
Nel nostro esempio, ogni volta che aggiungiamo o aggiorniamo una misurazione per una macchina con il modello “ACME-001”, NCache chiama il OnQueryResultSetChanged
metodo, passando la voce effettiva aggiunta o aggiornata. Abbiamo usato ItemAdded
ed EventType.ItemUpdated
come tipo di evento e DataWithMetadata
come filtro dati.
I DataWithMetadata
è utile per evitare di recuperare nuovamente gli elementi tramite chiave all'interno della richiamata. Ma usiamolo con attenzione perché è un viaggio di rete costoso.
Passaggio 2: registrare le richiamate per gli eventi
Uno dei parametri che passiamo durante la registrazione di una notifica è a richiama. Questa è l'azione che vogliamo eseguire una volta che un elemento che soddisfa la nostra query di "monitoraggio" viene influenzato dal tipo di evento che abbiamo registrato.
Per continuare con il nostro esempio, questa è una richiamata da ascoltare ItemAdded
ed ItemUpdated
eventi:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public void OnQueryResultSetChanged(string key, CQEventArg arg) { switch (arg.EventType) { case EventType.ItemAdded: // A new measurement for an ACME-001 machine was added // Do something here... Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query"); break; case EventType.ItemUpdated: // A measurement for an ACME-001 machine was changed Console.WriteLine($"Item with key '{key}' has been updated in the result set of the continuous query"); // Since we passed DataWithMetadata, we have access to the cache entry itself if (arg.Item != null) { var old = arg.OldItem.GetValue<Measurement>() var updated = arg.Item.GetValue<Measurement>(); // Do something here with old and updated // Send a notification, push a message into a topic... Console.WriteLine($"Updated product '{key}' has '{updatedMeasurement.FailPartCount}'"); } break; } } |
Con questa richiamata in atto, possiamo iniziare a importare le misurazioni dalle nostre macchine nella nostra cache. NCache monitorerà gli elementi della cache e ci informerà quando una misurazione con il modello "ACME-001" viene aggiunta o aggiornata. Quindi, possiamo inviare un'e-mail o generare un avviso se il conteggio dei guasti di una macchina supera una soglia.
Passaggio 3: annulla la registrazione di query e notifiche
Il monitoraggio delle query e la notifica ai clienti comportano alcuni costi. Ma non influisce sui client cache poiché viene eseguito in modo asincrono. Per utilizzare meglio le query continue, procediamo notifiche di annullamento della registrazione e query quando non abbiamo più bisogno di monitorare il risultato della query, in questo modo.
1 2 3 4 5 6 |
query.UnRegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated); ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.UnRegisterCQ(query); |
Conclusione
Ecco come implementare la Continuous Query all'interno delle applicazioni .NET. Con una query continua, monitoriamo il risultato di una query per eventuali aggiunte, modifiche ed eliminazioni durante un periodo di tempo. Qui abbiamo scritto un esempio per osservare il conteggio delle parti guasti delle macchine. Ma possiamo utilizzare Continuous Query per la gestione del rischio, il rilevamento delle frodi, l'analisi dei log e altri scenari in tempo reale.
Per ulteriori informazioni sulle query continue, consulta il Panoramica delle query continue guida.