From eeadbf5b6371a5a91e2f55a44b86c6eee5765d9d Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Tue, 17 Sep 2024 13:25:09 +0000 Subject: [PATCH 1/6] Periodically monitor plugin dependency status --- brewtils/plugin.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index b5b37e8b..33de5098 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -11,6 +11,7 @@ import appdirs from box import Box +from datetime import datetime, UTC from packaging.version import Version from requests import ConnectionError as RequestsConnectionError @@ -268,9 +269,12 @@ def run(self): self._logger.info("Plugin %s has started", self.unique_name) try: + check_interval = 60 + next_dependency_check = self.get_timestamp(check_interval) # Need the timeout param so this works correctly in Python 2 while not self._shutdown_event.wait(timeout=0.1): - pass + if self.check_dependencies(next_dependency_check): + next_dependency_check = self.get_timestamp(check_interval) except KeyboardInterrupt: self._logger.debug("Received KeyboardInterrupt - shutting down") except Exception as ex: @@ -382,6 +386,13 @@ def _hook(exc_type, exc_value, traceback): sys.excepthook = _hook + @staticmethod + def get_timestamp(add_time: int = None): + current_timestamp = int(datetime.now(UTC).timestamp()) + if add_time: + return current_timestamp + add_time + return current_timestamp + def get_system_dependency(self, require, timeout=300): wait_time = 0.1 while timeout > 0: @@ -411,6 +422,29 @@ def await_dependencies(self, requires, config): f"Resolved system {system} for {req}: {config.name} {config.instance_name}" ) + def check_dependencies(self, next_dependency_check: int): + if self._system.requires and self.get_timestamp() >= next_dependency_check: + consuming = ( + self._request_processor.consumer._consumer_tag + in self._request_processor.consumer._channel.consumer_tags + ) + try: + self.await_dependencies(self._system.requires, self._config) + if not consuming: + self._logger.debug( + f"Dependency resolved {self.unique_name} - start consuming" + ) + self._request_processor.consumer.start_consuming() + if "RUNNING" != self._instance.status: + self._start() + # Update the plugin status and shutdown the request consumer + except PluginValidationError: + self._logger.debug( + f"Dependency problem {self.unique_name} - stop consuming" + ) + self._request_processor.consumer.stop_consuming() + return True + def _startup(self): """Plugin startup procedure From dc6ed7aa66896f5df2b5903831f9773c4238d8a0 Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Tue, 17 Sep 2024 14:27:24 +0000 Subject: [PATCH 2/6] Fix datetime utc compatibility --- brewtils/plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index 33de5098..729d76c3 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -11,7 +11,7 @@ import appdirs from box import Box -from datetime import datetime, UTC +from datetime import datetime, timezone from packaging.version import Version from requests import ConnectionError as RequestsConnectionError @@ -388,7 +388,7 @@ def _hook(exc_type, exc_value, traceback): @staticmethod def get_timestamp(add_time: int = None): - current_timestamp = int(datetime.now(UTC).timestamp()) + current_timestamp = int(datetime.now(timezone.utc).timestamp()) if add_time: return current_timestamp + add_time return current_timestamp From e1f8df488a0462b9f3c9091528e194068437e314 Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Thu, 19 Sep 2024 17:42:16 +0000 Subject: [PATCH 3/6] Stop consumer when waiting for dependencies and restart when running --- brewtils/plugin.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index 729d76c3..2b6bd11e 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -418,31 +418,22 @@ def get_system_dependency(self, require, timeout=300): def await_dependencies(self, requires, config): for req in requires: system = self.get_system_dependency(req, config.requires_timeout) - self.logger.info( + self.logger.debug( f"Resolved system {system} for {req}: {config.name} {config.instance_name}" ) def check_dependencies(self, next_dependency_check: int): if self._system.requires and self.get_timestamp() >= next_dependency_check: - consuming = ( - self._request_processor.consumer._consumer_tag - in self._request_processor.consumer._channel.consumer_tags - ) try: self.await_dependencies(self._system.requires, self._config) - if not consuming: - self._logger.debug( - f"Dependency resolved {self.unique_name} - start consuming" - ) - self._request_processor.consumer.start_consuming() if "RUNNING" != self._instance.status: self._start() - # Update the plugin status and shutdown the request consumer + self.logger.debug( + f"Dependency check resolved {self.unique_name}, start consuming" + ) + self._request_processor.consumer.start_consuming() except PluginValidationError: - self._logger.debug( - f"Dependency problem {self.unique_name} - stop consuming" - ) - self._request_processor.consumer.stop_consuming() + self._logger.debug(f"Dependency check timeout {self.unique_name}") return True def _startup(self): @@ -731,6 +722,12 @@ def _wait(self, timeout): self._instance = self._ez_client.update_instance( self._instance.id, new_status="AWAITING_SYSTEM" ) + if self._request_processor.consumer._consumer_tag: + self.logger.debug( + f"Waiting for dependency {self.unique_name}, stop consuming" + ) + self._request_processor.consumer.stop_consuming() + self._request_processor.consumer._consumer_tag = None self._shutdown_event.wait(timeout) def _stop(self): From 4b12c821b002eb4c3aa1693500689ec9bd52fc65 Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Thu, 19 Sep 2024 17:58:27 +0000 Subject: [PATCH 4/6] Update CHANGELOG --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9ca8b8f7..cb771aa4 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,7 @@ Brewtils Changelog ------ TBD +- Plugin will periodically monitor if required dependencies are running to update status accordingly - Updated SystemClient to utilize the local Garden name for default Namespace if none can be determined 3.27.2 From 900e8a735ba0ae7e437c435f44fadef0372158fd Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Wed, 25 Sep 2024 07:32:30 -0400 Subject: [PATCH 5/6] Update plugin.py to not reset consumer_tag --- brewtils/plugin.py | 1 - 1 file changed, 1 deletion(-) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index 2b6bd11e..a32311d4 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -727,7 +727,6 @@ def _wait(self, timeout): f"Waiting for dependency {self.unique_name}, stop consuming" ) self._request_processor.consumer.stop_consuming() - self._request_processor.consumer._consumer_tag = None self._shutdown_event.wait(timeout) def _stop(self): From 7fe1140aa5eb575da15c3d5a6e559725a75881d8 Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Wed, 2 Oct 2024 13:12:56 -0400 Subject: [PATCH 6/6] Removing consumer manipulations from plugin.py --- brewtils/plugin.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index a32311d4..bd12ff18 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -428,10 +428,6 @@ def check_dependencies(self, next_dependency_check: int): self.await_dependencies(self._system.requires, self._config) if "RUNNING" != self._instance.status: self._start() - self.logger.debug( - f"Dependency check resolved {self.unique_name}, start consuming" - ) - self._request_processor.consumer.start_consuming() except PluginValidationError: self._logger.debug(f"Dependency check timeout {self.unique_name}") return True @@ -722,11 +718,6 @@ def _wait(self, timeout): self._instance = self._ez_client.update_instance( self._instance.id, new_status="AWAITING_SYSTEM" ) - if self._request_processor.consumer._consumer_tag: - self.logger.debug( - f"Waiting for dependency {self.unique_name}, stop consuming" - ) - self._request_processor.consumer.stop_consuming() self._shutdown_event.wait(timeout) def _stop(self):