Man mano che i dati diventano più critici per le aziende, la necessità di elaborazione dei dati in tempo reale (in particolare quando i dati arrivano in blocchi o flussi) e l'analisi sta aumentando a un ritmo senza precedenti. Le applicazioni che dipendono dall'elaborazione dei dati in tempo reale richiedono un metodo affidabile per ricevere notifiche in tempo reale quando si verificano modifiche ai dati senza causare sovraccarichi di prestazioni derivanti dal polling manuale, in particolare quando si lavora con set di dati di grandi dimensioni. Per affrontare questi problemi, NCache fornisce il suo Interrogazione continua caratteristica - dove è possibile creare query che filtrano i dati secondo i criteri definiti.
Senza il fastidio del polling manuale, CQ consente agli sviluppatori di tenere traccia delle modifiche ai dati in tempo reale e ricevere avvisi. Ad esempio, in un'applicazione di trading azionario in tempo reale, i trader necessitano di aggiornamenti costanti sulle variazioni dei prezzi delle azioni. Senza un adeguato sistema di monitoraggio in tempo reale, dovrebbero controllare manualmente i prezzi delle azioni, con conseguente perdita di tempo e opportunità mancate. I trader possono quindi creare query che filtrano i prezzi delle azioni per ricevere aggiornamenti in tempo reale ogni volta che i dati filtrati cambiano. Ciò offre loro il vantaggio di interrogare la cache solo per il set di dati di loro scelta (utilizzando Query OQL simili a SQL) all'interno del cluster di cache distribuita anziché esaurire la cache. Questo blog ti guiderà attraverso l'utilizzo di Continuous Query per monitorare le modifiche ai dati in NCache usando Python.
Monitoraggio delle modifiche tramite query continue in NCache
Con NCacheCon la funzione Continuous Query di , puoi definire un set di dati specifico nella rete della cache distribuita e tenerne traccia delle modifiche utilizzando le query OQL. Quando i cambiamenti si verificano all'interno del set di dati (a causa di operazioni di scrittura come operazioni di aggiunta, aggiornamento o rimozione), le applicazioni registrate vengono aggiornate tramite Eventi a livello di cache, che impedisce la sovrapposizione delle applicazioni filtrando i dati utilizzando query OQL. Fornendo un metodo per il monitoraggio e la condivisione dei dati tra le applicazioni tramite eventi, CQ consente agli sviluppatori di specificare la loro logica di business.
Quando si utilizza CQ in Python, gli utenti possono sfruttare il NCache Libreria client Python per registrarsi agli eventi CQ. È inoltre possibile specificare la quantità di dati restituiti all'esecuzione dell'evento, ad es. Nessuna, Metadati, o Dati con metadati. Tuttavia, è essenziale comprendere che la query continua funge solo da meccanismo per monitorare le modifiche anziché alterare i dati dell'applicazione.
Configurazione della query continua in NCache
I passaggi indicati di seguito illustrano il processo di registrazione e annullamento della registrazione di richiamate, query e notifiche per ricevere notifiche relative al set di dati definito dall'utente.
Passaggio 1: registra la richiamata per gli eventi
Innanzitutto, è necessario registrare un callback per gli eventi a livello di cache, può essere Aggiungi, Aggiorna, Rimuovi o una combinazione, in modo che questi callback vengano eseguiti ogni volta che la query viene modificata. Il codice di esempio seguente mostra come registrare una richiamata per un evento ITEM_ADDED.
1 2 3 4 |
def query_item_callback(key: str, arg: ncache.CQEventArg): if arg.get_event_type() is ncache.EventType.ITEM_ADDED: # Key has been added to the cache print(key + " added to cache") |
Puoi anche registrare eventi di richiamata per elemento aggiornato ed eventi rimossi.
- NCache Dettagli
- Registrazione della richiamata per gli eventi
- Registrazione della richiamata per gli eventi
Passaggio 2: registra query e notifiche
I processi chiave per tenere traccia delle modifiche ai dati NCache stanno registrando i callback e creando una query continua che specifica i criteri del set di risultati. Dopo la creazione della query, i callback predefiniti vengono registrati in base a Tipo di evento ed EventDataFilter. Successivamente, il esegui_lettore viene utilizzato per interrogare i dati memorizzati nella cache dopo che il CQ è stato registrato sul server utilizzando registro_cq. Per attivare gli eventi, i dati della cache possono essere modificati per influire sul set di risultati. Il codice di esempio riportato di seguito illustra questo scenario.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# Precondition: Cache is already connected # Query for required operation query = "SELECT $Value$ FROM FQN.Product WHERE category = ?" query_command = ncache.QueryCommand(query) query_command.set_parameters({"Category": "Beverages"}) # Create continuous query continuous_query = ncache.ContinuousQuery(query_command) event_type = [ncache.EventType.ITEM_REMOVED] # Item remove notification # EventDataFilter.Metadata returns cache keys + item metadata on updation continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE) # Register continuous query on server cache.get_messaging_service().register_cq(continuous_query) reader = cache.get_search_service().execute_reader(query_command) if reader.get_field_count() > 0: while reader.read(): result = reader.get_value(Product, 1) # Perform operations else: # None query result set returned print("Query result is None") # Update Product data in cache to trigger callback updated_product = Product() updated_product.set_product_id(1001) updated_product.set_product_name("Tea") key = "Product:" + updated_product.get_product_id() cache_item = ncache.CacheItem(updated_product) # Trigger add notifications version = cache.insert(key, cache_item) # This will add item to the result set as it matches query criteria |
Passaggio 3: annullare la registrazione delle notifiche dalla query continua
Per interrompere la ricezione di notifiche da Continuous Query nella tua applicazione, puoi scegliere di annullarne la registrazione utilizzando il file rimuovi_data_modification_listener metodo. Tuttavia, questo è specifico per i tipi di evento come Aggiungi e Rimuovi. Puoi persino annullare la registrazione delle notifiche di elementi aggiunti se, in precedenza, ti sei registrato per entrambi gli eventi di elementi aggiunti e rimossi, ma ora richiedi notifiche solo per l'elemento rimosso.
1 2 3 |
# Unregister notifications for ItemAdded events only event_type = [ncache.EventType.ITEM_ADDED] c_query.remove_data_modification_listener(cq_event_listener, event_type) |
Passaggio 4: annullare la registrazione della query continua dal server
Per impedire l'utilizzo delle risorse quando questa query non è più necessaria, è necessario annullare la registrazione della query continua dal server. In questo modo, non riceverai più notifiche per le modifiche nel set di risultati della query. L'esempio di codice seguente mostra come annullare la registrazione di CQ dal server di cache utilizzando il un_register_cq metodo.
1 2 |
# Unregister cq from server cache.get_messaging_service().un_register_cq(c_query) |
Conclusione
Con NCacheCon la funzione Continuous Query di , puoi monitorare le modifiche in un set di dati specifico all'interno di un cluster di cache distribuita. Garantisce che solo i dati necessari vengano recuperati dall'applicazione, migliorando l'efficienza e riducendo al minimo l'elaborazione di dati estranei tramite query OQL ed eventi a livello di cache. Un modo così semplice e flessibile per monitorare i cambiamenti? Inizia oggi la tua prova gratuita!