diff --git a/brewtils/__init__.py b/brewtils/__init__.py index 638075a9..82e5e0b2 100644 --- a/brewtils/__init__.py +++ b/brewtils/__init__.py @@ -1,12 +1,13 @@ # -*- coding: utf-8 -*- from brewtils.__version__ import __version__ from brewtils.config import get_argument_parser, get_connection_info, load_config -from brewtils.decorators import client, command, parameter, system +from brewtils.decorators import client, command, parameter, system, subscribe from brewtils.log import configure_logging from brewtils.plugin import Plugin, RemotePlugin # noqa F401 from brewtils.rest import normalize_url_prefix from brewtils.rest.easy_client import get_easy_client, EasyClient from brewtils.rest.system_client import SystemClient +from brewtils.rest.publish_client import PublishClient from brewtils.auto_decorator import AutoDecorator __all__ = [ @@ -15,9 +16,11 @@ "command", "parameter", "system", + "subscribe", "Plugin", "EasyClient", "SystemClient", + "PublishClient", "get_easy_client", "get_argument_parser", "get_connection_info", diff --git a/brewtils/decorators.py b/brewtils/decorators.py index 0082faf0..aea2d66e 100644 --- a/brewtils/decorators.py +++ b/brewtils/decorators.py @@ -27,6 +27,7 @@ "command", "parameter", "parameters", + "subscribe", "system", ] @@ -377,6 +378,41 @@ def cmd1(self, **kwargs): return _wrapped +def subscribe(_wrapped=None, topic: str = None, topics=[]): + """Decorator for specifiying topic to listen to. + + for example:: + + @subscribe(topics=["myTopic]) + def returnTrue(self): + return True + + All commands are automatically subscribe to the topic of their "{Namespace}.{System Name}.{System Version}.{Instance}.{Command}" + + Command will only be triggered once per publish event, even if it matches on multiple topics. + + Args: + _wrapped: The function to decorate. This is handled as a positional argument and + shouldn't be explicitly set. + topic: The topic to subscribe to + topics: A list of topics to subscribe to + """ + + if topic: + topics.append(topic) + + if _wrapped is None: + return functools.partial(subscribe, topics=topics) + + # Python 2 compatibility + if hasattr(_wrapped, "__func__"): + _wrapped.__func__.subscribe_topics = topics + else: + _wrapped.subscribe_topics = topics + + return _wrapped + + def _parse_client(client): # type: (object) -> List[Command] """Get a list of Beergarden Commands from a client object @@ -417,10 +453,13 @@ def _parse_method(method): Beergarden Command targeting the given method """ if (inspect.ismethod(method) or inspect.isfunction(method)) and ( - hasattr(method, "_command") or hasattr(method, "parameters") + hasattr(method, "_command") + or hasattr(method, "parameters") + or hasattr(method, "subscribe_topics") ): # Create a command object if there isn't one already method_command = _initialize_command(method) + method_command.topics = getattr(method, "subscribe_topics", []) try: # Need to initialize existing parameters before attempting to add parameters diff --git a/brewtils/models.py b/brewtils/models.py index 70d244ee..7807b537 100644 --- a/brewtils/models.py +++ b/brewtils/models.py @@ -49,6 +49,7 @@ class Events(Enum): REQUEST_UPDATED = 22 REQUEST_COMPLETED = 7 REQUEST_CANCELED = 42 + REQUEST_TOPIC_PUBLISH = 51 INSTANCE_INITIALIZED = 8 INSTANCE_STARTED = 9 INSTANCE_UPDATED = 23 @@ -91,7 +92,7 @@ class Events(Enum): COMMAND_PUBLISHING_BLOCKLIST_REMOVE = 49 COMMAND_PUBLISHING_BLOCKLIST_UPDATE = 50 - # Next: 51 + # Next: 52 class BaseModel(object): @@ -117,6 +118,7 @@ def __init__( icon_name=None, hidden=False, metadata=None, + topics=None, ): self.name = name self.description = description @@ -129,6 +131,7 @@ def __init__( self.icon_name = icon_name self.hidden = hidden self.metadata = metadata or {} + self.topics = topics or [] def __str__(self): return self.name diff --git a/brewtils/rest/publish_client.py b/brewtils/rest/publish_client.py new file mode 100644 index 00000000..fa1a2ee3 --- /dev/null +++ b/brewtils/rest/publish_client.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +import logging + +import brewtils.plugin +from brewtils.models import Request, Event, Events +from brewtils.rest.easy_client import EasyClient + + +class PublishClient(Object): + """High-level client for publishing requests on Beer-garden topics. + + **Topics are internal routing values for Beer Garden. These are not RabbitMQ/Pika topics.** + + PublishClient creation: + This class is intended to be the main way to create Beer-garden topic based requestes. + Create an instance with Beer-garden connection information: + + client = PublishClient( + bg_host="host", + bg_port=2337, + ) + + Note: Passing an empty string as the system_namespace parameter will evalutate + to the local garden's default namespace. + + Making a Request: + The standard way to create and send requests is by calling object attributes:: + + wasPublished = client.example_command(_topic="myTopic", param_1='example_param') + + The request will be published to any commands that are listening to "myTopic" or a + topic that can be resolved to "myTopic" through regex. + + Just like the SystemClient, param_1 will be passed as a request parameter to be executed + + If a command listens for the topic and the parameter requirements do not match, the command + will fail to execute. Requests are validated against their subscribing commands. + + When a Request is published, regardless of the status of children, it can move forward. + Published child commands can fail and not impact that overall status of the parent. + + Args: + bg_host (str): Beer-garden hostname + bg_port (int): Beer-garden port + bg_url_prefix (str): URL path that will be used as a prefix when communicating + with Beer-garden. Useful if Beer-garden is running on a URL other than '/'. + ssl_enabled (bool): Whether to use SSL for Beer-garden communication + ca_cert (str): Path to certificate file containing the certificate of the + authority that issued the Beer-garden server certificate + ca_verify (bool): Whether to verify Beer-garden server certificate + client_cert (str): Path to client certificate to use when communicating with + Beer-garden + api_version (int): Beer-garden API version to use + client_timeout (int): Max time to wait for Beer-garden server response + username (str): Username for Beer-garden authentication + password (str): Password for Beer-garden authentication + access_token (str): Access token for Beer-garden authentication + refresh_token (str): Refresh token for Beer-garden authentication + """ + + def __init__(self, *args, **kwargs): + self._logger = logging.getLogger(__name__) + self._easy_client = EasyClient(*args, **kwargs) + + def publish( + self, _topic: str, _regex_only: bool = False, _propagate: bool = False, **kwargs + ) -> bool: + """Publishes event containing Request to be processed + + Args: + _topic (str): The topic to publish to + _regex_only (bool): If the request will be resolved against only annotated topics from the @subscribe command + _propagate (bool): If the request will be pushed up to the parent to be resolved. + kwargs (dict): All necessary request parameters, including Beer-garden + internal parameters + + """ + + comment = kwargs.pop("_comment", None) + output_type = kwargs.pop("_output_type", None) + metadata = kwargs.pop("_metadata", {}) + metadata["_topic"] = _topic + + request = Request( + comment=comment, + output_type=output_type, + parent=self._get_parent_for_request(), + metadata=metadata, + parameters=kwargs, + ) + + event = Event( + name=Events.REQUEST_TOPIC_PUBLISH.name, + metadata={ + "topic": _topic, + "propagate": _propagate, + "regex_only": _regex_only, + }, + payload=request, + payload_type="Request", + ) + + return self._easy_client.publish_event(event) + + def _get_parent_for_request(self): + # type: () -> Optional[Request] + parent = getattr(brewtils.plugin.request_context, "current_request", None) + if parent is None: + return None + + if brewtils.plugin.CONFIG and ( + brewtils.plugin.CONFIG.bg_host.upper() + != self._easy_client.client.bg_host.upper() + or brewtils.plugin.CONFIG.bg_port != self._easy_client.client.bg_port + ): + self._logger.warning( + "A parent request was found, but the destination beer-garden " + "appears to be different than the beer-garden to which this plugin " + "is assigned. Cross-server parent/child requests are not supported " + "at this time. Removing the parent context so the request doesn't fail." + ) + return None + + return Request(id=str(parent.id)) diff --git a/brewtils/rest/system_client.py b/brewtils/rest/system_client.py index 82628706..2e66a848 100644 --- a/brewtils/rest/system_client.py +++ b/brewtils/rest/system_client.py @@ -529,9 +529,18 @@ def _construct_bg_request(self, **kwargs): output_type = kwargs.pop("_output_type", None) metadata = kwargs.pop("_metadata", {}) parent = kwargs.pop("_parent", self._get_parent_for_request()) + publish = kwargs.pop("_publish", None) + topic = kwargs.pop("_topic", None) + propagate = kwargs.pop("_propagate", None) if system_display: metadata["system_display_name"] = system_display + if publish: + metadata["_publish"] = publish + if topic: + metadata["_topic"] = topic + if propagate: + metadata["_propagate"] = propagate # Don't check namespace - https://github.com/beer-garden/beer-garden/issues/827 if command is None: diff --git a/brewtils/schemas.py b/brewtils/schemas.py index e632c324..2cb06be8 100644 --- a/brewtils/schemas.py +++ b/brewtils/schemas.py @@ -221,6 +221,7 @@ class CommandSchema(BaseSchema): icon_name = fields.Str(allow_none=True) hidden = fields.Boolean(allow_none=True) metadata = fields.Dict(allow_none=True) + topics = fields.List(fields.Str(), allow_none=True) class InstanceSchema(BaseSchema): @@ -401,7 +402,7 @@ class EventSchema(BaseSchema): timestamp = DateTime(allow_none=True, format="epoch", example="1500065932000") payload_type = fields.Str(allow_none=True) - payload = ModelField(allow_none=True) + payload = ModelField(allow_none=True, type_field="payload_type") error = fields.Bool(allow_none=True) error_message = fields.Str(allow_none=True)