From e1f8df488a0462b9f3c9091528e194068437e314 Mon Sep 17 00:00:00 2001 From: 1maple1 <160027655+1maple1@users.noreply.github.com> Date: Thu, 19 Sep 2024 17:42:16 +0000 Subject: [PATCH] Stop consumer when waiting for dependencies and restart when running --- brewtils/plugin.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index 729d76c3..2b6bd11e 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -418,31 +418,22 @@ def get_system_dependency(self, require, timeout=300): def await_dependencies(self, requires, config): for req in requires: system = self.get_system_dependency(req, config.requires_timeout) - self.logger.info( + self.logger.debug( f"Resolved system {system} for {req}: {config.name} {config.instance_name}" ) def check_dependencies(self, next_dependency_check: int): if self._system.requires and self.get_timestamp() >= next_dependency_check: - consuming = ( - self._request_processor.consumer._consumer_tag - in self._request_processor.consumer._channel.consumer_tags - ) try: self.await_dependencies(self._system.requires, self._config) - if not consuming: - self._logger.debug( - f"Dependency resolved {self.unique_name} - start consuming" - ) - self._request_processor.consumer.start_consuming() if "RUNNING" != self._instance.status: self._start() - # Update the plugin status and shutdown the request consumer + self.logger.debug( + f"Dependency check resolved {self.unique_name}, start consuming" + ) + self._request_processor.consumer.start_consuming() except PluginValidationError: - self._logger.debug( - f"Dependency problem {self.unique_name} - stop consuming" - ) - self._request_processor.consumer.stop_consuming() + self._logger.debug(f"Dependency check timeout {self.unique_name}") return True def _startup(self): @@ -731,6 +722,12 @@ def _wait(self, timeout): self._instance = self._ez_client.update_instance( self._instance.id, new_status="AWAITING_SYSTEM" ) + if self._request_processor.consumer._consumer_tag: + self.logger.debug( + f"Waiting for dependency {self.unique_name}, stop consuming" + ) + self._request_processor.consumer.stop_consuming() + self._request_processor.consumer._consumer_tag = None self._shutdown_event.wait(timeout) def _stop(self):