Pub/Sub 모델의 주제에 메시지 게시
XNUMXD덴탈의 ITopic
/Topic
pub/Sub 모델의 인터페이스는 주제에 대한 메시지 게시를 용이하게 합니다. 메시지 전달 실패, 메시지 수신, 주제 삭제에 대한 이벤트 등록도 제공합니다.
주의 사항
이 기능은 다음에서도 사용할 수 있습니다. NCache Professional.
그것은 제공합니다 Publish
캐시의 특정 주제에 메시지를 게시하는 메서드입니다. 주제에 메시지를 게시하는 동안 게시자는 다음을 설정할 수 있습니다. 배송 옵션 메시지의 경우, 메시지 만료, 메시지 전달 실패 알림및 주제 삭제 알림.
여기에서는 방법을 설명합니다. 메시지 게시, 메시지를 비동기적으로 게시, 대량 메시지 게시 와 주문된 메시지 게시.
사전 조건
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 게시/구독 모델을 포함한 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- API 세부 정보는 다음을 참조하세요. 아이캐시, 캐시 아이템, 아이토픽, 게시이다., 주제 가져오기, 게시비동기화, 만료 시간, 메시지 배달 실패, 주제 삭제됨, 배송옵션, 보내실 내용, 대량 게시, MessageFailedEventArgs, 주제DeleteEventArgs.
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 게시/구독 모델을 포함한 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- API 세부 정보는 다음을 참조하세요. 캐시, 캐시 아이템, 주제, getTopic, 게시, 보내실 내용, 만료시간 설정, 토픽리스너, addMessageDeliveryFailureListener, addTopicDeletedListener, 배송옵션, All, 게시 비동기, 대량 게시, 주제DeleteEventArgs, MessageFailedEventArgs, 메시지 이벤트 인수.
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 게시/구독 모델을 포함한 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- API 세부 정보는 다음을 참조하세요. 캐시, 캐시 아이템, 주제, getTopic, 게시, getMessagingService, 시간 범위, 만료시간 설정, 토픽리스너, addMessageDeliveryFailureListener, addTopicDeletedListener, 배송옵션, 대량 게시, 메시지 수신 리스너.
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 게시/구독 모델을 포함한 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- API 세부 정보는 다음을 참조하세요. 캐시, 캐시 아이템, get_topic, get_messaging_service, 시간 범위, set_expiration_time, add_message_delivery_failure_listener, add_topic_deleted_listener, 게시_비동기화, 게시_대량, 메시지 이벤트 인수, get_topic_name, MessageFailedEventArgs, get_message_failure_reason, 주제DeleteEventArgs.
메시지 게시
다음 코드 샘플은 다음을 수행합니다.
- 주문 관련 메시지에 대한 전용 주제를 만듭니다.
- 등록
MessageDeliveryFailure
주제에 대한 이벤트입니다.
- 등록
OnTopicDeleted
주제에 대한 이벤트입니다.
- 만료 및 배달 옵션을 활성화하여 각 주제에 대한 메시지를 만듭니다.
- 메시지를 게시합니다.
// Precondition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
// No topic exists
}
// Precondition: Cache is already connected
// Already existing topic
String topicName = "orderTopic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true);
} else {
// No topic exists
}
// This is an async method
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (!(orderTopic == null)) {
// Create object to be sent in message
let order = await this.fetchOrdersFromDb(this.orderId);
// Create a new message with the object order
let orderMessage = new ncache.Message(order);
// Set the expiration time of the message
let expiryTime = new ncache.TimeSpan(12, 12, 12);
orderMessage.setExpirationTime(expiryTime);
let topicListener = new ncache.MyTopicListener();
// Register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, ncache.DeliveryOption.All, true);
} else {
// No topic exists
}
# Precondition: Cache is already connected
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True)
else:
# No topic exists
print("Topic not found")
주의 사항
작업이 안전하도록 하려면 에 설명된 대로 응용 프로그램 내에서 잠재적인 예외를 처리하는 것이 좋습니다. 처리 실패.
비동기식으로 게시
메시지는 다음을 사용하여 주제에 비동기적으로 게시할 수 있습니다. 게시비동기화 애플리케이션이 다음 작업을 수행하기 위해 작업 완료를 기다리지 않도록 합니다. 사용자는 추가 처리를 위해 즉시 컨트롤을 반환합니다.
다음 예에서는 메시지를 비동기식으로 게시할 수 있습니다.
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
if(task.IsFaulted)
{
// Task Failed
}
}
else
{
// No topic exists
}
// Topic orderTopic already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
TimeScheduler.Task task = (TimeScheduler.Task) orderTopic.publishAsync(orderMessage, DeliveryOption.All, true);
if (task.IsCancelled()) {
// task cancelled
}
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
# Create object to be sent in message
order = fetch_order_from_db("1001")
# Create a new message with the object order
order_message = ncache.Message(order)
# Set the expiration time of the message
expiry_time = ncache.TimeSpan(12, 12, 12)
order_message.set_expiration_time(expiry_time)
# Register message delivery failure
order_topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
order_topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
task = order_topic.publish_async(order_message, ncache.DeliveryOption.ALL, True)
# Use this task object as per your business logic
else:
# No topic exists
print("Topic not found")
대량 메시지 게시
다음을 사용하여 한 번의 호출로 여러 메시지를 게시할 수 있습니다. 대량 게시 방법. 이렇게 하면 대량의 메시지가 단일 호출로 결합되고 게시되므로 성능과 메모리 사용량이 향상됩니다.
아래 코드는 이미 생성된 주제의 인스턴스를 사용합니다. orderTopic
, 주제에 대한 대량 메시지 게시를 보여줍니다.
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
// create dictionary for storing bulk
List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
Order[] orders = FetchOrdersFromDB();
for (int i = 0; i < 100; i++)
{
Message message = new Message(orders[i]);
message.ExpirationTime = TimeSpan.FromSeconds(10000);
messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
}
// Register message delivery failure
topic.MessageDeliveryFailure += OnFailureMessageRecieved;
//Register topic deletion notification
topic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
// In case of failed publishing of messages, exceptions
// will be returned
IDictionary<Message, Exception> keys = topic.PublishBulk(messageList, true);
}
// topic already exists
String topicName = "orderTopic";
String customerID = "DUMON";
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// create dictionary for storing bulk
Map messageMap = new HashMap();
Order[] orders = fetchOrdersFromDb(customerID);
for (int i = 0; i < 100; i++) {
Message message = new Message(orders[i]);
message.setExpirationTime(TimeSpan.FromSeconds(10000));
messageMap.put(message, DeliveryOption.All);
}
MyTopicListener topicListener = new MyTopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
Map<Message, Exception> keys = topic.publishBulk(messageMap, true);
} else {
// topic is null
}
// This is an async method
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let topic = ncache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create Map for storing messages in bulk
let messageMap = new Map();
let orders = this.fetchOrdersByCustomerId(this.customerId);
var i;
for (i = 0; i < 100; i++) {
let message = new ncache.Message(orders[i]);
message.setExpirationTime(ncache.TimeSpan.FromSeconds(10000));
messageMap.set(message, ncache.DeliveryOption.All);
}
let topicListener = new ncache.TopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
// In case of failed publishing of messages, exception will be returned
let keys = topic.publishBulk(messageMap, true);
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
topic = cache.get_messaging_service().get_topic(topic_name)
if topic is not None:
# Create Dict for storing messages in bulk
messages_map = {}
orders = fetch_orders_by_customer_id("ALFKI")
for i in range(0, 100):
message = ncache.Message(orders[++i])
message.set_expiration_time(ncache.TimeSpan.from_seconds(10000))
messages_map[message] = ncache.DeliveryOption.ALL
# Register message delivery failure
topic.add_message_delivery_failure_listener(topic_listener)
# Register topic deletion notification
topic.add_topic_deleted_listener(topic_listener)
# Publish the order with delivery option set as All
# and register message delivery failure
# In case of failed publishing of messages, exception will be returned
keys = topic.publish_bulk(messages_map, True)
else:
# No topic exists
print("Topic not found")
주문된 메시지 게시
주의 사항
이 기능은 다음에서만 사용할 수 있습니다. NCache 5.2 이상
메시지가 특정 순서로 게시되도록 하는 시퀀스 이름을 언급하여 메시지를 게시할 수 있습니다. 지정하려면 주문된 메시지, 문자열 시퀀스 이름이 메시지 체인에 추가되어 동일한 서버 노드의 특정 시퀀스 이름에 속하는 모든 메시지를 게시합니다.
아래 주어진 예에서 시퀀스 이름이 메시지와 함께 추가되고 메시지는 다음을 사용하여 게시됩니다. Publish
방법.
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (topicName != null)
{
for (int i = 0; i < 30; i++)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the messages
string sequenceName = "OrderMessages";
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish message with the sequence name
orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
}
else
{
// No topic found
}
// Specify the topic name that already exists
String topicName = "orderTopic";
// Get the topic with the specified name
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
for (int i = 0; i < 30; i++) {
// Create the object to be sent in the message
Order order = fetchOrdersFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the message
String sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
} else {
// No topic found
}
// This is an async method
// Specify the topic name that already exists
let topicName = "orderTopic";
// Get the topic with the specified name
let orderTopic = await this.cache.getMessagingService().getTopic(topicName);
if (topicName != null) {
var i;
for (i = 0; i < 30; i++) {
// Create the object to be sent in the message
let order = await this.fetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new ncache.Message(order);
// Specify a unique sequence name for the message
let sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(ncache.TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(
orderMessage,
ncache.DeliveryOption.All,
sequenceName,
true
);
}
} else {
// No topic found
}
# Specify the topic name that already exists
topic_name = "orderTopic"
# Get the topic with the specified name
order_topic = cache.get_messaging_service().get_topic(topic_name)
if topic_name is not None:
for i in range(0, 30):
# Create the object to be sent in the message
order = fetch_order_from_db(10248)
# Create the new message with the object order
order_message = ncache.Message(order)
# Specify a unique sequence name for the message
sequence_name = "OrderMessage"
# Set the expiration time of the message
order_message.set_expiration_time(ncache.TimeSpan.from_seconds(5000))
# Publish message with the sequence name
order_topic.publish(order_message, ncache.DeliveryOption.ALL, True, sequence_name)
else:
# No topic found
print("Topic not found")
콜백 등록
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
// Failure reason can be get from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
// Deleted topic is args.TopicName
}
@Override
public void onTopicDeleted(Object sender, TopicDeleteEventArgs args) {
// Deleted topic is args.getTopicName();
}
@Override
public void onMessageDeliveryFailure(Object sender, MessageFailedEventArgs args) {
// Failure reason is args.getMessageFailureReason();
}
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// Perform operations
}
onTopicDeleted(sender, args) {
// Deleted topic is args.getTopicName()
}
onMessageDeliveryFailure(sender, args) {
// Failure reason is args.getMessageFailureReason()
}
onMessageReceived(sender, args) {
// Perform operations
}
def on_topic_deleted(sender: object, args: ncache.TopicDeleteEventArgs):
# Perform operations
print("Deleted topic is " + args.get_topic_name())
def on_message_delivery_failure(sender: object, args: ncache.MessageFailedEventArgs):
# Perform operations
print("Failure reason is " + str(args.get_message_failure_reason()))
def on_message_received(sender: object, args: ncache.MessageEventArgs):
# Perform operations
print("Message received from topic " + args.get_topic_name())
추가 자료
NCache Pub/Sub용 샘플 애플리케이션 제공 GitHub의.
도 참조
.그물: Alachisoft.NCache.런타임.캐싱 네임 스페이스.
자바 : COM.alachisoft.ncache.런타임.캐싱 네임 스페이스.
Node.js : 주제 클래스입니다.
파이썬 : ncache.클라이언트.서비스 클래스입니다.