Messagerie et flux

De nos jours, de nombreuses applications serveur doivent coordonner leur travail avec d'autres applications et partager des messages et des données de manière asynchrone. Et cela se fait souvent dans un environnement de transactions élevées. Pour les environnements à transactions élevées, NCache est un candidat idéal pour ces communications asynchrones basées sur les événements.

NCache fournit un riche ensemble de fonctionnalités de messagerie et de flux pour répondre aux besoins de ces applications, notamment Messagerie Pub/Sub, Requêtes continues, Événements au niveau de l'objetet Événements au niveau du cache. Chacun est décrit ci-dessous.

Messagerie Pub/Sub, requête continue et événements
 

Messagerie Pub/Sub

NCache fournit un puissant Messagerie Pub/Sub plate-forme pour applications où les applications Publisher peuvent envoyer des messages sans savoir qui sont les abonnés. Les messages peuvent être classés en Les sujets puis les abonnés peuvent s'abonner à des sujets spécifiques et recevoir ces messages de manière asynchrone.

  • - Editeur: publie le message à consommer par plusieurs abonnés. L'éditeur ne sait pas qui sont les abonnés et combien.
  • - Abonné: s'enregistre pour recevoir des messages sur un sujet spécifique. L'abonné ne sait pas qui est l'éditeur ni combien d'autres abonnés il y a.
  • - Topic: tous les messages sont publiés et reçus par rapport aux sujets. Les éditeurs publient des messages à son sujet et les abonnés en reçoivent des messages. Vous pouvez avoir plusieurs sujets dans NCache.
  • - Message: il s'agit du message réel envoyé par les éditeurs et consommé par les abonnés.

Vous pouvez facilement utilisé NCache Messagerie Pub/Sub pour les applications distribuées une fois que vous aurez compris les composants de la messagerie Pub/Sub et leur fonctionnement.

Messagerie Pub/Sub avec NCache

Vous trouverez ci-dessous un exemple de code source sur comment un éditeur publie un message à un sujet, puis à un l'abonné s'abonne au sujet.

ITopic topic = _cache.MessagingService.CreateTopic(topicName)

// Creating new message containing a “payload” 
var message = new Message(payload);

// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);

// Publishing the above created message.
topic.Publish(message, DeliveryOption.All, true);

// Get the already created topic to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topic);

// Create and register subscribers for the topic created
// MessageRecieved callback is registered
orderTopic.CreateSubscription(MessageReceived);
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);

// Creating new message containing a “payload”
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);

// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));

// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);

// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);

// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
    @Override
     public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
     }
});
 

Requête continue (CQ)

Requête continue est une fonctionnalité puissante de NCache qui vous permet de surveiller les modifications apportées à un « ensemble de données » dans le cache distribué sur la base d'une requête de type SQL. Vous créez une requête continue (CQ) avec des critères de type SQL sur le cache et l'enregistrez avec NCache. NCache puis commence à surveiller toute modification apportée à cet « ensemble de données » dans le cache comprenant :

  • - Ajouter : si un élément est ajouté au cache qui correspond aux critères CQ, le client est averti avec cet objet.
  • - Mettre à jour: si un élément est mis à jour dans le cache et correspond aux critères CQ, le client est averti avec cet objet.
  • - Retirer: si un élément est supprimé du cache et correspond aux critères CQ, le client est informé avec cet objet.

Voici comment vous pouvez créer une requête continue avec des critères spécifiques et vous inscrire sur le serveur.

// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";

var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");

// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);

// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);

// Register continuousQuery on server 
cache.MessagingService.RegisterCQ(cQuery);
String cacheName = "demoCache";

// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);

// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";

var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Country", "USA");

// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);

// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
	@Override
	public void onQueryDataModified(String key, CQEventArg args) {
		switch (args.getEventType())
		{
			case ItemAdded:
				// 'key' has been added to the cache
				break;
		}
	}
  }, EnumSet.allOf(EventType.class), EventDataFilter.None);

// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);

Pour plus de détails, vous pouvez voir comment utiliser la requête continue dans les documents de cache.

 

Notifications d'événement

Les clients s'inscrivent auprès de NCache recevoir notifications d'événements ils sont intéressés en fournissant une fonction de rappel. NCache fournit des filtres avec des événements utilisés pour spécifier la quantité d'informations renvoyées avec l'événement. Ces filtres sont Aucun (par défaut), Métadonnées et DonnéesAvecMétadonnées. Les types d'événements suivants sont pris en charge :

  • - Événements au niveau de l'objet: le client s'est inscrit pour être averti chaque fois qu'un élément spécifique mis en cache (basé sur une clé) est mis à jour ou supprimé du cache et fournit un rappel pour celui-ci. Lorsque cet élément est mis à jour ou supprimé, le client en est informé.
  • - Événements au niveau du cache: Les événements de niveau cache sont les événements généraux qui peuvent être utiles lorsque toutes les applications clientes doivent être notifiées de toute modification dans le cache par n'importe quelle application cliente connectée.

Le code ci-dessous montre l'enregistrement des événements au niveau des éléments.

public static void RegisterItemLevelEvents()
{
	// Get cache
	string cacheName = "demoCache";
	var cache = CacheManager.GetCache(cacheName); 

	// Register Item level events
	string key = "Product:1001";
	cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
	switch (cacheEventArgs.EventType)
	{
	   case EventType.ItemAdded:
		   // 'key' has been added to the cache
		   break;
	   case EventType.ItemUpdated:
		   // 'key' has been updated in the cache
		   break;
	   case EventType.ItemRemoved:
		   // 'key' has been removed from the cache
		   break;
	}
}
private static void RegisterItemLevelEvents() throws Exception {
        String cacheName = "demoCache";

        // Initialize an instance of the cache to begin performing operations:
        Cache cache = CacheManager.getCache(cacheName);
        String key = "Product:1001";
        cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
    }
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
	@Override
	public void onCacheDataModified(String key, CacheEventArg eventArgs) {
		switch (eventArgs.getEventType())
		{
			case ItemAdded:
				// 'key' has been added to the cache
				break;
			case ItemUpdated:
				// 'key' has been updated in the cache
				break;
			case ItemRemoved:
				// 'key' has been removed from the cache
				break;
		}
	}
	@Override
	public void onCacheCleared(String cacheName) {
		//cacheName cleared
	}
}

Pour les événements au niveau du cache, vous pouvez utiliser le même rappel que celui utilisé dans les événements au niveau de l'élément.

public static void RegisterCacheLevelEvents()
{
	// Get cache
	string cacheName = "demoCache";
	var cache = CacheManager.GetCache(cacheName);

	// Register cache level events
	cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata); 
}
private static void RegisterCacheLevelEvents() throws Exception {
	String cacheName = "demoCache";

	// Initialize an instance of the cache to begin performing operations:
	Cache cache = CacheManager.getCache(cacheName);

	CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
 

Constat NCache?

Pour .NET et Java applications, NCache est le choix idéal pour la messagerie Pub/Sub à transactions élevées, les requêtes continues et les événements. Voici pourquoi.

  • Extrêmement rapide et évolutif : NCache est extrêmement rapide car totalement en mémoire et vous offre des temps de réponse inférieurs à la milliseconde. Et c'est un cache distribué qui vous permet d'ajouter des serveurs au cluster de cache pour atteindre une évolutivité linéaire et gérer des charges de transaction extrêmes.
  • Haute disponibilité grâce à la réplication des données : NCache réplique intelligemment les données dans le cache sans compromettre les performances. Ainsi, vous ne perdrez aucune donnée (événements et messages) même si un serveur de cache tombe en panne.
  • Les applications .NET et Java s'envoient des messages : NCache permet aux applications .NET et Java de partager une plate-forme commune de messagerie et de flux et de s'envoyer des messages en sérialisant les messages en JSON ou en envoyant des documents JSON sous forme de messages.

Avec cela, vous pouvez utiliser NCache comme une plate-forme de messagerie puissante mais simple. En savoir plus sur NCache à partir des liens ci-dessous et téléchargez une version d'essai de 30 jours entièrement fonctionnelle.

Que faire ensuite?

© Copyright Alachisoft 2002 - . Tous droits réservés. NCache est une marque déposée de Diyatech Corp.