Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish client and Subscribe Annotation #414

Merged
merged 9 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ TBD

- Expanded Auto Generation to support Literal Type Hinting, if python version >= 3.8
- Fixed self reference bug in SystemClient
- Add PublishClient for broadcasting requests to Topics
- Add @subscribe annotation for commands to listen to topics

3.19.0
------
Expand Down
5 changes: 4 additions & 1 deletion brewtils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
from brewtils.__version__ import __version__
from brewtils.config import get_argument_parser, get_connection_info, load_config
from brewtils.decorators import client, command, parameter, system
from brewtils.decorators import client, command, parameter, system, subscribe
from brewtils.log import configure_logging
from brewtils.plugin import Plugin, RemotePlugin # noqa F401
from brewtils.rest import normalize_url_prefix
from brewtils.rest.easy_client import get_easy_client, EasyClient
from brewtils.rest.system_client import SystemClient
from brewtils.rest.publish_client import PublishClient
from brewtils.auto_decorator import AutoDecorator

__all__ = [
Expand All @@ -15,9 +16,11 @@
"command",
"parameter",
"system",
"subscribe",
"Plugin",
"EasyClient",
"SystemClient",
"PublishClient",
"get_easy_client",
"get_argument_parser",
"get_connection_info",
Expand Down
2 changes: 1 addition & 1 deletion brewtils/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-

__version__ = "3.19.0"
__version__ = "3.20.0"
42 changes: 41 additions & 1 deletion brewtils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"command",
"parameter",
"parameters",
"subscribe",
"system",
]

Expand Down Expand Up @@ -377,6 +378,42 @@ def cmd1(self, **kwargs):
return _wrapped


def subscribe(_wrapped=None, topic: str = None, topics=[]):
"""Decorator for specifiying topic to listen to.

for example::

@subscribe(topics=["myTopic])
def returnTrue(self):
return True

All commands are automatically subscribe to the topic of
their "{Namespace}.{System Name}.{System Version}.{Instance}.{Command}"

Command will only be triggered once per publish event, even if it matches on multiple topics.

Args:
_wrapped: The function to decorate. This is handled as a positional argument and
shouldn't be explicitly set.
topic: The topic to subscribe to
topics: A list of topics to subscribe to
"""

if topic:
topics.append(topic)

if _wrapped is None:
return functools.partial(subscribe, topics=topics)

# Python 2 compatibility
if hasattr(_wrapped, "__func__"):
_wrapped.__func__.subscribe_topics = topics
else:
_wrapped.subscribe_topics = topics

return _wrapped


def _parse_client(client):
# type: (object) -> List[Command]
"""Get a list of Beergarden Commands from a client object
Expand Down Expand Up @@ -417,10 +454,13 @@ def _parse_method(method):
Beergarden Command targeting the given method
"""
if (inspect.ismethod(method) or inspect.isfunction(method)) and (
hasattr(method, "_command") or hasattr(method, "parameters")
hasattr(method, "_command")
or hasattr(method, "parameters")
or hasattr(method, "subscribe_topics")
):
# Create a command object if there isn't one already
method_command = _initialize_command(method)
method_command.topics = getattr(method, "subscribe_topics", [])

try:
# Need to initialize existing parameters before attempting to add parameters
Expand Down
7 changes: 6 additions & 1 deletion brewtils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Events(Enum):
REQUEST_UPDATED = 22
REQUEST_COMPLETED = 7
REQUEST_CANCELED = 42
REQUEST_TOPIC_PUBLISH = 51
INSTANCE_INITIALIZED = 8
INSTANCE_STARTED = 9
INSTANCE_UPDATED = 23
Expand Down Expand Up @@ -91,7 +92,7 @@ class Events(Enum):
COMMAND_PUBLISHING_BLOCKLIST_REMOVE = 49
COMMAND_PUBLISHING_BLOCKLIST_UPDATE = 50

# Next: 51
# Next: 52


class BaseModel(object):
Expand All @@ -117,6 +118,7 @@ def __init__(
icon_name=None,
hidden=False,
metadata=None,
topics=None,
):
self.name = name
self.description = description
Expand All @@ -129,6 +131,7 @@ def __init__(
self.icon_name = icon_name
self.hidden = hidden
self.metadata = metadata or {}
self.topics = topics or []

def __str__(self):
return self.name
Expand Down Expand Up @@ -630,6 +633,7 @@ def __init__(
namespace=None,
command=None,
id=None, # noqa # shadows built-in
is_event=None,
parent=None,
children=None,
parameters=None,
Expand Down Expand Up @@ -660,6 +664,7 @@ def __init__(
output_type=output_type,
)
self.id = id
self.is_event = is_event or False
self.parent = parent
self.children = children
self.output = output
Expand Down
126 changes: 126 additions & 0 deletions brewtils/rest/publish_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
import logging

import brewtils.plugin
from brewtils.models import Request, Event, Events
from brewtils.rest.easy_client import EasyClient


class PublishClient(object):
"""High-level client for publishing requests on Beer-garden topics.

**Topics are internal routing values for Beer Garden. These are not RabbitMQ/Pika topics.**

PublishClient creation:
This class is intended to be the main way to create Beer-garden topic based requests.
Create an instance with Beer-garden connection information:

client = PublishClient(
bg_host="host",
bg_port=2337,
)

Note: Passing an empty string as the system_namespace parameter will evaluate
to the local garden's default namespace.

Making a Request:
The standard way to create and send requests is by calling object attributes::

wasPublished = client.example_command(_topic="myTopic", param_1='example_param')

The request will be published to any commands that are listening to "myTopic" or a
topic that can be resolved to "myTopic" through regex.

Just like the SystemClient, param_1 will be passed as a request parameter to be executed

If a command listens for the topic and the parameter requirements do not match, the command
will fail to execute. Requests are validated against their subscribing commands.

When a Request is published, regardless of the status of children, it can move forward.
Published child commands can fail and not impact that overall status of the parent.

Args:
bg_host (str): Beer-garden hostname
bg_port (int): Beer-garden port
bg_url_prefix (str): URL path that will be used as a prefix when communicating
with Beer-garden. Useful if Beer-garden is running on a URL other than '/'.
ssl_enabled (bool): Whether to use SSL for Beer-garden communication
ca_cert (str): Path to certificate file containing the certificate of the
authority that issued the Beer-garden server certificate
ca_verify (bool): Whether to verify Beer-garden server certificate
client_cert (str): Path to client certificate to use when communicating with
Beer-garden
api_version (int): Beer-garden API version to use
client_timeout (int): Max time to wait for Beer-garden server response
username (str): Username for Beer-garden authentication
password (str): Password for Beer-garden authentication
access_token (str): Access token for Beer-garden authentication
refresh_token (str): Refresh token for Beer-garden authentication
"""

def __init__(self, *args, **kwargs):
self._logger = logging.getLogger(__name__)
self._easy_client = EasyClient(*args, **kwargs)

def publish(
self, _topic: str, _regex_only: bool = False, _propagate: bool = False, **kwargs
) -> bool:
"""Publishes event containing Request to be processed

Args:
_topic (str): The topic to publish to
_regex_only (bool): If the request will be resolved against only annotated topics
from the @subscribe command
_propagate (bool): If the request will be pushed up to the parent to be resolved.
kwargs (dict): All necessary request parameters, including Beer-garden
internal parameters

"""

comment = kwargs.pop("_comment", None)
output_type = kwargs.pop("_output_type", None)
metadata = kwargs.pop("_metadata", {})
metadata["_topic"] = _topic
parent = kwargs.pop("_parent", self._get_parent_for_request())

request = Request(
comment=comment,
output_type=output_type,
parent=parent,
metadata=metadata,
parameters=kwargs,
)

event = Event(
name=Events.REQUEST_TOPIC_PUBLISH.name,
metadata={
"topic": _topic,
"propagate": _propagate,
"regex_only": _regex_only,
},
payload=request,
payload_type="Request",
)

return self._easy_client.publish_event(event)

def _get_parent_for_request(self):
# type: () -> Optional[Request]
parent = getattr(brewtils.plugin.request_context, "current_request", None)
if parent is None:
return None

if brewtils.plugin.CONFIG and (
brewtils.plugin.CONFIG.bg_host.upper()
!= self._easy_client.client.bg_host.upper()
or brewtils.plugin.CONFIG.bg_port != self._easy_client.client.bg_port
):
self._logger.warning(
"A parent request was found, but the destination beer-garden "
"appears to be different than the beer-garden to which this plugin "
"is assigned. Cross-server parent/child requests are not supported "
"at this time. Removing the parent context so the request doesn't fail."
)
return None

return Request(id=str(parent.id))
9 changes: 9 additions & 0 deletions brewtils/rest/system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,18 @@ def _construct_bg_request(self, **kwargs):
output_type = kwargs.pop("_output_type", None)
metadata = kwargs.pop("_metadata", {})
parent = kwargs.pop("_parent", self._get_parent_for_request())
publish = kwargs.pop("_publish", None)
topic = kwargs.pop("_topic", None)
propagate = kwargs.pop("_propagate", None)

if system_display:
metadata["system_display_name"] = system_display
if publish:
metadata["_publish"] = publish
if topic:
metadata["_topic"] = topic
if propagate:
metadata["_propagate"] = propagate

# Don't check namespace - https://github.com/beer-garden/beer-garden/issues/827
if command is None:
Expand Down
4 changes: 3 additions & 1 deletion brewtils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class CommandSchema(BaseSchema):
icon_name = fields.Str(allow_none=True)
hidden = fields.Boolean(allow_none=True)
metadata = fields.Dict(allow_none=True)
topics = fields.List(fields.Str(), allow_none=True)


class InstanceSchema(BaseSchema):
Expand Down Expand Up @@ -328,6 +329,7 @@ class RequestTemplateSchema(BaseSchema):

class RequestSchema(RequestTemplateSchema):
id = fields.Str(allow_none=True)
is_event = fields.Bool(allow_none=True)
parent = fields.Nested("self", exclude=("children",), allow_none=True)
children = fields.Nested(
"self", exclude=("parent", "children"), many=True, default=None, allow_none=True
Expand Down Expand Up @@ -401,7 +403,7 @@ class EventSchema(BaseSchema):
timestamp = DateTime(allow_none=True, format="epoch", example="1500065932000")

payload_type = fields.Str(allow_none=True)
payload = ModelField(allow_none=True)
payload = ModelField(allow_none=True, type_field="payload_type")

error = fields.Bool(allow_none=True)
error_message = fields.Str(allow_none=True)
Expand Down
4 changes: 4 additions & 0 deletions brewtils/test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def command_dict(parameter_dict, system_id):
"template": "<html></html>",
"icon_name": "icon!",
"metadata": {"meta": "data"},
"topics": [],
}


Expand Down Expand Up @@ -287,6 +288,7 @@ def child_request_dict(ts_epoch):
"namespace": "ns",
"command": "say",
"id": "58542eb571afd47ead90d25f",
"is_event": False,
"parameters": {},
"comment": "bye!",
"output": "nested output",
Expand Down Expand Up @@ -324,6 +326,7 @@ def parent_request_dict(ts_epoch):
"namespace": "ns",
"command": "say",
"id": "58542eb571afd47ead90d25d",
"is_event": False,
"parent": None,
"parameters": {},
"comment": "bye!",
Expand Down Expand Up @@ -385,6 +388,7 @@ def request_dict(parent_request_dict, child_request_dict, ts_epoch):
"namespace": "ns",
"command": "speak",
"id": "58542eb571afd47ead90d25e",
"is_event": False,
"parent": parent_request_dict,
"children": [child_request_dict],
"parameters": {"message": "hey!"},
Expand Down
1 change: 1 addition & 0 deletions test/decorators_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ def test_basic(self, command_dict, bg_command):
bg_command.parameters = []
del command_dict["name"]
del command_dict["parameters"]
del command_dict["topics"]

@command(**command_dict)
def foo():
Expand Down
Loading