Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Brewtils Plugin requires #500

Merged
merged 17 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Brewtils Changelog
==================

3.27.1
------
TBD

- Updated Plugin class to accept requires and requires_timeout attributes to require plugin to wait for dependencies before starting.

3.27.0
------
8/13/24
Expand Down
5 changes: 5 additions & 0 deletions brewtils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class Instance(BaseModel):
"STARTING",
"STOPPING",
"UNKNOWN",
"AWAITING_SYSTEM",
}

def __init__(
Expand Down Expand Up @@ -821,6 +822,8 @@ def __init__(
template=None,
groups=None,
prefix_topic=None,
requires=None,
requires_timeout=None,
):
self.name = name
self.description = description
Expand All @@ -837,6 +840,8 @@ def __init__(
self.template = template
self.groups = groups or []
self.prefix_topic = prefix_topic
self.requires = requires or []
self.requires_timeout = requires_timeout

def __str__(self):
return "%s:%s-%s" % (self.namespace, self.name, self.version)
Expand Down
107 changes: 86 additions & 21 deletions brewtils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class Plugin(object):
- ``display_name``
- ``group``
- ``groups``
- ``require``
- ``requires``
- ``requires_timeout``

Connection information tells the Plugin how to communicate with Beer-garden. The
most important of these is the ``bg_host`` (to tell the plugin where to find the
Expand Down Expand Up @@ -172,6 +175,10 @@ class Plugin(object):
instance_name (str): Instance name
namespace (str): Namespace name

require (str): Required system dependency
requires (list): Required systems dependencies
requires_timeout (int): Timeout to wait for dependencies

group (str): Grouping label applied to plugin
groups (list): Grouping labels applied to plugin

Expand Down Expand Up @@ -256,20 +263,24 @@ def run(self):
"attribute to an instance of a class decorated with @brewtils.system"
)

self._startup()
self._logger.info("Plugin %s has started", self.unique_name)

try:
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
self._logger.exception("Exception during wait, shutting down: %s", ex)
self._startup()
self._logger.info("Plugin %s has started", self.unique_name)

self._shutdown()
self._logger.info("Plugin %s has terminated", self.unique_name)
try:
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
self._logger.exception("Exception during wait, shutting down: %s", ex)

self._shutdown()
except PluginValidationError:
self._shutdown(status="ERROR")
finally:
self._logger.info("Plugin %s has terminated", self.unique_name)

@property
def client(self):
Expand Down Expand Up @@ -368,6 +379,35 @@ def _hook(exc_type, exc_value, traceback):

sys.excepthook = _hook

def get_system_dependency(self, require, timeout=300):
wait_time = 0.1
while timeout > 0:
system = self._ez_client.find_unique_system(name=require)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to check, but if we have access to the Garden name of the local plugin, we should pass that forward as a filter criteria.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local=True seems to be ok

if (
system
and system.instances
and any("RUNNING" == instance.status for instance in system.instances)
):
return system
self._wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't give it the wait time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now using the wait time

self.logger.error(
f"Waiting {wait_time:.1f} seconds before next attempt for {self._system} "
f"dependency for {require}"
)
timeout = timeout - wait_time
wait_time = min(wait_time * 2, 30)

raise PluginValidationError(
f"Failed to resolve {self._system} dependency for {require}"
)

def await_dependencies(self, config):
for req in config.requires:
system = self.get_system_dependency(req, config.requires_timeout)
self.logger.info(
f"Resolved system {system} for {req}: {config.name} {config.instance_name}"
)

def _startup(self):
"""Plugin startup procedure

Expand Down Expand Up @@ -406,12 +446,20 @@ def _startup(self):
self._logger.debug("Initializing and starting processors")
self._admin_processor, self._request_processor = self._initialize_processors()
self._admin_processor.startup()
self._request_processor.startup()

self._logger.debug("Setting signal handlers")
self._set_signal_handlers()
try:
if self._config.requires:
self.await_dependencies(self._config)
except PluginValidationError:
raise
else:
self._start()
self._request_processor.startup()
finally:
self._logger.debug("Setting signal handlers")
self._set_signal_handlers()

def _shutdown(self):
def _shutdown(self, status="STOPPED"):
"""Plugin shutdown procedure

This method gracefully stops the plugin. When it completes the plugin should be
Expand All @@ -422,14 +470,18 @@ def _shutdown(self):
self._shutdown_event.set()

self._logger.debug("Shutting down processors")
self._request_processor.shutdown()
# Join will cause an exception if processor thread wasn't started
try:
self._request_processor.shutdown()
except RuntimeError:
pass
self._admin_processor.shutdown()

try:
self._ez_client.update_instance(self._instance.id, new_status="STOPPED")
self._ez_client.update_instance(self._instance.id, new_status=status)
except Exception:
self._logger.warning(
"Unable to notify Beer-garden that this plugin is STOPPED, so this "
f"Unable to notify Beer-garden that this plugin is {status}, so this "
"plugin's status may be incorrect in Beer-garden"
)

Expand Down Expand Up @@ -540,6 +592,7 @@ def _initialize_system(self):
"icon_name": self._system.icon_name,
"template": self._system.template,
"groups": self._system.groups,
"requires": self._system.requires,
}

# And if this particular instance doesn't exist we want to add it
Expand Down Expand Up @@ -590,13 +643,13 @@ def _initialize_processors(self):
thread_name="Admin Consumer",
queue_name=self._instance.queue_info["admin"]["name"],
max_concurrent=1,
**common_args
**common_args,
)
request_consumer = RequestConsumer.create(
thread_name="Request Consumer",
queue_name=self._instance.queue_info["request"]["name"],
max_concurrent=self._config.max_concurrent,
**common_args
**common_args,
)

# Both RequestProcessors need an updater
Expand Down Expand Up @@ -635,6 +688,13 @@ def _start(self):
self._instance.id, new_status="RUNNING"
)

def _wait(self):
"""Handle wait request"""
# Set the status to wait
self._instance = self._ez_client.update_instance(
self._instance.id, new_status="AWAITING_SYSTEM"
)

def _stop(self):
"""Handle stop Request"""
# Because the run() method is on a 0.1s sleep there's a race regarding if the
Expand Down Expand Up @@ -847,6 +907,9 @@ def _setup_system(self, system, plugin_kwargs):
if self._config.group:
self._config.groups.append(self._config.group)

if self._config.require:
self._config.requires.append(self._config.require)

system = System(
name=self._config.name,
version=self._config.version,
Expand All @@ -860,6 +923,8 @@ def _setup_system(self, system, plugin_kwargs):
template=self._config.template,
groups=self._config.groups,
prefix_topic=self._config.prefix_topic,
requires=self._config.requires,
requires_timeout=self._config.requires_timeout,
)

return system
Expand Down
5 changes: 5 additions & 0 deletions brewtils/rest/easy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def update_system(self, system_id, new_commands=None, **kwargs):
icon_name (str): New System icon name
template (str): New System template
groups (list): New System groups
requires (list): New System dependencies

Returns:
System: The updated system
Expand All @@ -424,6 +425,10 @@ def update_system(self, system_id, new_commands=None, **kwargs):
if groups:
operations.append(PatchOperation("replace", "/groups", groups))

requires = kwargs.pop("requires", [])
if requires:
operations.append(PatchOperation("replace", "/requires", requires))

# The remaining kwargs are all strings
# Sending an empty string (instead of None) ensures they're actually cleared
for key, value in kwargs.items():
Expand Down
2 changes: 2 additions & 0 deletions brewtils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class SystemSchema(BaseSchema):
template = fields.Str(allow_none=True)
groups = fields.List(fields.Str(), allow_none=True)
prefix_topic = fields.Str(allow_none=True)
requires = fields.List(fields.Str(), allow_none=True)
requires_timeout = fields.Integer(allow_none=True)


class SystemDomainIdentifierSchema(BaseSchema):
Expand Down
17 changes: 17 additions & 0 deletions brewtils/specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ def _is_json_dict(s):
"<garden name>.<namespace>.<system name>.<system version>.<system instance>.<command name>"
"if a prefix is provided, then it is `<prefix>.<command name>`",
},
"require": {
"type": "str",
"description": "A requires system dependency",
"required": False,
},
"requires": {
"type": "list",
"description": "The required system dependencies",
"items": {"name": {"type": "str"}},
"required": False,
"default": [],
},
"requires_timeout": {
"type": "int",
"description": "The dependency timeout to use",
"default": 300,
},
}

_PLUGIN_SPEC = {
Expand Down
2 changes: 2 additions & 0 deletions brewtils/test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def system_dict(instance_dict, command_dict, command_dict_2, system_id):
"template": "<html>template</html>",
"groups": ["GroupB", "GroupA"],
"prefix_topic": "custom_topic",
"requires": ["SystemA"],
"requires_timeout": 300,
}


Expand Down
30 changes: 30 additions & 0 deletions test/plugin_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ def test_groups(self, client):

assert plugin._system.groups == ["GroupB", "GroupA"]

def test_requires(self, client):
logger = Mock()
plugin = Plugin(
client,
bg_host="host1",
bg_port=2338,
bg_url_prefix="/beer/",
ssl_enabled=False,
ca_verify=False,
logger=logger,
max_concurrent=1,
require="SystemA",
requires=["SystemB"],
)

assert plugin._system.requires == ["SystemB", "SystemA"]

def test_kwargs(self, client, bg_system):
logger = Mock()

Expand All @@ -152,6 +169,9 @@ def test_kwargs(self, client, bg_system):
group="GroupA",
groups=["GroupB"],
prefix_topic="custom.topic",
require="SystemA",
requires=["SystemB"],
requires_timeout=200,
)

assert plugin._logger == logger
Expand All @@ -164,6 +184,10 @@ def test_kwargs(self, client, bg_system):
assert "GroupA" == plugin._config.group
assert "GroupB" in plugin._config.groups
assert "GroupA" not in plugin._config.groups
assert "SystemA" == plugin._config.require
assert "SystemB" in plugin._config.requires
assert "SystemA" not in plugin._config.requires
assert plugin._config.requires_timeout == 200

def test_env(self, client, bg_system):
os.environ["BG_HOST"] = "remotehost"
Expand All @@ -173,6 +197,7 @@ def test_env(self, client, bg_system):
os.environ["BG_CA_VERIFY"] = "False"
os.environ["BG_GROUP"] = "GroupA"
os.environ["BG_PREFIX_TOPIC"] = "custom.topic"
os.environ["BG_REQUIRE"] = "SystemA"

plugin = Plugin(client, system=bg_system, max_concurrent=1)

Expand All @@ -183,6 +208,7 @@ def test_env(self, client, bg_system):
assert plugin._config.ca_verify is False
assert plugin._config.prefix_topic == "custom.topic"
assert "GroupA" == plugin._config.group
assert "SystemA" in plugin._config.require

def test_conflicts(self, client, bg_system):
os.environ["BG_HOST"] = "remotehost"
Expand Down Expand Up @@ -504,6 +530,7 @@ def test_system_exists(
display_name=bg_system.display_name,
template="<html>template</html>",
groups=bg_system.groups,
requires=bg_system.requires,
)
# assert ez_client.create_system.return_value == plugin.system

Expand Down Expand Up @@ -534,6 +561,7 @@ def test_new_instance(self, plugin, ez_client, bg_system, bg_instance):
template="<html>template</html>",
add_instance=ANY,
groups=["GroupB", "GroupA"],
requires=["SystemA"],
)
assert ez_client.update_system.call_args[1]["add_instance"].name == new_name

Expand Down Expand Up @@ -811,6 +839,7 @@ def test_construct_system(self, plugin):
"icon_name": "icon",
"display_name": "display_name",
"metadata": '{"foo": "bar"}',
"requires": [],
}
)

Expand All @@ -825,6 +854,7 @@ def _validate_system(new_system):
assert new_system.icon_name == "icon"
assert new_system.metadata == {"foo": "bar"}
assert new_system.display_name == "display_name"
assert new_system.requires == []


class TestValidateSystem(object):
Expand Down
Loading