随着数据对企业变得越来越重要,对实时数据处理(尤其是当数据以块或流的形式到达时)和分析的需求正以前所未有的速度增长。 依赖于实时数据处理的应用程序需要一种可靠的方法来在发生数据更改时接收实时通知,而不会导致手动轮询导致的性能开销,特别是在处理大型数据集时。 为了解决这些问题, NCache 提供其 连续查询 功能——您可以在其中创建根据您定义的标准过滤数据的查询。
无需手动轮询的麻烦,CQ 使开发人员能够实时跟踪数据变化并接收警报。 例如,在实时股票交易应用程序中,交易者需要不断更新不断变化的股票价格。 如果没有合适的实时监控系统,他们将不得不手动检查股票价格——这会导致浪费时间和错失良机。 因此,交易者可以创建过滤股票价格的查询,以便在过滤后的数据发生变化时接收实时更新。 这给了他们只查询他们选择的数据集的缓存的优势(通过使用 类似 SQL 的 OQL 查询) 在分布式缓存集群中,而不是耗尽缓存。 本博客将指导您使用 Continuous Query 来监控数据变化 NCache 使用 Python。
通过连续查询监控变化 NCache
NCache的连续查询功能,您可以在分布式缓存网络中定义一个特定的数据集,并使用 OQL 查询跟踪对其的更改。 什么时候 变化 发生在数据集中(由于写操作,如添加、更新或删除操作),注册的应用程序通过更新 缓存级别事件,它通过使用 OQL 查询过滤掉数据来防止应用程序重叠。 通过提供一种通过事件在应用程序之间跟踪和共享数据的方法,CQ 使开发人员能够指定他们的业务逻辑。
在 Python 中使用 CQ 时,用户可以利用 NCache 用于注册 CQ 事件的 Python 客户端库。 您还可以指定事件执行时返回的数据量,即 不包含, 元数据或 数据与元数据. 但是,必须了解 Continuous Query 仅用作监视更改而不是更改应用程序数据的机制。
在中配置连续查询 NCache
下面提到的步骤解释了注册和取消注册回调、查询和通知以接收针对您定义的数据集的通知的过程。
第 1 步:注册事件回调
首先,您需要为缓存级别事件注册一个回调,它可以是 Add、Update、Remove 或它们的组合,以便在修改查询时执行这些回调。 下面的示例代码显示了如何为 ITEM_ADDED 事件注册回调。
1 2 3 4 |
def query_item_callback(key: str, arg: ncache.CQEventArg): if arg.get_event_type() is ncache.EventType.ITEM_ADDED: # Key has been added to the cache print(key + " added to cache") |
您还可以注册回调事件 项目更新和删除事件.
第 2 步:注册查询和通知
跟踪数据变化的关键过程 NCache 正在注册回调并创建指定结果集标准的连续查询。 创建查询后,将根据以下条件注册预定义的回调 事件类型 和 事件数据过滤器. 接下来, 执行阅读器 用于在使用 CQ 在服务器上注册后查询缓存数据 注册_cq. 要触发事件,可以修改缓存数据以影响结果集。 下面的示例代码演示了这种情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# Precondition: Cache is already connected # Query for required operation query = "SELECT $Value$ FROM FQN.Product WHERE category = ?" query_command = ncache.QueryCommand(query) query_command.set_parameters({"Category": "Beverages"}) # Create continuous query continuous_query = ncache.ContinuousQuery(query_command) event_type = [ncache.EventType.ITEM_REMOVED] # Item remove notification # EventDataFilter.Metadata returns cache keys + item metadata on updation continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE) # Register continuous query on server cache.get_messaging_service().register_cq(continuous_query) reader = cache.get_search_service().execute_reader(query_command) if reader.get_field_count() > 0: while reader.read(): result = reader.get_value(Product, 1) # Perform operations else: # None query result set returned print("Query result is None") # Update Product data in cache to trigger callback updated_product = Product() updated_product.set_product_id(1001) updated_product.set_product_name("Tea") key = "Product:" + updated_product.get_product_id() cache_item = ncache.CacheItem(updated_product) # Trigger add notifications version = cache.insert(key, cache_item) # This will add item to the result set as it matches query criteria |
第 3 步:从连续查询中取消注册通知
要在您的应用程序中停止接收来自 Continuous Query 的通知,您可以选择使用 删除数据修改监听器 方法。 但是,这特定于添加和删除等事件类型。 如果您之前注册了项目添加和删除事件,您甚至可以取消注册项目添加通知,但现在只需要删除项目的通知。
1 2 3 |
# Unregister notifications for ItemAdded events only event_type = [ncache.EventType.ITEM_ADDED] c_query.remove_data_modification_listener(cq_event_listener, event_type) |
第 4 步:从服务器注销连续查询
为防止在不再需要此查询时占用资源,您必须从服务器中注销连续查询。 这样,您将不会再收到有关查询结果集更改的通知。 下面的代码示例展示了如何使用 取消注册cq 方法。
1 2 |
# Unregister cq from server cache.get_messaging_service().un_register_cq(c_query) |
结论
NCache的连续查询功能,您可以监视分布式缓存集群内特定数据集的变化。 它确保应用程序只检索必要的数据,从而提高效率并通过 OQL 查询和缓存级别事件最大限度地减少无关数据处理。 这么简单灵活的方式来监控变化? 开始免费试用!