Da Daten für Unternehmen immer wichtiger werden, steigt der Bedarf an Echtzeit-Datenverarbeitung (insbesondere wenn Daten in Blöcken oder Strömen ankommen) und Analysen in einem beispiellosen Tempo. Anwendungen, die auf Echtzeit-Datenverarbeitung angewiesen sind, benötigen eine zuverlässige Methode zum Empfangen von Echtzeit-Benachrichtigungen, wenn Datenänderungen auftreten, ohne dass es zu Leistungs-Overheads kommt, die durch manuelle Abfragen entstehen, insbesondere bei der Arbeit mit großen Datensätzen. Um diese Probleme anzugehen, NCache bietet seine Kontinuierliche Abfrage Funktion – wo Sie Abfragen erstellen können, die Daten nach Ihren definierten Kriterien filtern.
Ohne den Aufwand manueller Abfragen ermöglicht CQ Entwicklern, Datenänderungen in Echtzeit zu verfolgen und Benachrichtigungen zu erhalten. Beispielsweise benötigen Händler in einer Echtzeit-Aktienhandelsanwendung ständige Updates zu den sich ändernden Aktienkursen. Ohne ein geeignetes Echtzeit-Überwachungssystem müssten sie die Aktienkurse manuell überprüfen – was zu Zeitverlust und verpassten Chancen führt. Die Händler können daher Abfragen erstellen, die die Aktienkurse filtern, um Echtzeit-Updates zu erhalten, wenn sich die gefilterten Daten ändern. Dies bietet ihnen den Vorteil, den Cache nur für das Dataset ihrer Wahl abzufragen (durch Verwendung von SQL-ähnliche OQL-Abfragen) innerhalb des verteilten Cache-Clusters, anstatt den Cache zu erschöpfen. Dieser Blog führt Sie durch die Verwendung von Continuous Query zur Überwachung von Datenänderungen in NCache mit Python.
Überwachung von Änderungen durch kontinuierliche Abfrage in NCache
Mit der NCacheMit der Continuous Query-Funktion von können Sie einen bestimmten Datensatz im verteilten Cache-Netzwerk definieren und Änderungen daran mithilfe von OQL-Abfragen nachverfolgen. Wenn Änderungen innerhalb des Datensatzes auftreten (aufgrund von Schreibvorgängen wie Hinzufügen, Aktualisieren oder Entfernen), werden registrierte Anwendungen durch aktualisiert Ereignisse auf Cache-Ebene, wodurch Anwendungsüberschneidungen verhindert werden, indem Daten mithilfe von OQL-Abfragen herausgefiltert werden. Durch die Bereitstellung einer Methode zum Verfolgen und Teilen von Daten zwischen Anwendungen durch Ereignisse ermöglicht CQ Entwicklern, ihre Geschäftslogik zu spezifizieren.
Bei der Verwendung von CQ in Python können Benutzer die NCache Python-Clientbibliothek zum Registrieren für CQ-Ereignisse. Sie können auch die Datenmenge angeben, die bei der Ereignisausführung zurückgegeben wird, d. h. Andere, Metadaten, oder DatenMitMetadaten. Es ist jedoch wichtig zu verstehen, dass Continuous Query nur als Mechanismus zum Überwachen von Änderungen fungiert, anstatt Anwendungsdaten zu ändern.
Kontinuierliche Abfrage konfigurieren in NCache
Die unten aufgeführten Schritte erläutern den Vorgang zum Registrieren und Aufheben der Registrierung von Rückrufen, Abfragen und Benachrichtigungen, um Benachrichtigungen für den von Ihnen definierten Datensatz zu erhalten.
Schritt 1: Registrieren Sie den Rückruf für Ereignisse
Zuerst müssen Sie einen Callback für Cache-Level-Ereignisse registrieren, es kann entweder Add, Update, Remove oder eine Kombination davon sein, damit diese Callbacks immer dann ausgeführt werden, wenn die Abfrage geändert wird. Der folgende Beispielcode zeigt, wie ein Rückruf für ein ITEM_ADDED-Ereignis registriert wird.
1 2 3 4 |
def query_item_callback(key: str, arg: ncache.CQEventArg): if arg.get_event_type() is ncache.EventType.ITEM_ADDED: # Key has been added to the cache print(key + " added to cache") |
Sie können auch Callback-Ereignisse für registrieren Element aktualisierte und entfernte Ereignisse.
Schritt 2: Abfrage und Benachrichtigungen registrieren
Die wichtigsten Prozesse zum Nachverfolgen von Datenänderungen NCache registrieren Rückrufe und erstellen eine fortlaufende Abfrage, die die Kriterien für die Ergebnismenge angibt. Nach der Erstellung der Abfrage werden die vordefinierten Rückrufe basierend auf registriert Ereignistyp machen Ereignisdatenfilter. Als nächstes die ausführen_reader wird verwendet, um zwischengespeicherte Daten abzufragen, nachdem der CQ auf dem Server registriert wurde register_cq. Um Ereignisse auszulösen, können Cache-Daten geändert werden, um die Ergebnismenge zu beeinflussen. Der folgende Beispielcode veranschaulicht dieses Szenario.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# Precondition: Cache is already connected # Query for required operation query = "SELECT $Value$ FROM FQN.Product WHERE category = ?" query_command = ncache.QueryCommand(query) query_command.set_parameters({"Category": "Beverages"}) # Create continuous query continuous_query = ncache.ContinuousQuery(query_command) event_type = [ncache.EventType.ITEM_REMOVED] # Item remove notification # EventDataFilter.Metadata returns cache keys + item metadata on updation continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE) # Register continuous query on server cache.get_messaging_service().register_cq(continuous_query) reader = cache.get_search_service().execute_reader(query_command) if reader.get_field_count() > 0: while reader.read(): result = reader.get_value(Product, 1) # Perform operations else: # None query result set returned print("Query result is None") # Update Product data in cache to trigger callback updated_product = Product() updated_product.set_product_id(1001) updated_product.set_product_name("Tea") key = "Product:" + updated_product.get_product_id() cache_item = ncache.CacheItem(updated_product) # Trigger add notifications version = cache.insert(key, cache_item) # This will add item to the result set as it matches query criteria |
- NCache Details
- Registrieren Sie Abfragen und Benachrichtigungen
- Kontinuierliche Abfrage verwenden in NCache
Schritt 3: Benachrichtigungen von Continuous Query abmelden
Um den Empfang von Benachrichtigungen von Continuous Query in Ihrer Anwendung zu beenden, haben Sie die Möglichkeit, sie mithilfe von abzumelden Remove_data_modification_listener Methode. Dies ist jedoch spezifisch für Ereignistypen wie „Hinzufügen“ und „Entfernen“. Sie können sogar die Benachrichtigungen über das Hinzufügen von Elementen abmelden, wenn Sie sich zuvor sowohl für Ereignisse zum Hinzufügen als auch für das Entfernen von Elementen registriert haben, jetzt aber nur Benachrichtigungen für das entfernte Element benötigen.
1 2 3 |
# Unregister notifications for ItemAdded events only event_type = [ncache.EventType.ITEM_ADDED] c_query.remove_data_modification_listener(cq_event_listener, event_type) |
Schritt 4: Continuous Query vom Server abmelden
Um die Ressourcennutzung zu verhindern, wenn diese Abfrage nicht mehr benötigt wird, müssen Sie die fortlaufende Abfrage vom Server abmelden. Dadurch erhalten Sie keine Benachrichtigungen mehr über Änderungen in der Abfrageergebnismenge. Das folgende Codebeispiel zeigt, wie die Registrierung von CQ vom Cacheserver aufgehoben wird, indem die un_register_cq Methode.
1 2 |
# Unregister cq from server cache.get_messaging_service().un_register_cq(c_query) |
Zusammenfassung
Mit der NCacheMit der Continuous Query-Funktion von können Sie Änderungen in einem bestimmten Dataset innerhalb eines verteilten Cache-Clusters überwachen. Es stellt sicher, dass nur die notwendigen Daten von der Anwendung abgerufen werden, was die Effizienz verbessert und die irrelevante Datenverarbeitung über OQL-Abfragen und Cache-Level-Ereignisse minimiert. Eine so einfache und flexible Möglichkeit, Änderungen zu überwachen? Starten Sie noch heute Ihre kostenlose Testversion!