Skip to content

Commit

Permalink
Stop consumer when waiting for dependencies and restart when running
Browse files Browse the repository at this point in the history
  • Loading branch information
1maple1 committed Sep 19, 2024
1 parent dc6ed7a commit e1f8df4
Showing 1 changed file with 12 additions and 15 deletions.
27 changes: 12 additions & 15 deletions brewtils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,31 +418,22 @@ def get_system_dependency(self, require, timeout=300):
def await_dependencies(self, requires, config):
for req in requires:
system = self.get_system_dependency(req, config.requires_timeout)
self.logger.info(
self.logger.debug(
f"Resolved system {system} for {req}: {config.name} {config.instance_name}"
)

def check_dependencies(self, next_dependency_check: int):
if self._system.requires and self.get_timestamp() >= next_dependency_check:
consuming = (
self._request_processor.consumer._consumer_tag
in self._request_processor.consumer._channel.consumer_tags
)
try:
self.await_dependencies(self._system.requires, self._config)
if not consuming:
self._logger.debug(
f"Dependency resolved {self.unique_name} - start consuming"
)
self._request_processor.consumer.start_consuming()
if "RUNNING" != self._instance.status:
self._start()
# Update the plugin status and shutdown the request consumer
self.logger.debug(
f"Dependency check resolved {self.unique_name}, start consuming"
)
self._request_processor.consumer.start_consuming()
except PluginValidationError:
self._logger.debug(
f"Dependency problem {self.unique_name} - stop consuming"
)
self._request_processor.consumer.stop_consuming()
self._logger.debug(f"Dependency check timeout {self.unique_name}")
return True

def _startup(self):
Expand Down Expand Up @@ -731,6 +722,12 @@ def _wait(self, timeout):
self._instance = self._ez_client.update_instance(
self._instance.id, new_status="AWAITING_SYSTEM"
)
if self._request_processor.consumer._consumer_tag:
self.logger.debug(
f"Waiting for dependency {self.unique_name}, stop consuming"
)
self._request_processor.consumer.stop_consuming()
self._request_processor.consumer._consumer_tag = None
self._shutdown_event.wait(timeout)

def _stop(self):
Expand Down

0 comments on commit e1f8df4

Please sign in to comment.