De nos jours, de nombreuses applications serveur doivent coordonner leur travail avec d'autres applications et partager des messages et des données de manière asynchrone. Et cela se fait souvent dans un environnement de transactions élevées. Pour les environnements à transactions élevées, NCache est un candidat idéal pour ces communications asynchrones basées sur les événements.
NCache fournit un riche ensemble de fonctionnalités de messagerie et de flux pour répondre aux besoins de ces applications, notamment Messagerie Pub/Sub, Requêtes continues, Événements au niveau de l'objetet Événements au niveau du cache. Chacun est décrit ci-dessous.
NCache fournit un puissant Messagerie Pub/Sub plate-forme pour applications où les applications Publisher peuvent envoyer des messages sans savoir qui sont les abonnés. Les messages peuvent être classés en Les sujets puis les abonnés peuvent s'abonner à des sujets spécifiques et recevoir ces messages de manière asynchrone.
Vous pouvez facilement utilisé NCache Messagerie Pub/Sub pour les applications distribuées une fois que vous aurez compris les composants de la messagerie Pub/Sub et leur fonctionnement.
Vous trouverez ci-dessous un exemple de code source sur comment un éditeur publie un message à un sujet, puis à un l'abonné s'abonne au sujet.
ITopic topic = _cache.MessagingService.CreateTopic(topicName)
// Creating new message containing a “payload”
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publishing the above created message.
topic.Publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topic);
// Create and register subscribers for the topic created
// MessageRecieved callback is registered
orderTopic.CreateSubscription(MessageReceived);
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);
// Creating new message containing a “payload”
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
}
});
Requête continue est une fonctionnalité puissante de NCache qui vous permet de surveiller les modifications apportées à un « ensemble de données » dans le cache distribué sur la base d'une requête de type SQL. Vous créez une requête continue (CQ) avec des critères de type SQL sur le cache et l'enregistrez avec NCache. NCache puis commence à surveiller toute modification apportée à cet « ensemble de données » dans le cache comprenant :
Voici comment vous pouvez créer une requête continue avec des critères spécifiques et vous inscrire sur le serveur.
// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg args) {
switch (args.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
}
}
}, EnumSet.allOf(EventType.class), EventDataFilter.None);
// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);
Pour plus de détails, vous pouvez voir comment utiliser la requête continue dans les documents de cache.
Les clients s'inscrivent auprès de NCache recevoir notifications d'événements ils sont intéressés en fournissant une fonction de rappel. NCache fournit des filtres avec des événements utilisés pour spécifier la quantité d'informations renvoyées avec l'événement. Ces filtres sont Aucun (par défaut), Métadonnées et DonnéesAvecMétadonnées. Les types d'événements suivants sont pris en charge :
Le code ci-dessous montre l'enregistrement des événements au niveau des éléments.
public static void RegisterItemLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register Item level events
string key = "Product:1001";
cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
switch (cacheEventArgs.EventType)
{
case EventType.ItemAdded:
// 'key' has been added to the cache
break;
case EventType.ItemUpdated:
// 'key' has been updated in the cache
break;
case EventType.ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
private static void RegisterItemLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
String key = "Product:1001";
cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
@Override
public void onCacheDataModified(String key, CacheEventArg eventArgs) {
switch (eventArgs.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
case ItemUpdated:
// 'key' has been updated in the cache
break;
case ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
@Override
public void onCacheCleared(String cacheName) {
//cacheName cleared
}
}
Pour les événements au niveau du cache, vous pouvez utiliser le même rappel que celui utilisé dans les événements au niveau de l'élément.
public static void RegisterCacheLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register cache level events
cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
private static void RegisterCacheLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
Pour .NET et Java applications, NCache est le choix idéal pour la messagerie Pub/Sub à transactions élevées, les requêtes continues et les événements. Voici pourquoi.
Avec cela, vous pouvez utiliser NCache comme une plate-forme de messagerie puissante mais simple. En savoir plus sur NCache à partir des liens ci-dessous et téléchargez une version d'essai de 30 jours entièrement fonctionnelle.