diff --git a/CHANGELOG.rst b/CHANGELOG.rst index cb72571d..cd2c48b6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,16 @@ Brewtils Changelog ================== +3.29.0 +------ +TBD + +- Added new annotation/configuration support for shutdown functions. These functions will be executed at the start + of the shutdown process. +- Added new annotation/configuration support for startup functions. These functions will be executed after `Plugin().run()` + has completed startup processes + + 3.28.0 ------ 10/9/24 diff --git a/brewtils/__init__.py b/brewtils/__init__.py index 0fe5ec1d..fb513096 100644 --- a/brewtils/__init__.py +++ b/brewtils/__init__.py @@ -2,13 +2,21 @@ from brewtils.__version__ import __version__ from brewtils.auto_decorator import AutoDecorator from brewtils.config import get_argument_parser, get_connection_info, load_config -from brewtils.decorators import client, command, parameter, subscribe, system +from brewtils.decorators import ( + client, + command, + parameter, + shutdown, + startup, + subscribe, + system, +) from brewtils.log import configure_logging -from brewtils.plugin import ( - get_current_request_read_only, +from brewtils.plugin import ( # noqa F401 Plugin, RemotePlugin, -) # noqa F401 + get_current_request_read_only, +) from brewtils.rest import normalize_url_prefix from brewtils.rest.easy_client import EasyClient, get_easy_client from brewtils.rest.publish_client import PublishClient @@ -19,6 +27,8 @@ "client", "command", "parameter", + "shutdown", + "startup", "system", "subscribe", "Plugin", diff --git a/brewtils/decorators.py b/brewtils/decorators.py index d6e332ba..dde4da9a 100644 --- a/brewtils/decorators.py +++ b/brewtils/decorators.py @@ -448,6 +448,45 @@ def cmd1(self, **kwargs): return _wrapped +def shutdown(_wrapped=None): + """Decorator for specifying a function to run before a plugin is shutdown. + + Functions called should short actions. Locally hosted plugin threads will be + pruned if not stopped within the plugin.timeout.shutdown time window + + for example:: + + @shutdown + def pre_shutdown(self): + # Run pre-shutdown processing + return + + Args: + _wrapped: The function to decorate. This is handled as a positional argument and + shouldn't be explicitly set. + """ + _wrapped._shutdown = True + return _wrapped + + +def startup(_wrapped=None): + """Decorator for specifying a function to run before a plugin is running. + + for example:: + + @startup + def pre_running(self): + # Run pre-running processing + return + + Args: + _wrapped: The function to decorate. This is handled as a positional argument and + shouldn't be explicitly set. + """ + _wrapped._startup = True + return _wrapped + + def subscribe(_wrapped=None, topic: str = None, topics=[]): """Decorator for specifiying topic to listen to. @@ -490,6 +529,42 @@ def returnTrue(self): return _wrapped +def _parse_shutdown_functions(client): + # type: (object) -> List[Callable] + """Get a list of callable fields labeled with the shutdown annotation + + This will iterate over everything returned from dir, looking for metadata added + by the shutdown decorator. + """ + + shutdown_functions = [] + + for attr in dir(client): + method = getattr(client, attr) + if callable(method) and getattr(method, "_shutdown", False): + shutdown_functions.append(method) + + return shutdown_functions + + +def _parse_startup_functions(client): + # type: (object) -> List[Callable] + """Get a list of callable fields labeled with the startup annotation + + This will iterate over everything returned from dir, looking for metadata added + by the startup decorator. + """ + + startup_functions = [] + + for attr in dir(client): + method = getattr(client, attr) + if callable(method) and getattr(method, "_startup", False): + startup_functions.append(method) + + return startup_functions + + def _parse_client(client): # type: (object) -> List[Command] """Get a list of Beergarden Commands from a client object diff --git a/brewtils/plugin.py b/brewtils/plugin.py index 87458ad2..c7cfd1f4 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -7,17 +7,21 @@ import signal import sys import threading +from datetime import datetime, timezone from pathlib import Path import appdirs from box import Box -from datetime import datetime, timezone from packaging.version import Version from requests import ConnectionError as RequestsConnectionError import brewtils from brewtils.config import load_config -from brewtils.decorators import _parse_client +from brewtils.decorators import ( + _parse_client, + _parse_shutdown_functions, + _parse_startup_functions, +) from brewtils.display import resolve_template from brewtils.errors import ( ConflictError, @@ -183,6 +187,22 @@ class Plugin(object): group (str): Grouping label applied to plugin groups (list): Grouping labels applied to plugin + client_shutdown_function (str): Function to be executed at start of shutdown + within client code + client_shutdown_functions (list): Functions to be executed at start of shutdown + within client code + + shutdown_function (func): Function to be executed at start of shutdown + shutdown_functions (list): Functions to be executed at start of shutdown + + client_startup_function (str): Function to be executed at start of running plugin + within client code + client_startup_functions (list): Functions to be executed at start of running plugin + within client code + + startup_function (func): Function to be executed at start of running plugin + startup_functions (list): Functions to be executed at start of running plugin + prefix_topic (str): Prefix for Generated Command Topics logger (:py:class:`logging.Logger`): Logger that will be used by the Plugin. @@ -218,6 +238,45 @@ def __init__(self, client=None, system=None, logger=None, **kwargs): self._custom_logger = False self._logger = self._setup_logging(logger=logger, **kwargs) + # Need to pop out shutdown functions because these are not processed + # until shutdown + self.shutdown_functions = [] + for shutdown_function in kwargs.pop("shutdown_functions", []): + if callable(shutdown_function): + self.shutdown_functions.append(shutdown_function) + else: + raise PluginValidationError( + f"Provided un-callable shutdown function {shutdown_function}" + ) + + if "shutdown_function" in kwargs: + shutdown_function = kwargs.pop("shutdown_function") + if callable(shutdown_function): + self.shutdown_functions.append(shutdown_function) + else: + raise PluginValidationError( + f"Provided un-callable shutdown function {shutdown_function}" + ) + + self.startup_functions = [] + + for startup_function in kwargs.pop("startup_functions", []): + if callable(startup_function): + self.startup_functions.append(startup_function) + else: + raise PluginValidationError( + f"Provided un-callable startup function {shutdown_function}" + ) + + if "startup_function" in kwargs: + startup_function = kwargs.pop("startup_function") + if callable(startup_function): + self.startup_functions.append(startup_function) + else: + raise PluginValidationError( + f"Provided un-callable startup function {shutdown_function}" + ) + # Now that logging is configured we can load the real config self._config = load_config(**kwargs) @@ -266,6 +325,18 @@ def run(self): try: self._startup() + + # Run provided startup functions + self._logger.debug("About to run annotated startup functions") + startup_functions = _parse_startup_functions(self._client) + startup_functions.extend(self.startup_functions) + startup_functions.extend(self._config.client_startup_functions) + + if getattr(self._config, "client_startup_function"): + startup_functions.append(self._config.client_startup_function) + + self._run_configured_functions(startup_functions) + self._logger.info("Plugin %s has started", self.unique_name) try: @@ -300,6 +371,37 @@ def client(self, new_client): self._set_client(new_client) + def _run_configured_functions(self, functions): + executed_functions = [] + + for function in functions: + if callable(function): + if function not in executed_functions: + function() + executed_functions.append(function) + + elif self._client and hasattr(self._client, function): + client_function = getattr(self._client, function) + if callable(client_function): + if client_function not in executed_functions: + client_function() + executed_functions.append(client_function) + else: + self._logger.error( + f"Provided non callable function for function: {function}" + ) + elif self._client: + self._logger.error( + ( + "Provided function not existing on client " + f"for function: {function}" + ) + ) + else: + self._logger.error( + f"No client provided to check for function: {function}" + ) + def _set_client(self, new_client): # Several _system properties can come from the client, so update if needed if not self._system.name: @@ -490,7 +592,22 @@ def _shutdown(self, status="STOPPED"): considered in a "stopped" state - the message processors shut down and all connections closed. """ + self._logger.debug("About to shut down plugin %s", self.unique_name) + + # Run shutdown functions prior to setting shutdown event to allow for + # any functions that might generate Requests + + self._logger.debug("About to run shutdown functions") + shutdown_functions = _parse_shutdown_functions(self._client) + shutdown_functions.extend(self.shutdown_functions) + shutdown_functions.extend(self._config.client_shutdown_functions) + + if getattr(self._config, "client_shutdown_function"): + shutdown_functions.append(self._config.client_shutdown_function) + + self._run_configured_functions(shutdown_functions) + self._shutdown_event.set() self._logger.debug("Shutting down processors") diff --git a/brewtils/rest/publish_client.py b/brewtils/rest/publish_client.py index be96fac2..fc4fdfe3 100644 --- a/brewtils/rest/publish_client.py +++ b/brewtils/rest/publish_client.py @@ -90,7 +90,6 @@ def publish( """ if _topic is None: - if brewtils.plugin._system.prefix_topic: _topic = brewtils.plugin._system.prefix_topic elif ( diff --git a/brewtils/specification.py b/brewtils/specification.py index e3829563..2cbf112b 100644 --- a/brewtils/specification.py +++ b/brewtils/specification.py @@ -188,6 +188,30 @@ def _is_json_dict(s): "description": "The dependency timeout to use", "default": 300, }, + "client_shutdown_function": { + "type": "str", + "description": "The function in client to be executed at shutdown", + "required": False, + }, + "client_shutdown_functions": { + "type": "list", + "description": "The functions in client to be executed at shutdown", + "items": {"name": {"type": "str"}}, + "required": False, + "default": [], + }, + "client_startup_function": { + "type": "str", + "description": "The function in client to be executed at run", + "required": False, + }, + "client_startup_functions": { + "type": "list", + "description": "The functions in client to be executed at run", + "items": {"name": {"type": "str"}}, + "required": False, + "default": [], + }, } _PLUGIN_SPEC = { diff --git a/test/decorators_test.py b/test/decorators_test.py index ed01ca8a..33c4a7b3 100644 --- a/test/decorators_test.py +++ b/test/decorators_test.py @@ -28,6 +28,8 @@ parameters, plugin_param, register, + shutdown, + startup, system, ) from brewtils.errors import PluginParamError @@ -739,7 +741,6 @@ def cmd3(foo): assert cmd3.parameters[0].type == "String" def test_literal_mapping(self, basic_param): - del basic_param["type"] @parameter(**basic_param, type=str) @@ -1439,3 +1440,43 @@ def test_plugin_param(self, cmd, parameter_dict): _initialize_parameter(cmd.parameters[0]), _initialize_parameter(**parameter_dict), ) + + +class TestShutdown(object): + """Test shutdown decorator""" + + def test_shutdown(self): + @shutdown + def cmd(): + return True + + assert hasattr(cmd, "_shutdown") + assert cmd._shutdown + + def test_missing_shutdown(self): + @command + def cmd(): + return True + + assert not hasattr(cmd, "_shutdown") + assert not getattr(cmd, "_shutdown", False) + + +class TestStartup(object): + """Test shutdown decorator""" + + def test_startup(self): + @startup + def cmd(): + return True + + assert hasattr(cmd, "_startup") + assert cmd._startup + + def test_missing_startup(self): + @command + def cmd(): + return True + + assert not hasattr(cmd, "_startup") + assert not getattr(cmd, "_startup", False) diff --git a/test/plugin_test.py b/test/plugin_test.py index 51ec4688..d71a0f90 100644 --- a/test/plugin_test.py +++ b/test/plugin_test.py @@ -429,6 +429,26 @@ def test_update_error(self, caplog, plugin, ez_client, bg_instance): assert len(caplog.records) == 1 + def test_shutdown_plugin_shutdown_function_executed(self, plugin): + plugin._request_processor = Mock() + plugin._admin_processor = Mock() + + mock_shutdown_function = Mock() + plugin.shutdown_functions = [mock_shutdown_function] + + plugin._shutdown() + assert mock_shutdown_function.called is True + + def test_shutdown_config_shutdown_function_executed(self, plugin): + plugin._request_processor = Mock() + plugin._admin_processor = Mock() + + mock_shutdown_function = Mock() + plugin._config.client_shutdown_functions = [mock_shutdown_function] + + plugin._shutdown() + assert mock_shutdown_function.called is True + class TestInitializeLogging(object): @pytest.fixture(autouse=True)