diff --git a/brewtils/plugin.py b/brewtils/plugin.py index b5b37e8b..33de5098 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -11,6 +11,7 @@ import appdirs from box import Box +from datetime import datetime, UTC from packaging.version import Version from requests import ConnectionError as RequestsConnectionError @@ -268,9 +269,12 @@ def run(self): self._logger.info("Plugin %s has started", self.unique_name) try: + check_interval = 60 + next_dependency_check = self.get_timestamp(check_interval) # Need the timeout param so this works correctly in Python 2 while not self._shutdown_event.wait(timeout=0.1): - pass + if self.check_dependencies(next_dependency_check): + next_dependency_check = self.get_timestamp(check_interval) except KeyboardInterrupt: self._logger.debug("Received KeyboardInterrupt - shutting down") except Exception as ex: @@ -382,6 +386,13 @@ def _hook(exc_type, exc_value, traceback): sys.excepthook = _hook + @staticmethod + def get_timestamp(add_time: int = None): + current_timestamp = int(datetime.now(UTC).timestamp()) + if add_time: + return current_timestamp + add_time + return current_timestamp + def get_system_dependency(self, require, timeout=300): wait_time = 0.1 while timeout > 0: @@ -411,6 +422,29 @@ def await_dependencies(self, requires, config): f"Resolved system {system} for {req}: {config.name} {config.instance_name}" ) + def check_dependencies(self, next_dependency_check: int): + if self._system.requires and self.get_timestamp() >= next_dependency_check: + consuming = ( + self._request_processor.consumer._consumer_tag + in self._request_processor.consumer._channel.consumer_tags + ) + try: + self.await_dependencies(self._system.requires, self._config) + if not consuming: + self._logger.debug( + f"Dependency resolved {self.unique_name} - start consuming" + ) + self._request_processor.consumer.start_consuming() + if "RUNNING" != self._instance.status: + self._start() + # Update the plugin status and shutdown the request consumer + except PluginValidationError: + self._logger.debug( + f"Dependency problem {self.unique_name} - stop consuming" + ) + self._request_processor.consumer.stop_consuming() + return True + def _startup(self): """Plugin startup procedure