Skip to content

Commit

Permalink
Creating Publisher Client
Browse files Browse the repository at this point in the history
  • Loading branch information
TheBurchLog committed Oct 30, 2023
1 parent daee0e9 commit d9ad4a0
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 4 deletions.
5 changes: 4 additions & 1 deletion brewtils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
from brewtils.__version__ import __version__
from brewtils.config import get_argument_parser, get_connection_info, load_config
from brewtils.decorators import client, command, parameter, system
from brewtils.decorators import client, command, parameter, system, subscribe
from brewtils.log import configure_logging
from brewtils.plugin import Plugin, RemotePlugin # noqa F401
from brewtils.rest import normalize_url_prefix
from brewtils.rest.easy_client import get_easy_client, EasyClient
from brewtils.rest.system_client import SystemClient
from brewtils.rest.publish_client import PublishClient
from brewtils.auto_decorator import AutoDecorator

__all__ = [
Expand All @@ -15,9 +16,11 @@
"command",
"parameter",
"system",
"subscribe",
"Plugin",
"EasyClient",
"SystemClient",
"PublishClient",
"get_easy_client",
"get_argument_parser",
"get_connection_info",
Expand Down
41 changes: 40 additions & 1 deletion brewtils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"command",
"parameter",
"parameters",
"subscribe",
"system",
]

Expand Down Expand Up @@ -377,6 +378,41 @@ def cmd1(self, **kwargs):
return _wrapped


def subscribe(_wrapped=None, topic: str = None, topics=[]):
"""Decorator for specifiying topic to listen to.
for example::
@subscribe(topics=["myTopic])
def returnTrue(self):
return True
All commands are automatically subscribe to the topic of their "{Namespace}.{System Name}.{System Version}.{Instance}.{Command}"
Command will only be triggered once per publish event, even if it matches on multiple topics.
Args:
_wrapped: The function to decorate. This is handled as a positional argument and
shouldn't be explicitly set.
topic: The topic to subscribe to
topics: A list of topics to subscribe to
"""

if topic:
topics.append(topic)

if _wrapped is None:
return functools.partial(subscribe, topics=topics)

# Python 2 compatibility
if hasattr(_wrapped, "__func__"):
_wrapped.__func__.subscribe_topics = topics
else:
_wrapped.subscribe_topics = topics

return _wrapped


def _parse_client(client):
# type: (object) -> List[Command]
"""Get a list of Beergarden Commands from a client object
Expand Down Expand Up @@ -417,10 +453,13 @@ def _parse_method(method):
Beergarden Command targeting the given method
"""
if (inspect.ismethod(method) or inspect.isfunction(method)) and (
hasattr(method, "_command") or hasattr(method, "parameters")
hasattr(method, "_command")
or hasattr(method, "parameters")
or hasattr(method, "subscribe_topics")
):
# Create a command object if there isn't one already
method_command = _initialize_command(method)
method_command.topics = getattr(method, "subscribe_topics", [])

try:
# Need to initialize existing parameters before attempting to add parameters
Expand Down
5 changes: 4 additions & 1 deletion brewtils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Events(Enum):
REQUEST_UPDATED = 22
REQUEST_COMPLETED = 7
REQUEST_CANCELED = 42
REQUEST_TOPIC_PUBLISH = 51
INSTANCE_INITIALIZED = 8
INSTANCE_STARTED = 9
INSTANCE_UPDATED = 23
Expand Down Expand Up @@ -91,7 +92,7 @@ class Events(Enum):
COMMAND_PUBLISHING_BLOCKLIST_REMOVE = 49
COMMAND_PUBLISHING_BLOCKLIST_UPDATE = 50

# Next: 51
# Next: 52


class BaseModel(object):
Expand All @@ -117,6 +118,7 @@ def __init__(
icon_name=None,
hidden=False,
metadata=None,
topics=None,
):
self.name = name
self.description = description
Expand All @@ -129,6 +131,7 @@ def __init__(
self.icon_name = icon_name
self.hidden = hidden
self.metadata = metadata or {}
self.topics = topics or []

def __str__(self):
return self.name
Expand Down
124 changes: 124 additions & 0 deletions brewtils/rest/publish_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
import logging

import brewtils.plugin
from brewtils.models import Request, Event, Events
from brewtils.rest.easy_client import EasyClient


class PublishClient(Object):
"""High-level client for publishing requests on Beer-garden topics.
**Topics are internal routing values for Beer Garden. These are not RabbitMQ/Pika topics.**
PublishClient creation:
This class is intended to be the main way to create Beer-garden topic based requestes.
Create an instance with Beer-garden connection information:
client = PublishClient(
bg_host="host",
bg_port=2337,
)
Note: Passing an empty string as the system_namespace parameter will evalutate
to the local garden's default namespace.
Making a Request:
The standard way to create and send requests is by calling object attributes::
wasPublished = client.example_command(_topic="myTopic", param_1='example_param')
The request will be published to any commands that are listening to "myTopic" or a
topic that can be resolved to "myTopic" through regex.
Just like the SystemClient, param_1 will be passed as a request parameter to be executed
If a command listens for the topic and the parameter requirements do not match, the command
will fail to execute. Requests are validated against their subscribing commands.
When a Request is published, regardless of the status of children, it can move forward.
Published child commands can fail and not impact that overall status of the parent.
Args:
bg_host (str): Beer-garden hostname
bg_port (int): Beer-garden port
bg_url_prefix (str): URL path that will be used as a prefix when communicating
with Beer-garden. Useful if Beer-garden is running on a URL other than '/'.
ssl_enabled (bool): Whether to use SSL for Beer-garden communication
ca_cert (str): Path to certificate file containing the certificate of the
authority that issued the Beer-garden server certificate
ca_verify (bool): Whether to verify Beer-garden server certificate
client_cert (str): Path to client certificate to use when communicating with
Beer-garden
api_version (int): Beer-garden API version to use
client_timeout (int): Max time to wait for Beer-garden server response
username (str): Username for Beer-garden authentication
password (str): Password for Beer-garden authentication
access_token (str): Access token for Beer-garden authentication
refresh_token (str): Refresh token for Beer-garden authentication
"""

def __init__(self, *args, **kwargs):
self._logger = logging.getLogger(__name__)
self._easy_client = EasyClient(*args, **kwargs)

def publish(
self, _topic: str, _regex_only: bool = False, _propagate: bool = False, **kwargs
) -> bool:
"""Publishes event containing Request to be processed
Args:
_topic (str): The topic to publish to
_regex_only (bool): If the request will be resolved against only annotated topics from the @subscribe command
_propagate (bool): If the request will be pushed up to the parent to be resolved.
kwargs (dict): All necessary request parameters, including Beer-garden
internal parameters
"""

comment = kwargs.pop("_comment", None)
output_type = kwargs.pop("_output_type", None)
metadata = kwargs.pop("_metadata", {})
metadata["_topic"] = _topic

request = Request(
comment=comment,
output_type=output_type,
parent=self._get_parent_for_request(),
metadata=metadata,
parameters=kwargs,
)

event = Event(
name=Events.REQUEST_TOPIC_PUBLISH.name,
metadata={
"topic": _topic,
"propagate": _propagate,
"regex_only": _regex_only,
},
payload=request,
payload_type="Request",
)

return self._easy_client.publish_event(event)

def _get_parent_for_request(self):
# type: () -> Optional[Request]
parent = getattr(brewtils.plugin.request_context, "current_request", None)
if parent is None:
return None

if brewtils.plugin.CONFIG and (
brewtils.plugin.CONFIG.bg_host.upper()
!= self._easy_client.client.bg_host.upper()
or brewtils.plugin.CONFIG.bg_port != self._easy_client.client.bg_port
):
self._logger.warning(
"A parent request was found, but the destination beer-garden "
"appears to be different than the beer-garden to which this plugin "
"is assigned. Cross-server parent/child requests are not supported "
"at this time. Removing the parent context so the request doesn't fail."
)
return None

return Request(id=str(parent.id))
9 changes: 9 additions & 0 deletions brewtils/rest/system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,18 @@ def _construct_bg_request(self, **kwargs):
output_type = kwargs.pop("_output_type", None)
metadata = kwargs.pop("_metadata", {})
parent = kwargs.pop("_parent", self._get_parent_for_request())
publish = kwargs.pop("_publish", None)
topic = kwargs.pop("_topic", None)
propagate = kwargs.pop("_propagate", None)

if system_display:
metadata["system_display_name"] = system_display
if publish:
metadata["_publish"] = publish
if topic:
metadata["_topic"] = topic
if propagate:
metadata["_propagate"] = propagate

# Don't check namespace - https://github.com/beer-garden/beer-garden/issues/827
if command is None:
Expand Down
3 changes: 2 additions & 1 deletion brewtils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class CommandSchema(BaseSchema):
icon_name = fields.Str(allow_none=True)
hidden = fields.Boolean(allow_none=True)
metadata = fields.Dict(allow_none=True)
topics = fields.List(fields.Str(), allow_none=True)


class InstanceSchema(BaseSchema):
Expand Down Expand Up @@ -401,7 +402,7 @@ class EventSchema(BaseSchema):
timestamp = DateTime(allow_none=True, format="epoch", example="1500065932000")

payload_type = fields.Str(allow_none=True)
payload = ModelField(allow_none=True)
payload = ModelField(allow_none=True, type_field="payload_type")

error = fields.Bool(allow_none=True)
error_message = fields.Str(allow_none=True)
Expand Down

0 comments on commit d9ad4a0

Please sign in to comment.