Skip to content

Commit

Permalink
Publish client and Subscribe Annotation (#414)
Browse files Browse the repository at this point in the history
Creating Publisher Client and subscription annotation
  • Loading branch information
TheBurchLog authored Nov 1, 2023
1 parent daee0e9 commit adee9e6
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ TBD

- Expanded Auto Generation to support Literal Type Hinting, if python version >= 3.8
- Fixed self reference bug in SystemClient
- Add PublishClient for broadcasting requests to Topics
- Add @subscribe annotation for commands to listen to topics

3.19.0
------
Expand Down
5 changes: 4 additions & 1 deletion brewtils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
from brewtils.__version__ import __version__
from brewtils.config import get_argument_parser, get_connection_info, load_config
from brewtils.decorators import client, command, parameter, system
from brewtils.decorators import client, command, parameter, system, subscribe
from brewtils.log import configure_logging
from brewtils.plugin import Plugin, RemotePlugin # noqa F401
from brewtils.rest import normalize_url_prefix
from brewtils.rest.easy_client import get_easy_client, EasyClient
from brewtils.rest.system_client import SystemClient
from brewtils.rest.publish_client import PublishClient
from brewtils.auto_decorator import AutoDecorator

__all__ = [
Expand All @@ -15,9 +16,11 @@
"command",
"parameter",
"system",
"subscribe",
"Plugin",
"EasyClient",
"SystemClient",
"PublishClient",
"get_easy_client",
"get_argument_parser",
"get_connection_info",
Expand Down
2 changes: 1 addition & 1 deletion brewtils/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-

__version__ = "3.19.0"
__version__ = "3.20.0"
42 changes: 41 additions & 1 deletion brewtils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"command",
"parameter",
"parameters",
"subscribe",
"system",
]

Expand Down Expand Up @@ -377,6 +378,42 @@ def cmd1(self, **kwargs):
return _wrapped


def subscribe(_wrapped=None, topic: str = None, topics=[]):
"""Decorator for specifiying topic to listen to.
for example::
@subscribe(topics=["myTopic])
def returnTrue(self):
return True
All commands are automatically subscribe to the topic of
their "{Namespace}.{System Name}.{System Version}.{Instance}.{Command}"
Command will only be triggered once per publish event, even if it matches on multiple topics.
Args:
_wrapped: The function to decorate. This is handled as a positional argument and
shouldn't be explicitly set.
topic: The topic to subscribe to
topics: A list of topics to subscribe to
"""

if topic:
topics.append(topic)

if _wrapped is None:
return functools.partial(subscribe, topics=topics)

# Python 2 compatibility
if hasattr(_wrapped, "__func__"):
_wrapped.__func__.subscribe_topics = topics
else:
_wrapped.subscribe_topics = topics

return _wrapped


def _parse_client(client):
# type: (object) -> List[Command]
"""Get a list of Beergarden Commands from a client object
Expand Down Expand Up @@ -417,10 +454,13 @@ def _parse_method(method):
Beergarden Command targeting the given method
"""
if (inspect.ismethod(method) or inspect.isfunction(method)) and (
hasattr(method, "_command") or hasattr(method, "parameters")
hasattr(method, "_command")
or hasattr(method, "parameters")
or hasattr(method, "subscribe_topics")
):
# Create a command object if there isn't one already
method_command = _initialize_command(method)
method_command.topics = getattr(method, "subscribe_topics", [])

try:
# Need to initialize existing parameters before attempting to add parameters
Expand Down
7 changes: 6 additions & 1 deletion brewtils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Events(Enum):
REQUEST_UPDATED = 22
REQUEST_COMPLETED = 7
REQUEST_CANCELED = 42
REQUEST_TOPIC_PUBLISH = 51
INSTANCE_INITIALIZED = 8
INSTANCE_STARTED = 9
INSTANCE_UPDATED = 23
Expand Down Expand Up @@ -91,7 +92,7 @@ class Events(Enum):
COMMAND_PUBLISHING_BLOCKLIST_REMOVE = 49
COMMAND_PUBLISHING_BLOCKLIST_UPDATE = 50

# Next: 51
# Next: 52


class BaseModel(object):
Expand All @@ -117,6 +118,7 @@ def __init__(
icon_name=None,
hidden=False,
metadata=None,
topics=None,
):
self.name = name
self.description = description
Expand All @@ -129,6 +131,7 @@ def __init__(
self.icon_name = icon_name
self.hidden = hidden
self.metadata = metadata or {}
self.topics = topics or []

def __str__(self):
return self.name
Expand Down Expand Up @@ -630,6 +633,7 @@ def __init__(
namespace=None,
command=None,
id=None, # noqa # shadows built-in
is_event=None,
parent=None,
children=None,
parameters=None,
Expand Down Expand Up @@ -660,6 +664,7 @@ def __init__(
output_type=output_type,
)
self.id = id
self.is_event = is_event or False
self.parent = parent
self.children = children
self.output = output
Expand Down
126 changes: 126 additions & 0 deletions brewtils/rest/publish_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
import logging

import brewtils.plugin
from brewtils.models import Request, Event, Events
from brewtils.rest.easy_client import EasyClient


class PublishClient(object):
"""High-level client for publishing requests on Beer-garden topics.
**Topics are internal routing values for Beer Garden. These are not RabbitMQ/Pika topics.**
PublishClient creation:
This class is intended to be the main way to create Beer-garden topic based requests.
Create an instance with Beer-garden connection information:
client = PublishClient(
bg_host="host",
bg_port=2337,
)
Note: Passing an empty string as the system_namespace parameter will evaluate
to the local garden's default namespace.
Making a Request:
The standard way to create and send requests is by calling object attributes::
wasPublished = client.example_command(_topic="myTopic", param_1='example_param')
The request will be published to any commands that are listening to "myTopic" or a
topic that can be resolved to "myTopic" through regex.
Just like the SystemClient, param_1 will be passed as a request parameter to be executed
If a command listens for the topic and the parameter requirements do not match, the command
will fail to execute. Requests are validated against their subscribing commands.
When a Request is published, regardless of the status of children, it can move forward.
Published child commands can fail and not impact that overall status of the parent.
Args:
bg_host (str): Beer-garden hostname
bg_port (int): Beer-garden port
bg_url_prefix (str): URL path that will be used as a prefix when communicating
with Beer-garden. Useful if Beer-garden is running on a URL other than '/'.
ssl_enabled (bool): Whether to use SSL for Beer-garden communication
ca_cert (str): Path to certificate file containing the certificate of the
authority that issued the Beer-garden server certificate
ca_verify (bool): Whether to verify Beer-garden server certificate
client_cert (str): Path to client certificate to use when communicating with
Beer-garden
api_version (int): Beer-garden API version to use
client_timeout (int): Max time to wait for Beer-garden server response
username (str): Username for Beer-garden authentication
password (str): Password for Beer-garden authentication
access_token (str): Access token for Beer-garden authentication
refresh_token (str): Refresh token for Beer-garden authentication
"""

def __init__(self, *args, **kwargs):
self._logger = logging.getLogger(__name__)
self._easy_client = EasyClient(*args, **kwargs)

def publish(
self, _topic: str, _regex_only: bool = False, _propagate: bool = False, **kwargs
) -> bool:
"""Publishes event containing Request to be processed
Args:
_topic (str): The topic to publish to
_regex_only (bool): If the request will be resolved against only annotated topics
from the @subscribe command
_propagate (bool): If the request will be pushed up to the parent to be resolved.
kwargs (dict): All necessary request parameters, including Beer-garden
internal parameters
"""

comment = kwargs.pop("_comment", None)
output_type = kwargs.pop("_output_type", None)
metadata = kwargs.pop("_metadata", {})
metadata["_topic"] = _topic
parent = kwargs.pop("_parent", self._get_parent_for_request())

request = Request(
comment=comment,
output_type=output_type,
parent=parent,
metadata=metadata,
parameters=kwargs,
)

event = Event(
name=Events.REQUEST_TOPIC_PUBLISH.name,
metadata={
"topic": _topic,
"propagate": _propagate,
"regex_only": _regex_only,
},
payload=request,
payload_type="Request",
)

return self._easy_client.publish_event(event)

def _get_parent_for_request(self):
# type: () -> Optional[Request]
parent = getattr(brewtils.plugin.request_context, "current_request", None)
if parent is None:
return None

if brewtils.plugin.CONFIG and (
brewtils.plugin.CONFIG.bg_host.upper()
!= self._easy_client.client.bg_host.upper()
or brewtils.plugin.CONFIG.bg_port != self._easy_client.client.bg_port
):
self._logger.warning(
"A parent request was found, but the destination beer-garden "
"appears to be different than the beer-garden to which this plugin "
"is assigned. Cross-server parent/child requests are not supported "
"at this time. Removing the parent context so the request doesn't fail."
)
return None

return Request(id=str(parent.id))
9 changes: 9 additions & 0 deletions brewtils/rest/system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,18 @@ def _construct_bg_request(self, **kwargs):
output_type = kwargs.pop("_output_type", None)
metadata = kwargs.pop("_metadata", {})
parent = kwargs.pop("_parent", self._get_parent_for_request())
publish = kwargs.pop("_publish", None)
topic = kwargs.pop("_topic", None)
propagate = kwargs.pop("_propagate", None)

if system_display:
metadata["system_display_name"] = system_display
if publish:
metadata["_publish"] = publish
if topic:
metadata["_topic"] = topic
if propagate:
metadata["_propagate"] = propagate

# Don't check namespace - https://github.com/beer-garden/beer-garden/issues/827
if command is None:
Expand Down
4 changes: 3 additions & 1 deletion brewtils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class CommandSchema(BaseSchema):
icon_name = fields.Str(allow_none=True)
hidden = fields.Boolean(allow_none=True)
metadata = fields.Dict(allow_none=True)
topics = fields.List(fields.Str(), allow_none=True)


class InstanceSchema(BaseSchema):
Expand Down Expand Up @@ -328,6 +329,7 @@ class RequestTemplateSchema(BaseSchema):

class RequestSchema(RequestTemplateSchema):
id = fields.Str(allow_none=True)
is_event = fields.Bool(allow_none=True)
parent = fields.Nested("self", exclude=("children",), allow_none=True)
children = fields.Nested(
"self", exclude=("parent", "children"), many=True, default=None, allow_none=True
Expand Down Expand Up @@ -401,7 +403,7 @@ class EventSchema(BaseSchema):
timestamp = DateTime(allow_none=True, format="epoch", example="1500065932000")

payload_type = fields.Str(allow_none=True)
payload = ModelField(allow_none=True)
payload = ModelField(allow_none=True, type_field="payload_type")

error = fields.Bool(allow_none=True)
error_message = fields.Str(allow_none=True)
Expand Down
4 changes: 4 additions & 0 deletions brewtils/test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def command_dict(parameter_dict, system_id):
"template": "<html></html>",
"icon_name": "icon!",
"metadata": {"meta": "data"},
"topics": [],
}


Expand Down Expand Up @@ -287,6 +288,7 @@ def child_request_dict(ts_epoch):
"namespace": "ns",
"command": "say",
"id": "58542eb571afd47ead90d25f",
"is_event": False,
"parameters": {},
"comment": "bye!",
"output": "nested output",
Expand Down Expand Up @@ -324,6 +326,7 @@ def parent_request_dict(ts_epoch):
"namespace": "ns",
"command": "say",
"id": "58542eb571afd47ead90d25d",
"is_event": False,
"parent": None,
"parameters": {},
"comment": "bye!",
Expand Down Expand Up @@ -385,6 +388,7 @@ def request_dict(parent_request_dict, child_request_dict, ts_epoch):
"namespace": "ns",
"command": "speak",
"id": "58542eb571afd47ead90d25e",
"is_event": False,
"parent": parent_request_dict,
"children": [child_request_dict],
"parameters": {"message": "hey!"},
Expand Down
1 change: 1 addition & 0 deletions test/decorators_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ def test_basic(self, command_dict, bg_command):
bg_command.parameters = []
del command_dict["name"]
del command_dict["parameters"]
del command_dict["topics"]

@command(**command_dict)
def foo():
Expand Down

0 comments on commit adee9e6

Please sign in to comment.