我们每天生成和消耗越来越多的数据。对于企业来说,数据处理的速度会影响他们做出的决策。随着对高效数据处理的需求不断增长,我们的软件需要满足这种需求。长轮询不仅仅是一种替代方案。我们来看一个 NCache 连续查询功能,旨在支持实时数据处理。
连续查询
通过连续查询, NCache 提供了一种在特定时间窗口内监视可观察数据集的机制。这边走, NCache 通知我们该数据集在缓存时发生的所有更改。连续查询作为一种监视更改的机制,而不是更改应用程序数据。
多亏了它 分布式架构, NCache 提供可扩展性、高可用性和存储效率。凭借其自我修复的点对点集群架构, NCache 处理大量传入数据以进行实时数据处理。
如何实现连续查询
通过连续查询, NCache 监视查询的结果。并且,与关系数据库不同,要使用类似 SQL 的查询来搜索项目, NCache 需要索引。否则,它必须扫描整个缓存才能找到我们想要监视的项目。它将使 NCache 慢。
对我们想要搜索的条目建立索引的一种简单方法是添加属性,例如 可索引查询 和 查询索引 我们的类和属性。例如,让我们监控给定机器模型在进行一些维护工作后产生的故障零件的数量。要按机器型号搜索缓存的测量结果,让我们注释一下 Measurement
像这样的类:
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Acme.Monitoring.CacheItems; public class Measurement { [QueryIndexed] public string MachineModel { get; set; } public int LastFailedPartCount { get; set; } public DateTimeOffset At { get; set; } } |
第 1 步:注册查询和通知
接下来,我们来安装 Alachisoft.NCache.SDK
NuGet 包定义一个查询来监视和注册缓存条目更改的通知。
例如,在 ASP 内部.NET Core 托管服务或任何其他后台处理器,让我们编写一个查询和一个通知来监视具有给定模型的所有机器的测量结果。像这样的东西:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 1. Define a continuous query var queryText = "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?"; // Inside our query text, we need to fully qualify our objects var queryCommand = new QueryCommand(queryText); queryCommand.Parameters.Add("MachineModel", "ACME-001"); var query = new ContinuousQuery(queryCommand); // 2. Register the notification query.RegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated, datafilter: EventDataFilter.DataWithMetadata); // 3. Register the continuous query ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.RegisterCQ(query); |
请注意,我们使用类似 SQL 的查询文本、查询命令和参数编写了一个连续查询。我们需要在查询文本中完全限定对象的名称。
在查询文本中,我们使用了 $VALUE$
投影来检索存储在缓存内的实际对象,而不仅仅是其属性之一。 NCache 支持其他 预测 检索标签、组和投影函数的结果,例如 SUM
, MIN
及 MAX
.
然后,我们注册了一个传递回调、事件类型和数据过滤器的通知。请注意,我们为两种事件类型注册了相同的回调。通过事件类型参数,我们指定要监视的事件。 NCache 支持三种事件类型: ItemAdded
, ItemUpdated
及 ItemRemoved
。并且,通过数据过滤器参数,我们可以在事件触发后在回调中指定所需的信息。 NCache 支持三个 数据过滤器:
1.无:它只返回添加、更新或删除条目的键。
2. 元数据:它返回受影响的键和元数据,例如组名称、项目优先级和项目版本。
3.数据与元数据:它返回缓存条目和关联的元数据。
在我们的示例中,每次我们为型号为“ACME-001”的机器添加或更新一个测量值时, NCache 呼叫 OnQueryResultSetChanged
方法,传递添加或更新的实际条目。我们用了 ItemAdded
和 EventType.ItemUpdated
作为事件类型和 DataWithMetadata
作为数据过滤器。
DataWithMetadata
有助于避免在回调中再次通过键获取项目。但我们要小心使用它,因为这是一次昂贵的网络旅行。
第 2 步:注册事件回调
我们在注册通知时传递的参数之一是 回电话。这是当满足我们的“监控”查询的项目受到我们注册的事件类型的影响时我们想要执行的操作。
继续我们的示例,这是一个要监听的回调 ItemAdded
和 ItemUpdated
事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public void OnQueryResultSetChanged(string key, CQEventArg arg) { switch (arg.EventType) { case EventType.ItemAdded: // A new measurement for an ACME-001 machine was added // Do something here... Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query"); break; case EventType.ItemUpdated: // A measurement for an ACME-001 machine was changed Console.WriteLine($"Item with key '{key}' has been updated in the result set of the continuous query"); // Since we passed DataWithMetadata, we have access to the cache entry itself if (arg.Item != null) { var old = arg.OldItem.GetValue<Measurement>() var updated = arg.Item.GetValue<Measurement>(); // Do something here with old and updated // Send a notification, push a message into a topic... Console.WriteLine($"Updated product '{key}' has '{updatedMeasurement.FailPartCount}'"); } break; } } |
有了这个回调,我们就可以开始将测量结果从我们的机器提取到我们的缓存中。 NCache 将监视缓存项目并让我们知道何时添加或更新型号为“ACME-001”的测量。然后,如果任何机器的故障计数超过阈值,我们可以发送电子邮件或生成警报。
第 3 步:取消注册查询和通知
监视查询和通知客户端需要一些成本。但它不会影响缓存客户端,因为它是异步运行的。为了更好地使用连续查询,让我们 取消注册通知 当我们不再需要监视查询结果时进行查询,就像这样。
1 2 3 4 5 6 |
query.UnRegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated); ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.UnRegisterCQ(query); |
结论
这就是在 .NET 应用程序中实现连续查询的方法。通过连续查询,我们可以监视查询结果在一段时间内的添加、更改和删除情况。在这里,我们编写了一个示例来观察机器故障部件的数量。但我们可以将连续查询用于风险管理、欺诈检测、日志分析和其他实时场景。
要了解有关连续查询的更多信息,请查看 连续查询概述 指南。