Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Brewtils Plugin requires #500

Merged
merged 17 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Brewtils Changelog
==================

3.27.1
------
TBD

- Updated Plugin class to accept requires and requires_timeout attributes to require plugin to wait for dependencies before starting.

3.27.0
------
8/13/24
Expand Down
10 changes: 10 additions & 0 deletions brewtils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def client(
group=None, # type: str
groups=[], # type: Optional[List[str]]
prefix_topic=None, # type: Optional[str]
require=None, # type: str
requires=[], # type: Optional[List[str]]
):
# type: (...) -> Type
"""Class decorator that marks a class as a beer-garden Client
Expand Down Expand Up @@ -70,6 +72,8 @@ def client(
group: Optional plugin group
groups: Optional plugin groups
prefix_topic: Optional prefix for Generated Command to Topic mappings
require: Optional system dependency
requires: Optional system dependencies

Returns:
The decorated class
Expand All @@ -83,6 +87,8 @@ def client(
groups=groups,
group=group,
prefix_topic=prefix_topic,
require=require,
requires=requires,
) # noqa

# Assign these here so linters don't complain
Expand All @@ -92,10 +98,14 @@ def client(
_wrapped._current_request = None
_wrapped._groups = groups
_wrapped._prefix_topic = prefix_topic
_wrapped._requires = requires

if group:
_wrapped._groups.append(group)

if require:
_wrapped._requires.append(require)

return _wrapped


Expand Down
5 changes: 5 additions & 0 deletions brewtils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class Instance(BaseModel):
"STARTING",
"STOPPING",
"UNKNOWN",
"AWAITING_SYSTEM",
}

def __init__(
Expand Down Expand Up @@ -821,6 +822,8 @@ def __init__(
template=None,
groups=None,
prefix_topic=None,
requires=None,
requires_timeout=None,
):
self.name = name
self.description = description
Expand All @@ -837,6 +840,8 @@ def __init__(
self.template = template
self.groups = groups or []
self.prefix_topic = prefix_topic
self.requires = requires or []
self.requires_timeout = requires_timeout

def __str__(self):
return "%s:%s-%s" % (self.namespace, self.name, self.version)
Expand Down
111 changes: 90 additions & 21 deletions brewtils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class Plugin(object):
- ``display_name``
- ``group``
- ``groups``
- ``require``
- ``requires``
- ``requires_timeout``

Connection information tells the Plugin how to communicate with Beer-garden. The
most important of these is the ``bg_host`` (to tell the plugin where to find the
Expand Down Expand Up @@ -172,6 +175,10 @@ class Plugin(object):
instance_name (str): Instance name
namespace (str): Namespace name

require (str): Required system dependency
requires (list): Required systems dependencies
requires_timeout (int): Timeout to wait for dependencies

group (str): Grouping label applied to plugin
groups (list): Grouping labels applied to plugin

Expand Down Expand Up @@ -256,20 +263,24 @@ def run(self):
"attribute to an instance of a class decorated with @brewtils.system"
)

self._startup()
self._logger.info("Plugin %s has started", self.unique_name)

try:
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
self._logger.exception("Exception during wait, shutting down: %s", ex)
self._startup()
self._logger.info("Plugin %s has started", self.unique_name)

self._shutdown()
self._logger.info("Plugin %s has terminated", self.unique_name)
try:
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
self._logger.exception("Exception during wait, shutting down: %s", ex)

self._shutdown()
except PluginValidationError:
self._shutdown(status="ERROR")
finally:
self._logger.info("Plugin %s has terminated", self.unique_name)

@property
def client(self):
Expand Down Expand Up @@ -299,6 +310,8 @@ def _set_client(self, new_client):
self._system.prefix_topic = getattr(
new_client, "_prefix_topic", None
) # noqa
if not self._system.requires:
self._system.requires = getattr(new_client, "_requires", []) # noqa
# Now roll up / interpret all metadata to get the Commands
self._system.commands = _parse_client(new_client)

Expand All @@ -315,6 +328,7 @@ def _set_client(self, new_client):
client_clazz._bg_commands = self._system.commands
client_clazz._groups = self._system.groups
client_clazz._prefix_topic = self._system.prefix_topic
client_clazz._requires = self._system.requires
client_clazz._current_request = client_clazz.current_request
except TypeError:
if sys.version_info.major != 2:
Expand Down Expand Up @@ -368,6 +382,35 @@ def _hook(exc_type, exc_value, traceback):

sys.excepthook = _hook

def get_system_dependency(self, require, timeout=300):
wait_time = 0.1
while timeout > 0:
system = self._ez_client.find_unique_system(name=require, local=True)
if (
system
and system.instances
and any("RUNNING" == instance.status for instance in system.instances)
):
return system
self.logger.error(
f"Waiting {wait_time:.1f} seconds before next attempt for {self._system} "
f"dependency for {require}"
)
timeout = timeout - wait_time
wait_time = min(wait_time * 2, 30)
self._wait(wait_time)

raise PluginValidationError(
f"Failed to resolve {self._system} dependency for {require}"
)

def await_dependencies(self, requires, config):
for req in requires:
system = self.get_system_dependency(req, config.requires_timeout)
self.logger.info(
f"Resolved system {system} for {req}: {config.name} {config.instance_name}"
)

def _startup(self):
"""Plugin startup procedure

Expand Down Expand Up @@ -406,12 +449,20 @@ def _startup(self):
self._logger.debug("Initializing and starting processors")
self._admin_processor, self._request_processor = self._initialize_processors()
self._admin_processor.startup()
self._request_processor.startup()

self._logger.debug("Setting signal handlers")
self._set_signal_handlers()
try:
if self._system.requires:
self.await_dependencies(self._system.requires, self._config)
except PluginValidationError:
raise
else:
self._start()
self._request_processor.startup()
finally:
self._logger.debug("Setting signal handlers")
self._set_signal_handlers()

def _shutdown(self):
def _shutdown(self, status="STOPPED"):
"""Plugin shutdown procedure

This method gracefully stops the plugin. When it completes the plugin should be
Expand All @@ -422,14 +473,18 @@ def _shutdown(self):
self._shutdown_event.set()

self._logger.debug("Shutting down processors")
self._request_processor.shutdown()
# Join will cause an exception if processor thread wasn't started
try:
self._request_processor.shutdown()
except RuntimeError:
pass
self._admin_processor.shutdown()

try:
self._ez_client.update_instance(self._instance.id, new_status="STOPPED")
self._ez_client.update_instance(self._instance.id, new_status=status)
except Exception:
self._logger.warning(
"Unable to notify Beer-garden that this plugin is STOPPED, so this "
f"Unable to notify Beer-garden that this plugin is {status}, so this "
"plugin's status may be incorrect in Beer-garden"
)

Expand Down Expand Up @@ -540,6 +595,7 @@ def _initialize_system(self):
"icon_name": self._system.icon_name,
"template": self._system.template,
"groups": self._system.groups,
"requires": self._system.requires,
}

# And if this particular instance doesn't exist we want to add it
Expand Down Expand Up @@ -590,13 +646,13 @@ def _initialize_processors(self):
thread_name="Admin Consumer",
queue_name=self._instance.queue_info["admin"]["name"],
max_concurrent=1,
**common_args
**common_args,
)
request_consumer = RequestConsumer.create(
thread_name="Request Consumer",
queue_name=self._instance.queue_info["request"]["name"],
max_concurrent=self._config.max_concurrent,
**common_args
**common_args,
)

# Both RequestProcessors need an updater
Expand Down Expand Up @@ -635,6 +691,14 @@ def _start(self):
self._instance.id, new_status="RUNNING"
)

def _wait(self, timeout):
"""Handle wait request"""
# Set the status to wait
self._instance = self._ez_client.update_instance(
self._instance.id, new_status="AWAITING_SYSTEM"
)
self._shutdown_event.wait(timeout)

def _stop(self):
"""Handle stop Request"""
# Because the run() method is on a 0.1s sleep there's a race regarding if the
Expand Down Expand Up @@ -847,6 +911,9 @@ def _setup_system(self, system, plugin_kwargs):
if self._config.group:
self._config.groups.append(self._config.group)

if self._config.require:
self._config.requires.append(self._config.require)

system = System(
name=self._config.name,
version=self._config.version,
Expand All @@ -860,6 +927,8 @@ def _setup_system(self, system, plugin_kwargs):
template=self._config.template,
groups=self._config.groups,
prefix_topic=self._config.prefix_topic,
requires=self._config.requires,
requires_timeout=self._config.requires_timeout,
)

return system
Expand Down
5 changes: 5 additions & 0 deletions brewtils/rest/easy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def update_system(self, system_id, new_commands=None, **kwargs):
icon_name (str): New System icon name
template (str): New System template
groups (list): New System groups
requires (list): New System dependencies

Returns:
System: The updated system
Expand All @@ -424,6 +425,10 @@ def update_system(self, system_id, new_commands=None, **kwargs):
if groups:
operations.append(PatchOperation("replace", "/groups", groups))

requires = kwargs.pop("requires", [])
if requires:
operations.append(PatchOperation("replace", "/requires", requires))

# The remaining kwargs are all strings
# Sending an empty string (instead of None) ensures they're actually cleared
for key, value in kwargs.items():
Expand Down
2 changes: 2 additions & 0 deletions brewtils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class SystemSchema(BaseSchema):
template = fields.Str(allow_none=True)
groups = fields.List(fields.Str(), allow_none=True)
prefix_topic = fields.Str(allow_none=True)
requires = fields.List(fields.Str(), allow_none=True)
requires_timeout = fields.Integer(allow_none=True)


class SystemDomainIdentifierSchema(BaseSchema):
Expand Down
17 changes: 17 additions & 0 deletions brewtils/specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ def _is_json_dict(s):
"<garden name>.<namespace>.<system name>.<system version>.<system instance>.<command name>"
"if a prefix is provided, then it is `<prefix>.<command name>`",
},
"require": {
"type": "str",
"description": "A requires system dependency",
"required": False,
},
"requires": {
"type": "list",
"description": "The required system dependencies",
"items": {"name": {"type": "str"}},
"required": False,
"default": [],
},
"requires_timeout": {
"type": "int",
"description": "The dependency timeout to use",
"default": 300,
},
}

_PLUGIN_SPEC = {
Expand Down
2 changes: 2 additions & 0 deletions brewtils/test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def system_dict(instance_dict, command_dict, command_dict_2, system_id):
"template": "<html>template</html>",
"groups": ["GroupB", "GroupA"],
"prefix_topic": "custom_topic",
"requires": ["SystemA"],
"requires_timeout": 300,
}


Expand Down
Loading
Loading