Chaque jour, nous générons et consommons une quantité croissante de données. Pour les entreprises, la rapidité de traitement des données impacte les décisions qu’elles prennent. À mesure que la demande de traitement efficace des données augmente, nos logiciels doivent répondre à cette demande. Les longs sondages ne sont pas simplement une alternative. Regardons-en un NCache fonctionnalité, Requête Continue, conçue pour prendre en charge le traitement des données en temps réel.
Requête continue
Avec la requête continue, NCache fournit un mécanisme pour surveiller un ensemble de données observables dans une fenêtre de temps spécifique. Par ici, NCache nous informe de toutes les modifications apportées à cet ensemble de données pendant qu'il est mis en cache. La requête continue fonctionne comme un mécanisme permettant de surveiller les modifications et non de modifier les données de l'application.
Grace à ses architecture distribuée, NCache offre évolutivité, haute disponibilité et efficacité de stockage. Avec son architecture de clustering peer-to-peer auto-réparatrice, NCache gère de grandes quantités de données entrantes pour le traitement des données en temps réel.
Comment implémenter une requête continue
Avec des requêtes continues, NCache surveille le résultat d’une requête. Et contrairement aux bases de données relationnelles, pour rechercher des éléments à l'aide d'une requête de type SQL, NCache nécessite des index. Sinon, il faudrait analyser l’intégralité du cache pour trouver les éléments que nous souhaitons surveiller. Cela fera NCache lent.
Un moyen simple d'indexer les entrées que nous souhaitons rechercher consiste à ajouter des attributs tels que RequêteIndexable ainsi que RequêteIndexée à nos classes et propriétés. Par exemple, surveillons le nombre de pièces défectueuses créées par un modèle de machine donné après certains travaux de maintenance. Pour rechercher nos mesures mises en cache par modèle de machine, annotons notre Measurement
classe comme ceci :
1 2 3 4 5 6 7 8 9 |
using Alachisoft.NCache.Runtime.Caching; namespace Acme.Monitoring.CacheItems; public class Measurement { [QueryIndexed] public string MachineModel { get; set; } public int LastFailedPartCount { get; set; } public DateTimeOffset At { get; set; } } |
Étape 1 : enregistrer la requête et les notifications
Ensuite, installons le Alachisoft.NCache.SDK
Package NuGet pour définir une requête pour surveiller et enregistrer une notification pour les modifications dans nos entrées de cache.
Par exemple, dans un ASP.NET Core service hébergé ou tout autre processeur en arrière-plan, écrivons une requête et une notification pour surveiller les mesures de toutes les machines avec un modèle donné. Quelque chose comme ça:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 1. Define a continuous query var queryText = "SELECT $VALUE$ FROM Acme.Monitoring.CacheItems.Measurement WHERE MachineModel = ?"; // Inside our query text, we need to fully qualify our objects var queryCommand = new QueryCommand(queryText); queryCommand.Parameters.Add("MachineModel", "ACME-001"); var query = new ContinuousQuery(queryCommand); // 2. Register the notification query.RegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated, datafilter: EventDataFilter.DataWithMetadata); // 3. Register the continuous query ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.RegisterCQ(query); |
Remarquons que nous avons écrit une requête continue en utilisant un texte de requête, une commande de requête et un paramètre de type SQL. Nous devions qualifier complètement le nom de notre objet dans le texte de la requête.
Dans le texte de la requête, nous avons utilisé le $VALUE$
projection pour récupérer l’objet réel stocké dans le cache, pas seulement une de ses propriétés. NCache prend en charge d'autres projections pour récupérer les balises, les groupes et le résultat des fonctions de projection comme SUM
, MIN
et MAX
.
Ensuite, nous avons enregistré une notification transmettant un rappel, un type d'événement et un filtre de données. Notez que nous avons enregistré le même rappel pour deux types d'événements. Avec le paramètre type d'événement, nous spécifions l'événement que nous voulons surveiller. NCache prend en charge trois types d'événements : ItemAdded
, ItemUpdated
et ItemRemoved
. Et, avec le paramètre data filter, nous spécifions les informations souhaitées dans notre rappel une fois qu'un événement est déclenché. NCache soutient trois filtres de données:
1. Aucun: Il renvoie uniquement la clé de l'entrée ajoutée, mise à jour ou supprimée.
2. Métadonnées: Il renvoie la clé affectée et les métadonnées, telles que le nom du groupe, la priorité de l'élément et la version de l'élément.
3.DonnéesAvecMétadonnées: Il renvoie l'entrée du cache et les métadonnées associées.
Dans notre exemple, chaque fois que nous ajoutons ou mettons à jour une mesure pour une machine avec le modèle « ACME-001 », NCache appelle le OnQueryResultSetChanged
méthode, en transmettant l’entrée réelle ajoutée ou mise à jour. Nous avons utilisé ItemAdded
ainsi que EventType.ItemUpdated
comme type d'événement et DataWithMetadata
comme filtre de données.
La DataWithMetadata
est utile pour éviter de récupérer à nouveau les éléments par clé dans le rappel. Mais utilisons-le avec précaution car c'est un voyage réseau coûteux.
Étape 2 : Enregistrez les rappels pour les événements
L'un des paramètres que nous transmettons lors de l'enregistrement d'une notification est un rappeler. Il s'agit de l'action que nous souhaitons effectuer une fois qu'un élément qui satisfait à notre requête de « surveillance » est affecté par le type d'événement que nous avons enregistré.
Pour continuer avec notre exemple, ceci est un rappel à écouter ItemAdded
ainsi que ItemUpdated
événements:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public void OnQueryResultSetChanged(string key, CQEventArg arg) { switch (arg.EventType) { case EventType.ItemAdded: // A new measurement for an ACME-001 machine was added // Do something here... Console.WriteLine($"Item with key '{key}' has been added to result set of continuous query"); break; case EventType.ItemUpdated: // A measurement for an ACME-001 machine was changed Console.WriteLine($"Item with key '{key}' has been updated in the result set of the continuous query"); // Since we passed DataWithMetadata, we have access to the cache entry itself if (arg.Item != null) { var old = arg.OldItem.GetValue<Measurement>() var updated = arg.Item.GetValue<Measurement>(); // Do something here with old and updated // Send a notification, push a message into a topic... Console.WriteLine($"Updated product '{key}' has '{updatedMeasurement.FailPartCount}'"); } break; } } |
Avec ce rappel en place, nous pouvons commencer à ingérer les mesures de nos machines dans notre cache. NCache surveillera les éléments du cache et nous informera lorsqu'une mesure avec le modèle « ACME-001 » sera ajoutée ou mise à jour. Ensuite, nous pouvons envoyer un e-mail ou générer une alerte si le nombre d'échecs d'une machine dépasse un seuil.
Étape 3 : Désenregistrer la requête et les notifications
La surveillance des requêtes et la notification des clients entraînent certains coûts. Mais cela n'affecte pas les clients de cache puisqu'il s'exécute de manière asynchrone. Pour mieux utiliser les requêtes continues, commençons désinscrire les notifications et des requêtes lorsque nous n'avons plus besoin de surveiller le résultat de notre requête, comme celui-ci.
1 2 3 4 5 6 |
query.UnRegisterNotification( callback: new QueryDataNotificationCallback(OnQueryResultSetChanged), eventType: EventType.ItemAdded | EventType.ItemUpdated); ICache cache = CacheManager.GetCache("demoCache"); cache.MessagingService.UnRegisterCQ(query); |
Conclusion
Voilà comment implémenter une requête continue dans les applications .NET. Avec une requête continue, nous surveillons le résultat d'une requête pour les ajouts, les modifications et les suppressions pendant une fenêtre de temps. Ici, nous avons écrit un exemple pour surveiller le décompte des pièces défectueuses des machines. Mais nous pouvons utiliser la requête continue pour la gestion des risques, la détection des fraudes, l'analyse des journaux et d'autres scénarios en temps réel.
Pour en savoir plus sur la requête continue, consultez la Présentation de la requête continue guider.