Molte applicazioni server oggi necessitano di coordinare il lavoro con altre applicazioni e condividere messaggi e dati in modo asincrono. E spesso ciò avviene in un ambiente ad alta transazione. Per ambienti con transazioni elevate, NCache è un candidato ideale per questi comunicazioni asincrone guidate da eventi.
NCache fornisce un ricco set di funzionalità di messaggistica e flussi per soddisfare le esigenze di queste applicazioni, incluse Messaggistica Pub/Sub, Query continue, Eventi a livello di oggettoe Eventi a livello di cache. Ciascuno è descritto di seguito.
NCache fornisce un potente Messaggistica Pub/Sub piattaforma per applicazioni in cui le applicazioni Editore possono inviare messaggi senza sapere chi sono gli Abbonati. I messaggi possono essere classificati in Argomenti quindi gli abbonati possono iscriversi ad argomenti specifici e ricevere questi messaggi in modo asincrono.
Si può facilmente uso NCache Messaggistica Pub/Sub per applicazioni distribuite una volta compresi i componenti della messaggistica Pub/Sub e il loro funzionamento.
Di seguito è riportato un esempio di codice sorgente su come un editore pubblica un messaggio ad un argomento e poi a l'abbonato si iscrive all'argomento.
ITopic topic = _cache.MessagingService.CreateTopic(topicName)
// Creating new message containing a “payload”
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publishing the above created message.
topic.Publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
ITopic orderTopic = cache.MessagingService.GetTopic(topic);
// Create and register subscribers for the topic created
// MessageRecieved callback is registered
orderTopic.CreateSubscription(MessageReceived);
String topicName = "orderUpdates";
Topic topic = cache.getMessagingService().createTopic(topicName);
// Creating new message containing a “payload”
Customer payload = new Customer("David Jones", "XYZCompany");
var message = new Message(payload);
// Set the expiration TimeSpan of the message
message.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publishing the above created message.
topic.publish(message, DeliveryOption.All, true);
// Get the already created topic to subscribe to it
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
// Create and register subscribers for the topic created
orderTopic.createSubscription(new MessageReceivedListener() {
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// message received
}
});
Interrogazione continua è una caratteristica potente di NCache che consente di monitorare le modifiche a un "set di dati" nella cache distribuita in base a query di tipo SQL. Crei una query continua (CQ) con criteri simili a SQL rispetto alla cache e la registri NCache. NCache poi inizia a monitorare eventuali modifiche a questo "set di dati" nella cache tra cui:
Ecco come puoi creare una query continua con criteri specifici e registrarti sul server.
// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Customer WHERE Country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
// Query for required operation
String query = "SELECT $VALUE$ FROM com.alachisoft.ncache.sample.Customer WHERE country = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Country", "USA");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Register to be notified when a qualified item is added to the cache
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg args) {
switch (args.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
}
}
}, EnumSet.allOf(EventType.class), EventDataFilter.None);
// Register continuousQuery on server
cache.getMessagingService().registerCQ(cQuery);
Per ulteriori dettagli, puoi vedere come utilizzare la query continua nei documenti della cache.
I clienti si registrano con NCache ricevere notifiche di eventi sono interessati fornendo una funzione di callback. NCache fornisce filtri con eventi utilizzati per specificare la quantità di informazioni restituite con l'evento. Questi filtri sono Nessuno (predefinito), Metadati ed Dati con metadati. Sono supportati i seguenti tipi di eventi:
Il codice fornito di seguito mostra la registrazione degli eventi a livello di articolo.
public static void RegisterItemLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register Item level events
string key = "Product:1001";
cache.MessagingService.RegisterCacheNotification(key, CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
public static void CacheDataNotificationCallbackImpl(string key, CacheEventArg cacheEventArgs)
{
switch (cacheEventArgs.EventType)
{
case EventType.ItemAdded:
// 'key' has been added to the cache
break;
case EventType.ItemUpdated:
// 'key' has been updated in the cache
break;
case EventType.ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
private static void RegisterItemLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
String key = "Product:1001";
cache.getMessagingService().addCacheNotificationListener(key, new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
private static class CacheDataModificationListenerImpl implements CacheDataModificationListener {
@Override
public void onCacheDataModified(String key, CacheEventArg eventArgs) {
switch (eventArgs.getEventType())
{
case ItemAdded:
// 'key' has been added to the cache
break;
case ItemUpdated:
// 'key' has been updated in the cache
break;
case ItemRemoved:
// 'key' has been removed from the cache
break;
}
}
@Override
public void onCacheCleared(String cacheName) {
//cacheName cleared
}
}
Per gli eventi a livello di cache, puoi utilizzare lo stesso callback utilizzato negli eventi a livello di elemento.
public static void RegisterCacheLevelEvents()
{
// Get cache
string cacheName = "demoCache";
var cache = CacheManager.GetCache(cacheName);
// Register cache level events
cache.MessagingService.RegisterCacheNotification(EventsSample.CacheDataNotificationCallbackImpl, EventType.ItemUpdated | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
}
private static void RegisterCacheLevelEvents() throws Exception {
String cacheName = "demoCache";
// Initialize an instance of the cache to begin performing operations:
Cache cache = CacheManager.getCache(cacheName);
CacheEventDescriptor eventDescriptor = cache.getMessagingService().addCacheNotificationListener(new CacheDataModificationListenerImpl(), EnumSet.allOf(EventType.class), EventDataFilter.DataWithMetadata);
}
Nel .NET e Java applicazioni, NCache è la scelta ideale per eseguire messaggi Pub/Sub con transazioni elevate, query continue ed eventi. Ecco perché.
Con questo, puoi usare NCache come piattaforma di messaggistica potente ma semplice. Leggi di più su NCache dai collegamenti sottostanti e scarica una versione di prova di 30 giorni completamente funzionante.