Modul ncache.runtime.caching.Topic

Erweitern Sie den Quellcode
import asyncio
from asyncio import Task
from collections import Callable

from ncache.util.JavaInstancesFactory import *
from ncache.client.enum.DeliveryMode import DeliveryMode
from ncache.client.enum.DeliveryOption import DeliveryOption
from ncache.client.enum.SubscriptionPolicy import SubscriptionPolicy
from ncache.client.enum.TopicPriority import TopicPriority
from ncache.client.enum.TopicSearchOptions import TopicSearchOptions
from ncache.runtime.caching.Message import Message
from ncache.runtime.caching.TopicSubscription import TopicSubscription
from ncache.runtime.caching.events.MessageReceivedListener import MessageReceivedListener
from ncache.runtime.caching.events.TopicListener import TopicListener
from ncache.runtime.caching.messaging.DurableTopicSubscription import DurableTopicSubscription
from ncache.runtime.util.EnumUtil import EnumUtil
from ncache.runtime.util.TimeSpan import TimeSpan
from ncache.util.EventsListenerHelper import EventsListenerHelper
from ncache.util.TypeCaster import TypeCaster
from ncache.util.ValidateType import ValidateType


class Topic:
    """
    The Topic interface facilitates creating subscription and publishing of messages against the topic. This also
    provides event registrations for message delivery failure, receiving messages and deleting topics.
    """
    def __init__(self, value):
        self.__topic = value

    def get_instance(self):
        return self.__topic

    def set_instance(self, value):
        self.__topic = value

    def add_message_delivery_failure_listener(self, messagefailedeventlistener):
        """
        This method registers for message delivery failure events on this topic.

        :param messagefailedeventlistener: The listener that is invoked whenever there is failure in message delivery.
            This function should follow this signature: message_failed_event_listener(sender: object, args: MessageFailedEventArgs)
        :type messagefailedeventlistener: Callable
        """
        ValidateType.params_check(messagefailedeventlistener, 2, self.add_message_delivery_failure_listener)

        eventlistener = EventsListenerHelper.get_listener(messagefailedeventlistener, TopicListener)
        self.__topic.addMessageDeliveryFailureListener(eventlistener)

    def add_topic_deleted_listener(self, topicdeletedlistener):
        """
        This method registers for topic deleted event.

        :param topicdeletedlistener: The listener function that is invoked whenever this topic is deleted. This function
            should follow this signature: message_failed_event_listener(sender: TopicListener, args: TopicDeleteEventArgs)
        :type topicdeletedlistener: Callable
        """

        ValidateType.params_check(topicdeletedlistener, 2, self.add_topic_deleted_listener)

        eventlistener = EventsListenerHelper.get_listener(topicdeletedlistener, TopicListener)
        self.__topic.addTopicDeletedListener(eventlistener)

    def create_durable_subscription(self, subscriptionname, subscriptionpolicy, messagereceivedlistener, expirationtime, deliverymode=None):
        """
        This method is used to create a durable subscription to this topic.

        :param subscriptionname: Name of the durable subscription.
        :type subscriptionname: str
        :param subscriptionpolicy: Policy that is subscription is Shared or Exclusive.
        :type subscriptionpolicy: SubscriptionPolicy
        :param messagereceivedlistener: Message is delivered to registered user through this listener function. This
            function should follow this signature: message_received_listener(sender: TopicListener, args: MessageEventArgs)
        :type messagereceivedlistener: Callable
        :param expirationtime: A timespan that specifies the expiration time of the subscription.
        :type expirationtime: TimeSpan
        :param deliverymode: Specifies whether to deliver messages to register subscribers synchronously or asynchronously.
        :type deliverymode: DeliveryMode
        :return: Instance of DurableTopicSubscription.
        :rtype: DurableTopicSubscription
        """
        ValidateType.is_string(subscriptionname, self.create_durable_subscription)
        ValidateType.type_check(subscriptionpolicy, SubscriptionPolicy, self.create_durable_subscription)
        ValidateType.params_check(messagereceivedlistener, 2, self.create_durable_subscription)
        ValidateType.type_check(expirationtime, TimeSpan, self.create_durable_subscription)

        javasubscriptionname = TypeCaster.to_java_primitive_type(subscriptionname)
        javasubscriptionpolicy = EnumUtil.get_subscription_policy(subscriptionpolicy.value)
        javaexpirationtime = expirationtime.get_instance()
        eventlistener = EventsListenerHelper.get_listener(messagereceivedlistener, MessageReceivedListener)

        if deliverymode is not None:
            ValidateType.type_check(deliverymode, DeliveryMode, self.create_durable_subscription)
            javadeliverymode = EnumUtil.get_delivery_mode(deliverymode.name)

            result = self.__topic.createDurableSubscription(javasubscriptionname, javasubscriptionpolicy, eventlistener, javaexpirationtime, javadeliverymode)

        else:
            result = self.__topic.createDurableSubscription(javasubscriptionname, javasubscriptionpolicy, eventlistener, javaexpirationtime)

        if result is not None:
            result = DurableTopicSubscription(result)
            return result

    def get_is_closed(self):
        """
        Specifies whether topic is closed or not.

        :return: True if topic is closed, otherwise False.
        :rtype: bool
        """
        result = self.__topic.getIsClosed()

        if result is not None:
            result = TypeCaster.to_python_primitive_type(result)

        return result

    def get_search_options(self):
        """
        Specifies whether user has subscribed to pattern based or simple subscription.

        :return: The topic search options for this topic.
        :rtype: TopicSearchOptions
        """
        result = self.__topic.getSearchOptions()

        if result is not None:
            result = EnumUtil.get_topic_search_options_value(result)

        return result

    def publish(self, message, deliveryoption, notifydeliveryfailure=None, sequencename=None, ):
        """
        This method is used to publish the message to the topic with the specified DeliveryOption and sequence name if
        provided. Order of messages with same sequence name is retained.

        :param message: Message to be published.
        :type message: Message
        :param deliveryoption: Specifies how message is delivered to registered subscribers.
        :type deliveryoption: DeliveryOption
        :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
        :type notifydeliveryfailure: bool
        :param sequencename: Sequence name of the message to be published. The messages with same sequence name will be
            delivered in the same order as they are published.
        :type sequencename: str
        """
        ValidateType.type_check(message, Message, self.publish)
        ValidateType.type_check(deliveryoption, DeliveryOption, self.publish)

        javamessage = message.get_instance()
        javadeliveryoption = EnumUtil.get_delivery_option(deliveryoption.value)

        if notifydeliveryfailure is not None and sequencename is not None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish)
            ValidateType.is_string(sequencename, self.publish)

            javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)
            javasequencename = TypeCaster.to_java_primitive_type(sequencename)

            self.__topic.publish(javamessage, javadeliveryoption, javasequencename, javanotifydeliveryfailure)
            return

        elif notifydeliveryfailure is not None and sequencename is None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish)
            javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)

            self.__topic.publish(javamessage, javadeliveryoption, javanotifydeliveryfailure)
            return

        elif notifydeliveryfailure is None and sequencename is not None:
            ValidateType.is_string(sequencename, self.publish)
            javasequencename = TypeCaster.to_java_primitive_type(sequencename)

            self.__topic.publish(javamessage, javadeliveryoption, javasequencename)
            return

        elif notifydeliveryfailure is None and sequencename is None:
            self.__topic.publish(javamessage, javadeliveryoption)
            return

    async def publish_async(self, message, deliveryoption, notifydeliveryfailure=None):
        """
        This method is used to Publish a message asynchronously to the topic with specified delivery option.

        :param message: Message to be published.
        :type message: Message
        :param deliveryoption: Specifies how message is delivered to registered subscribers.
        :type deliveryoption: DeliveryOption
        :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
        :type notifydeliveryfailure: bool
        :return: Task that performs a publish operation in the background.
        :rtype: Task
        """
        ValidateType.type_check(message, Message, self.publish_async)
        ValidateType.type_check(deliveryoption, DeliveryOption, self.publish_async)

        if notifydeliveryfailure is not None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish_async)

            return asyncio.create_task(self.__return_coroutine(self.publish, message, deliveryoption, notifydeliveryfailure))
        return asyncio.create_task(self.__return_coroutine(self.publish, message, deliveryoption))

    def publish_bulk(self, messages, notifydeliveryfailure=None):
        """
        This method is used to Publish messages to the topic with specified DeliveryOption.

        :param messages: Collection of message-deliveryoption pairs in form of a dict.
        :type messages: dict
        :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
        :type notifydeliveryfailure: bool
        :return: Dict that contains message along with the exception that occurred while message publishing.
        :rtype: dict
        """
        ValidateType.type_check(messages, dict, self.publish_bulk)
        for message in messages:
            ValidateType.type_check(message, Message, self.publish_bulk)
            ValidateType.type_check(messages[message], DeliveryOption, self.publish_bulk)

        javamessages = self.__publish_bulk_messages_to_hashmap(messages)

        if notifydeliveryfailure is not None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish_bulk)
            javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)

            result = self.__topic.publishBulk(javamessages, javanotifydeliveryfailure)

        else:
            result = self.__topic.publishBulk(javamessages)

        if result is not None:
            result = self.__publish_bulk_result_to_dict(result)

        return result

    def remove_message_delivery_failure_listener(self):
        """
        This method unregisters for message delivery failure callbacks on this topic.
        """
        self.__topic.removeMessageDeliveryFailureListener()

    def remove_topic_deleted_listener(self):
        """
        This method unregisters for topic deleted event.
        """
        self.__topic.removeTopicDeletedListener()

    def get_name(self):
        """
        Gets the name of the Topic.

        :return: The name of the Topic.
        :rtype: str
        """
        result = self.__topic.getName()

        if result is not None:
            result = TypeCaster.to_python_primitive_type(result)

        return result

    def get_message_count(self):
        """
        Gets the number of messages published for this topic.

        :return: The number of messages published for this topic.
        :rtype: int
        """

        result = self.__topic.getMessageCount()

        if result is not None:
            result = TypeCaster.to_python_primitive_type(result)

        return result

    def get_priority(self):
        """
        The relative priority of the topic.

        :return: The priority of the topic.
        :rtype: TopicPriority
        """
        result = self.__topic.getPriority()

        if result is not None:
            result = EnumUtil.get_topic_priority_value(result)

        return result

    def get_expiration_time(self):
        """
        Gets the expiry time of message for this topic. Its default value is TimeSpan.MaxValue.

        :return: The expiry time of message for this topic.
        :rtype: TimeSpan
        """
        result = self.__topic.getExpirationTime()

        if result is not None:
            timespan = TimeSpan(123)
            timespan.set_instance(result)

            return timespan

    def set_expiration_time(self, timespan):
        """
        Sets the expiry time of message for this topic.

        :param timespan: The expiry time of message for this topic.
        :type timespan: TimeSpan
        """
        ValidateType.type_check(timespan, TimeSpan, self.set_expiration_time)
        javatimespan = timespan.get_instance()

        self.__topic.setExpirationTime(javatimespan)

    def create_subscription(self, messagereceivedeventlistener, deliverymode=None):
        """
        This method is used to subscribe against topic on cache if topic exists.

        :param messagereceivedeventlistener: The callable listener function that is invoked whenever a message is
            published against the topic. This function should follow this signature:
            message_received_listener(sender: object, args: MessageEventArgs)
        :type messagereceivedeventlistener: Callable
        :param deliverymode: Specifies whether to deliver messages to register subscribers synchronously or asynchronously.
        :type deliverymode: DeliveryMode
        :return: The created topic subscription.
        :rtype: TopicSubscription
        """
        ValidateType.params_check(messagereceivedeventlistener, 2, self.create_subscription)
        eventlistener = EventsListenerHelper.get_listener(messagereceivedeventlistener, MessageReceivedListener)

        if deliverymode is not None:
            ValidateType.type_check(deliverymode, DeliveryMode, self.create_subscription)
            javadeliverymode = EnumUtil.get_delivery_mode(deliverymode.name)

            result = self.__topic.createSubscription(eventlistener, javadeliverymode)

        else:
            result = self.__topic.createSubscription(eventlistener)

        if result is not None:
            subscription = TopicSubscription(result)
            return subscription

    def close(self):
        """
        Closes this resource, relinquishing any underlying resources.
        """
        self.__topic.close()

    @staticmethod
    async def __return_coroutine(function, *args):
        if len(args) == 2:
            return function(args[0], args[1])
        else:
            return function(args[0], args[1], args[2])
        # For the time being, we have only 2 or 3 possible arguments. This function has to be made generic if needed in future.

    @staticmethod
    def __publish_bulk_messages_to_hashmap(messagesdict):
        javahashmap = jp.java.util.HashMap()

        for item in messagesdict:
            javamessage = item.get_instance()
            javadeliveryoption = EnumUtil.get_delivery_option(messagesdict[item].value)

            javahashmap.put(javamessage, javadeliveryoption)

        return javahashmap

    @staticmethod
    def __publish_bulk_result_to_dict(javabulkmap):
        pythondict = {}

        for item in javabulkmap:
            key = TypeCaster.to_python_primitive_type(item)
            pythondict[key] = javabulkmap[item]
        return pythondict

Klassen

class Topic (value)

Die Topic-Schnittstelle erleichtert das Erstellen von Abonnements und das Veröffentlichen von Nachrichten zum Thema. Dies ermöglicht auch Ereignisregistrierungen für fehlgeschlagene Nachrichtenübermittlungen, den Empfang von Nachrichten und das Löschen von Themen.

Erweitern Sie den Quellcode
class Topic:
    """
    The Topic interface facilitates creating subscription and publishing of messages against the topic. This also
    provides event registrations for message delivery failure, receiving messages and deleting topics.
    """
    def __init__(self, value):
        self.__topic = value

    def get_instance(self):
        return self.__topic

    def set_instance(self, value):
        self.__topic = value

    def add_message_delivery_failure_listener(self, messagefailedeventlistener):
        """
        This method registers for message delivery failure events on this topic.

        :param messagefailedeventlistener: The listener that is invoked whenever there is failure in message delivery.
            This function should follow this signature: message_failed_event_listener(sender: object, args: MessageFailedEventArgs)
        :type messagefailedeventlistener: Callable
        """
        ValidateType.params_check(messagefailedeventlistener, 2, self.add_message_delivery_failure_listener)

        eventlistener = EventsListenerHelper.get_listener(messagefailedeventlistener, TopicListener)
        self.__topic.addMessageDeliveryFailureListener(eventlistener)

    def add_topic_deleted_listener(self, topicdeletedlistener):
        """
        This method registers for topic deleted event.

        :param topicdeletedlistener: The listener function that is invoked whenever this topic is deleted. This function
            should follow this signature: message_failed_event_listener(sender: TopicListener, args: TopicDeleteEventArgs)
        :type topicdeletedlistener: Callable
        """

        ValidateType.params_check(topicdeletedlistener, 2, self.add_topic_deleted_listener)

        eventlistener = EventsListenerHelper.get_listener(topicdeletedlistener, TopicListener)
        self.__topic.addTopicDeletedListener(eventlistener)

    def create_durable_subscription(self, subscriptionname, subscriptionpolicy, messagereceivedlistener, expirationtime, deliverymode=None):
        """
        This method is used to create a durable subscription to this topic.

        :param subscriptionname: Name of the durable subscription.
        :type subscriptionname: str
        :param subscriptionpolicy: Policy that is subscription is Shared or Exclusive.
        :type subscriptionpolicy: SubscriptionPolicy
        :param messagereceivedlistener: Message is delivered to registered user through this listener function. This
            function should follow this signature: message_received_listener(sender: TopicListener, args: MessageEventArgs)
        :type messagereceivedlistener: Callable
        :param expirationtime: A timespan that specifies the expiration time of the subscription.
        :type expirationtime: TimeSpan
        :param deliverymode: Specifies whether to deliver messages to register subscribers synchronously or asynchronously.
        :type deliverymode: DeliveryMode
        :return: Instance of DurableTopicSubscription.
        :rtype: DurableTopicSubscription
        """
        ValidateType.is_string(subscriptionname, self.create_durable_subscription)
        ValidateType.type_check(subscriptionpolicy, SubscriptionPolicy, self.create_durable_subscription)
        ValidateType.params_check(messagereceivedlistener, 2, self.create_durable_subscription)
        ValidateType.type_check(expirationtime, TimeSpan, self.create_durable_subscription)

        javasubscriptionname = TypeCaster.to_java_primitive_type(subscriptionname)
        javasubscriptionpolicy = EnumUtil.get_subscription_policy(subscriptionpolicy.value)
        javaexpirationtime = expirationtime.get_instance()
        eventlistener = EventsListenerHelper.get_listener(messagereceivedlistener, MessageReceivedListener)

        if deliverymode is not None:
            ValidateType.type_check(deliverymode, DeliveryMode, self.create_durable_subscription)
            javadeliverymode = EnumUtil.get_delivery_mode(deliverymode.name)

            result = self.__topic.createDurableSubscription(javasubscriptionname, javasubscriptionpolicy, eventlistener, javaexpirationtime, javadeliverymode)

        else:
            result = self.__topic.createDurableSubscription(javasubscriptionname, javasubscriptionpolicy, eventlistener, javaexpirationtime)

        if result is not None:
            result = DurableTopicSubscription(result)
            return result

    def get_is_closed(self):
        """
        Specifies whether topic is closed or not.

        :return: True if topic is closed, otherwise False.
        :rtype: bool
        """
        result = self.__topic.getIsClosed()

        if result is not None:
            result = TypeCaster.to_python_primitive_type(result)

        return result

    def get_search_options(self):
        """
        Specifies whether user has subscribed to pattern based or simple subscription.

        :return: The topic search options for this topic.
        :rtype: TopicSearchOptions
        """
        result = self.__topic.getSearchOptions()

        if result is not None:
            result = EnumUtil.get_topic_search_options_value(result)

        return result

    def publish(self, message, deliveryoption, notifydeliveryfailure=None, sequencename=None, ):
        """
        This method is used to publish the message to the topic with the specified DeliveryOption and sequence name if
        provided. Order of messages with same sequence name is retained.

        :param message: Message to be published.
        :type message: Message
        :param deliveryoption: Specifies how message is delivered to registered subscribers.
        :type deliveryoption: DeliveryOption
        :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
        :type notifydeliveryfailure: bool
        :param sequencename: Sequence name of the message to be published. The messages with same sequence name will be
            delivered in the same order as they are published.
        :type sequencename: str
        """
        ValidateType.type_check(message, Message, self.publish)
        ValidateType.type_check(deliveryoption, DeliveryOption, self.publish)

        javamessage = message.get_instance()
        javadeliveryoption = EnumUtil.get_delivery_option(deliveryoption.value)

        if notifydeliveryfailure is not None and sequencename is not None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish)
            ValidateType.is_string(sequencename, self.publish)

            javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)
            javasequencename = TypeCaster.to_java_primitive_type(sequencename)

            self.__topic.publish(javamessage, javadeliveryoption, javasequencename, javanotifydeliveryfailure)
            return

        elif notifydeliveryfailure is not None and sequencename is None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish)
            javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)

            self.__topic.publish(javamessage, javadeliveryoption, javanotifydeliveryfailure)
            return

        elif notifydeliveryfailure is None and sequencename is not None:
            ValidateType.is_string(sequencename, self.publish)
            javasequencename = TypeCaster.to_java_primitive_type(sequencename)

            self.__topic.publish(javamessage, javadeliveryoption, javasequencename)
            return

        elif notifydeliveryfailure is None and sequencename is None:
            self.__topic.publish(javamessage, javadeliveryoption)
            return

    async def publish_async(self, message, deliveryoption, notifydeliveryfailure=None):
        """
        This method is used to Publish a message asynchronously to the topic with specified delivery option.

        :param message: Message to be published.
        :type message: Message
        :param deliveryoption: Specifies how message is delivered to registered subscribers.
        :type deliveryoption: DeliveryOption
        :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
        :type notifydeliveryfailure: bool
        :return: Task that performs a publish operation in the background.
        :rtype: Task
        """
        ValidateType.type_check(message, Message, self.publish_async)
        ValidateType.type_check(deliveryoption, DeliveryOption, self.publish_async)

        if notifydeliveryfailure is not None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish_async)

            return asyncio.create_task(self.__return_coroutine(self.publish, message, deliveryoption, notifydeliveryfailure))
        return asyncio.create_task(self.__return_coroutine(self.publish, message, deliveryoption))

    def publish_bulk(self, messages, notifydeliveryfailure=None):
        """
        This method is used to Publish messages to the topic with specified DeliveryOption.

        :param messages: Collection of message-deliveryoption pairs in form of a dict.
        :type messages: dict
        :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
        :type notifydeliveryfailure: bool
        :return: Dict that contains message along with the exception that occurred while message publishing.
        :rtype: dict
        """
        ValidateType.type_check(messages, dict, self.publish_bulk)
        for message in messages:
            ValidateType.type_check(message, Message, self.publish_bulk)
            ValidateType.type_check(messages[message], DeliveryOption, self.publish_bulk)

        javamessages = self.__publish_bulk_messages_to_hashmap(messages)

        if notifydeliveryfailure is not None:
            ValidateType.type_check(notifydeliveryfailure, bool, self.publish_bulk)
            javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)

            result = self.__topic.publishBulk(javamessages, javanotifydeliveryfailure)

        else:
            result = self.__topic.publishBulk(javamessages)

        if result is not None:
            result = self.__publish_bulk_result_to_dict(result)

        return result

    def remove_message_delivery_failure_listener(self):
        """
        This method unregisters for message delivery failure callbacks on this topic.
        """
        self.__topic.removeMessageDeliveryFailureListener()

    def remove_topic_deleted_listener(self):
        """
        This method unregisters for topic deleted event.
        """
        self.__topic.removeTopicDeletedListener()

    def get_name(self):
        """
        Gets the name of the Topic.

        :return: The name of the Topic.
        :rtype: str
        """
        result = self.__topic.getName()

        if result is not None:
            result = TypeCaster.to_python_primitive_type(result)

        return result

    def get_message_count(self):
        """
        Gets the number of messages published for this topic.

        :return: The number of messages published for this topic.
        :rtype: int
        """

        result = self.__topic.getMessageCount()

        if result is not None:
            result = TypeCaster.to_python_primitive_type(result)

        return result

    def get_priority(self):
        """
        The relative priority of the topic.

        :return: The priority of the topic.
        :rtype: TopicPriority
        """
        result = self.__topic.getPriority()

        if result is not None:
            result = EnumUtil.get_topic_priority_value(result)

        return result

    def get_expiration_time(self):
        """
        Gets the expiry time of message for this topic. Its default value is TimeSpan.MaxValue.

        :return: The expiry time of message for this topic.
        :rtype: TimeSpan
        """
        result = self.__topic.getExpirationTime()

        if result is not None:
            timespan = TimeSpan(123)
            timespan.set_instance(result)

            return timespan

    def set_expiration_time(self, timespan):
        """
        Sets the expiry time of message for this topic.

        :param timespan: The expiry time of message for this topic.
        :type timespan: TimeSpan
        """
        ValidateType.type_check(timespan, TimeSpan, self.set_expiration_time)
        javatimespan = timespan.get_instance()

        self.__topic.setExpirationTime(javatimespan)

    def create_subscription(self, messagereceivedeventlistener, deliverymode=None):
        """
        This method is used to subscribe against topic on cache if topic exists.

        :param messagereceivedeventlistener: The callable listener function that is invoked whenever a message is
            published against the topic. This function should follow this signature:
            message_received_listener(sender: object, args: MessageEventArgs)
        :type messagereceivedeventlistener: Callable
        :param deliverymode: Specifies whether to deliver messages to register subscribers synchronously or asynchronously.
        :type deliverymode: DeliveryMode
        :return: The created topic subscription.
        :rtype: TopicSubscription
        """
        ValidateType.params_check(messagereceivedeventlistener, 2, self.create_subscription)
        eventlistener = EventsListenerHelper.get_listener(messagereceivedeventlistener, MessageReceivedListener)

        if deliverymode is not None:
            ValidateType.type_check(deliverymode, DeliveryMode, self.create_subscription)
            javadeliverymode = EnumUtil.get_delivery_mode(deliverymode.name)

            result = self.__topic.createSubscription(eventlistener, javadeliverymode)

        else:
            result = self.__topic.createSubscription(eventlistener)

        if result is not None:
            subscription = TopicSubscription(result)
            return subscription

    def close(self):
        """
        Closes this resource, relinquishing any underlying resources.
        """
        self.__topic.close()

    @staticmethod
    async def __return_coroutine(function, *args):
        if len(args) == 2:
            return function(args[0], args[1])
        else:
            return function(args[0], args[1], args[2])
        # For the time being, we have only 2 or 3 possible arguments. This function has to be made generic if needed in future.

    @staticmethod
    def __publish_bulk_messages_to_hashmap(messagesdict):
        javahashmap = jp.java.util.HashMap()

        for item in messagesdict:
            javamessage = item.get_instance()
            javadeliveryoption = EnumUtil.get_delivery_option(messagesdict[item].value)

            javahashmap.put(javamessage, javadeliveryoption)

        return javahashmap

    @staticmethod
    def __publish_bulk_result_to_dict(javabulkmap):
        pythondict = {}

        for item in javabulkmap:
            key = TypeCaster.to_python_primitive_type(item)
            pythondict[key] = javabulkmap[item]
        return pythondict

Methoden

def add_message_delivery_failure_listener(self, messagefailedeventlistener)

Diese Methode registriert Nachrichtenübermittlungsfehlerereignisse zu diesem Thema.

:param messagefailedeventlistener: Der Listener, der immer dann aufgerufen wird, wenn bei der Nachrichtenzustellung ein Fehler auftritt. Diese Funktion sollte dieser Signatur folgen: message_failed_event_listener(sender: object, args: MessageFailedEventArgs) :type messagefailedeventlistener: Callable

Erweitern Sie den Quellcode
def add_message_delivery_failure_listener(self, messagefailedeventlistener):
    """
    This method registers for message delivery failure events on this topic.

    :param messagefailedeventlistener: The listener that is invoked whenever there is failure in message delivery.
        This function should follow this signature: message_failed_event_listener(sender: object, args: MessageFailedEventArgs)
    :type messagefailedeventlistener: Callable
    """
    ValidateType.params_check(messagefailedeventlistener, 2, self.add_message_delivery_failure_listener)

    eventlistener = EventsListenerHelper.get_listener(messagefailedeventlistener, TopicListener)
    self.__topic.addMessageDeliveryFailureListener(eventlistener)
def add_topic_deleted_listener(self, topicdeletedlistener)

Diese Methode registriert sich für das gelöschte Ereignis des Themas.

:param topicdeletedlistener: Die Listener-Funktion, die immer dann aufgerufen wird, wenn dieses Thema gelöscht wird. Diese Funktion sollte dieser Signatur folgen: message_failed_event_listener(sender: TopicListener, args: TopicDeleteEventArgs) :type topicdeletedlistener: Callable

Erweitern Sie den Quellcode
def add_topic_deleted_listener(self, topicdeletedlistener):
    """
    This method registers for topic deleted event.

    :param topicdeletedlistener: The listener function that is invoked whenever this topic is deleted. This function
        should follow this signature: message_failed_event_listener(sender: TopicListener, args: TopicDeleteEventArgs)
    :type topicdeletedlistener: Callable
    """

    ValidateType.params_check(topicdeletedlistener, 2, self.add_topic_deleted_listener)

    eventlistener = EventsListenerHelper.get_listener(topicdeletedlistener, TopicListener)
    self.__topic.addTopicDeletedListener(eventlistener)
def close(self)

Schließt diese Ressource und gibt alle zugrunde liegenden Ressourcen frei.

Erweitern Sie den Quellcode
def close(self):
    """
    Closes this resource, relinquishing any underlying resources.
    """
    self.__topic.close()
def create_durable_subscription(self, subscriptionname, subscriptionpolicy, messagereceivedlistener, expirationtime, deliverymode=None)

Diese Methode wird verwendet, um ein dauerhaftes Abonnement für dieses Thema zu erstellen.

:param subscriptionname: Name des dauerhaften Abonnements. :type subscriptionname: str :param subscriptionpolicy: Die Richtlinie, bei der es sich um ein Abonnement handelt, ist „Gemeinsam“ oder „Exklusiv“. :type subscriptionpolicy: SubscriptionPolicy :param messagereceivedlistener: Die Nachricht wird über diese Listener-Funktion an den registrierten Benutzer übermittelt. Diese Funktion sollte dieser Signatur folgen: message_received_listener(sender: TopicListener, args: MessageEventArgs) :type messagereceivedlistener: Callable :param expirationtime: Eine Zeitspanne, die die Ablaufzeit des Abonnements angibt. :type expirationtime: TimeSpan :param Deliverymode: Gibt an, ob Nachrichten synchron oder asynchron an registrierte Abonnenten übermittelt werden sollen. :type Deliverymode: DeliveryMode :return: Instanz von DurableTopicSubscription. :rtype: DurableTopicSubscription

Erweitern Sie den Quellcode
def create_durable_subscription(self, subscriptionname, subscriptionpolicy, messagereceivedlistener, expirationtime, deliverymode=None):
    """
    This method is used to create a durable subscription to this topic.

    :param subscriptionname: Name of the durable subscription.
    :type subscriptionname: str
    :param subscriptionpolicy: Policy that is subscription is Shared or Exclusive.
    :type subscriptionpolicy: SubscriptionPolicy
    :param messagereceivedlistener: Message is delivered to registered user through this listener function. This
        function should follow this signature: message_received_listener(sender: TopicListener, args: MessageEventArgs)
    :type messagereceivedlistener: Callable
    :param expirationtime: A timespan that specifies the expiration time of the subscription.
    :type expirationtime: TimeSpan
    :param deliverymode: Specifies whether to deliver messages to register subscribers synchronously or asynchronously.
    :type deliverymode: DeliveryMode
    :return: Instance of DurableTopicSubscription.
    :rtype: DurableTopicSubscription
    """
    ValidateType.is_string(subscriptionname, self.create_durable_subscription)
    ValidateType.type_check(subscriptionpolicy, SubscriptionPolicy, self.create_durable_subscription)
    ValidateType.params_check(messagereceivedlistener, 2, self.create_durable_subscription)
    ValidateType.type_check(expirationtime, TimeSpan, self.create_durable_subscription)

    javasubscriptionname = TypeCaster.to_java_primitive_type(subscriptionname)
    javasubscriptionpolicy = EnumUtil.get_subscription_policy(subscriptionpolicy.value)
    javaexpirationtime = expirationtime.get_instance()
    eventlistener = EventsListenerHelper.get_listener(messagereceivedlistener, MessageReceivedListener)

    if deliverymode is not None:
        ValidateType.type_check(deliverymode, DeliveryMode, self.create_durable_subscription)
        javadeliverymode = EnumUtil.get_delivery_mode(deliverymode.name)

        result = self.__topic.createDurableSubscription(javasubscriptionname, javasubscriptionpolicy, eventlistener, javaexpirationtime, javadeliverymode)

    else:
        result = self.__topic.createDurableSubscription(javasubscriptionname, javasubscriptionpolicy, eventlistener, javaexpirationtime)

    if result is not None:
        result = DurableTopicSubscription(result)
        return result
def create_subscription(self, messagereceivedeventlistener, deliverymode=None)

Diese Methode wird verwendet, um ein Thema im Cache zu abonnieren, wenn ein Thema vorhanden ist.

:param messagereceivedeventlistener: Die aufrufbare Listener-Funktion, die immer dann aufgerufen wird, wenn eine Nachricht für das Thema veröffentlicht wird. Diese Funktion sollte dieser Signatur folgen: message_received_listener(sender: object, args: MessageEventArgs) :type messagereceivedeventlistener: Callable :param Deliverymode: Gibt an, ob Nachrichten synchron oder asynchron an registrierte Abonnenten übermittelt werden sollen. :type Deliverymode: DeliveryMode :return: Das erstellte Themenabonnement. :rtype: TopicSubscription

Erweitern Sie den Quellcode
def create_subscription(self, messagereceivedeventlistener, deliverymode=None):
    """
    This method is used to subscribe against topic on cache if topic exists.

    :param messagereceivedeventlistener: The callable listener function that is invoked whenever a message is
        published against the topic. This function should follow this signature:
        message_received_listener(sender: object, args: MessageEventArgs)
    :type messagereceivedeventlistener: Callable
    :param deliverymode: Specifies whether to deliver messages to register subscribers synchronously or asynchronously.
    :type deliverymode: DeliveryMode
    :return: The created topic subscription.
    :rtype: TopicSubscription
    """
    ValidateType.params_check(messagereceivedeventlistener, 2, self.create_subscription)
    eventlistener = EventsListenerHelper.get_listener(messagereceivedeventlistener, MessageReceivedListener)

    if deliverymode is not None:
        ValidateType.type_check(deliverymode, DeliveryMode, self.create_subscription)
        javadeliverymode = EnumUtil.get_delivery_mode(deliverymode.name)

        result = self.__topic.createSubscription(eventlistener, javadeliverymode)

    else:
        result = self.__topic.createSubscription(eventlistener)

    if result is not None:
        subscription = TopicSubscription(result)
        return subscription
def get_expiration_time(self)

Ruft die Ablaufzeit der Nachricht für dieses Thema ab. Der Standardwert ist TimeSpan.MaxValue.

:return: Die Ablaufzeit der Nachricht für dieses Thema. :rtype: TimeSpan

Erweitern Sie den Quellcode
def get_expiration_time(self):
    """
    Gets the expiry time of message for this topic. Its default value is TimeSpan.MaxValue.

    :return: The expiry time of message for this topic.
    :rtype: TimeSpan
    """
    result = self.__topic.getExpirationTime()

    if result is not None:
        timespan = TimeSpan(123)
        timespan.set_instance(result)

        return timespan
def get_instance(self)
Erweitern Sie den Quellcode
def get_instance(self):
    return self.__topic
def get_is_closed(self)

Gibt an, ob das Thema geschlossen ist oder nicht.

:return: True, wenn das Thema geschlossen ist, andernfalls False. :rtype: bool

Erweitern Sie den Quellcode
def get_is_closed(self):
    """
    Specifies whether topic is closed or not.

    :return: True if topic is closed, otherwise False.
    :rtype: bool
    """
    result = self.__topic.getIsClosed()

    if result is not None:
        result = TypeCaster.to_python_primitive_type(result)

    return result
def get_message_count(self)

Ruft die Anzahl der für dieses Thema veröffentlichten Nachrichten ab.

:return: Die Anzahl der für dieses Thema veröffentlichten Nachrichten. :rtype: int

Erweitern Sie den Quellcode
def get_message_count(self):
    """
    Gets the number of messages published for this topic.

    :return: The number of messages published for this topic.
    :rtype: int
    """

    result = self.__topic.getMessageCount()

    if result is not None:
        result = TypeCaster.to_python_primitive_type(result)

    return result
def get_name(self)

Ruft den Namen des Themas ab.

:return: Der Name des Themas. :rtype: str

Erweitern Sie den Quellcode
def get_name(self):
    """
    Gets the name of the Topic.

    :return: The name of the Topic.
    :rtype: str
    """
    result = self.__topic.getName()

    if result is not None:
        result = TypeCaster.to_python_primitive_type(result)

    return result
def get_priority(self)

Die relative Priorität des Themas.

:return: Die Priorität des Themas. :rtype: TopicPriority

Erweitern Sie den Quellcode
def get_priority(self):
    """
    The relative priority of the topic.

    :return: The priority of the topic.
    :rtype: TopicPriority
    """
    result = self.__topic.getPriority()

    if result is not None:
        result = EnumUtil.get_topic_priority_value(result)

    return result
def get_search_options(self)

Gibt an, ob der Benutzer ein musterbasiertes oder ein einfaches Abonnement abonniert hat.

:return: Die Themensuchoptionen für dieses Thema. :rtype: TopicSearchOptions

Erweitern Sie den Quellcode
def get_search_options(self):
    """
    Specifies whether user has subscribed to pattern based or simple subscription.

    :return: The topic search options for this topic.
    :rtype: TopicSearchOptions
    """
    result = self.__topic.getSearchOptions()

    if result is not None:
        result = EnumUtil.get_topic_search_options_value(result)

    return result
def publish(self, message, deliveryoption, notifydeliveryfailure=None, sequencename=None)

Diese Methode wird verwendet, um die Nachricht im Thema mit der angegebenen DeliveryOption und dem Sequenznamen (sofern angegeben) zu veröffentlichen. Die Reihenfolge der Nachrichten mit demselben Sequenznamen bleibt erhalten.

:param message: Zu veröffentlichende Nachricht. :type message: Nachricht :param Deliveryoption: Gibt an, wie die Nachricht an registrierte Abonnenten übermittelt wird. :type Deliveryoption: DeliveryOption :param notifydeliveryfailure: Gibt an, ob das MessageDeliveryFailure-Ereignis für diese Nachricht erforderlich ist. :type notifydeliveryfailure: bool :param sequencename: Sequenzname der zu veröffentlichenden Nachricht. Die Nachrichten mit demselben Sequenznamen werden in derselben Reihenfolge zugestellt, in der sie veröffentlicht werden. :typ Sequenzname: str

Erweitern Sie den Quellcode
def publish(self, message, deliveryoption, notifydeliveryfailure=None, sequencename=None, ):
    """
    This method is used to publish the message to the topic with the specified DeliveryOption and sequence name if
    provided. Order of messages with same sequence name is retained.

    :param message: Message to be published.
    :type message: Message
    :param deliveryoption: Specifies how message is delivered to registered subscribers.
    :type deliveryoption: DeliveryOption
    :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
    :type notifydeliveryfailure: bool
    :param sequencename: Sequence name of the message to be published. The messages with same sequence name will be
        delivered in the same order as they are published.
    :type sequencename: str
    """
    ValidateType.type_check(message, Message, self.publish)
    ValidateType.type_check(deliveryoption, DeliveryOption, self.publish)

    javamessage = message.get_instance()
    javadeliveryoption = EnumUtil.get_delivery_option(deliveryoption.value)

    if notifydeliveryfailure is not None and sequencename is not None:
        ValidateType.type_check(notifydeliveryfailure, bool, self.publish)
        ValidateType.is_string(sequencename, self.publish)

        javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)
        javasequencename = TypeCaster.to_java_primitive_type(sequencename)

        self.__topic.publish(javamessage, javadeliveryoption, javasequencename, javanotifydeliveryfailure)
        return

    elif notifydeliveryfailure is not None and sequencename is None:
        ValidateType.type_check(notifydeliveryfailure, bool, self.publish)
        javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)

        self.__topic.publish(javamessage, javadeliveryoption, javanotifydeliveryfailure)
        return

    elif notifydeliveryfailure is None and sequencename is not None:
        ValidateType.is_string(sequencename, self.publish)
        javasequencename = TypeCaster.to_java_primitive_type(sequencename)

        self.__topic.publish(javamessage, javadeliveryoption, javasequencename)
        return

    elif notifydeliveryfailure is None and sequencename is None:
        self.__topic.publish(javamessage, javadeliveryoption)
        return
async def publish_async(self, message, deliveryoption, notifydeliveryfailure=None)

Diese Methode wird verwendet, um eine Nachricht asynchron zum Thema mit der angegebenen Übermittlungsoption zu veröffentlichen.

:param message: Zu veröffentlichende Nachricht. :type message: Nachricht :param Deliveryoption: Gibt an, wie die Nachricht an registrierte Abonnenten übermittelt wird. :type Deliveryoption: DeliveryOption :param notifydeliveryfailure: Gibt an, ob das MessageDeliveryFailure-Ereignis für diese Nachricht erforderlich ist. :type notifydeliveryfailure: bool :return: Aufgabe, die einen Veröffentlichungsvorgang im Hintergrund ausführt. :rtype: Aufgabe

Erweitern Sie den Quellcode
async def publish_async(self, message, deliveryoption, notifydeliveryfailure=None):
    """
    This method is used to Publish a message asynchronously to the topic with specified delivery option.

    :param message: Message to be published.
    :type message: Message
    :param deliveryoption: Specifies how message is delivered to registered subscribers.
    :type deliveryoption: DeliveryOption
    :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
    :type notifydeliveryfailure: bool
    :return: Task that performs a publish operation in the background.
    :rtype: Task
    """
    ValidateType.type_check(message, Message, self.publish_async)
    ValidateType.type_check(deliveryoption, DeliveryOption, self.publish_async)

    if notifydeliveryfailure is not None:
        ValidateType.type_check(notifydeliveryfailure, bool, self.publish_async)

        return asyncio.create_task(self.__return_coroutine(self.publish, message, deliveryoption, notifydeliveryfailure))
    return asyncio.create_task(self.__return_coroutine(self.publish, message, deliveryoption))
def publish_bulk(self, messages, notifydeliveryfailure=None)

Diese Methode wird zum Veröffentlichen von Nachrichten im Thema mit der angegebenen DeliveryOption verwendet.

:param-Nachrichten: Sammlung von Nachrichten-Zustellungsoptionspaaren in Form eines Diktats. :type messages: dict :param notifydeliveryfailure: Gibt an, ob das MessageDeliveryFailure-Ereignis für diese Nachricht erforderlich ist. :type notifydeliveryfailure: bool :return: Dict, das die Nachricht zusammen mit der Ausnahme enthält, die beim Veröffentlichen der Nachricht aufgetreten ist. :rtype: dict

Erweitern Sie den Quellcode
def publish_bulk(self, messages, notifydeliveryfailure=None):
    """
    This method is used to Publish messages to the topic with specified DeliveryOption.

    :param messages: Collection of message-deliveryoption pairs in form of a dict.
    :type messages: dict
    :param notifydeliveryfailure: Specifies whether MessageDeliveryFailure event is required for this message.
    :type notifydeliveryfailure: bool
    :return: Dict that contains message along with the exception that occurred while message publishing.
    :rtype: dict
    """
    ValidateType.type_check(messages, dict, self.publish_bulk)
    for message in messages:
        ValidateType.type_check(message, Message, self.publish_bulk)
        ValidateType.type_check(messages[message], DeliveryOption, self.publish_bulk)

    javamessages = self.__publish_bulk_messages_to_hashmap(messages)

    if notifydeliveryfailure is not None:
        ValidateType.type_check(notifydeliveryfailure, bool, self.publish_bulk)
        javanotifydeliveryfailure = TypeCaster.to_java_primitive_type(notifydeliveryfailure)

        result = self.__topic.publishBulk(javamessages, javanotifydeliveryfailure)

    else:
        result = self.__topic.publishBulk(javamessages)

    if result is not None:
        result = self.__publish_bulk_result_to_dict(result)

    return result
def remove_message_delivery_failure_listener(self)

Diese Methode hebt die Registrierung für Rückrufe wegen fehlgeschlagener Nachrichtenübermittlung zu diesem Thema auf.

Erweitern Sie den Quellcode
def remove_message_delivery_failure_listener(self):
    """
    This method unregisters for message delivery failure callbacks on this topic.
    """
    self.__topic.removeMessageDeliveryFailureListener()
def remove_topic_deleted_listener(self)

Diese Methode hebt die Registrierung für das gelöschte Ereignis des Themas auf.

Erweitern Sie den Quellcode
def remove_topic_deleted_listener(self):
    """
    This method unregisters for topic deleted event.
    """
    self.__topic.removeTopicDeletedListener()
def set_expiration_time(self, timespan)

Legt die Ablaufzeit der Nachricht für dieses Thema fest.

:param timespan: Die Ablaufzeit der Nachricht für dieses Thema. :Typ Zeitspanne: TimeSpan

Erweitern Sie den Quellcode
def set_expiration_time(self, timespan):
    """
    Sets the expiry time of message for this topic.

    :param timespan: The expiry time of message for this topic.
    :type timespan: TimeSpan
    """
    ValidateType.type_check(timespan, TimeSpan, self.set_expiration_time)
    javatimespan = timespan.get_instance()

    self.__topic.setExpirationTime(javatimespan)
def set_instance(self, value)
Erweitern Sie den Quellcode
def set_instance(self, value):
    self.__topic = value