Skip to content

Commit

Permalink
Periodically monitor plugin dependency status
Browse files Browse the repository at this point in the history
  • Loading branch information
1maple1 committed Sep 17, 2024
1 parent c7c5972 commit eeadbf5
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion brewtils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import appdirs
from box import Box
from datetime import datetime, UTC
from packaging.version import Version
from requests import ConnectionError as RequestsConnectionError

Expand Down Expand Up @@ -268,9 +269,12 @@ def run(self):
self._logger.info("Plugin %s has started", self.unique_name)

try:
check_interval = 60
next_dependency_check = self.get_timestamp(check_interval)
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
if self.check_dependencies(next_dependency_check):
next_dependency_check = self.get_timestamp(check_interval)
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
Expand Down Expand Up @@ -382,6 +386,13 @@ def _hook(exc_type, exc_value, traceback):

sys.excepthook = _hook

@staticmethod
def get_timestamp(add_time: int = None):
current_timestamp = int(datetime.now(UTC).timestamp())
if add_time:
return current_timestamp + add_time
return current_timestamp

def get_system_dependency(self, require, timeout=300):
wait_time = 0.1
while timeout > 0:
Expand Down Expand Up @@ -411,6 +422,29 @@ def await_dependencies(self, requires, config):
f"Resolved system {system} for {req}: {config.name} {config.instance_name}"
)

def check_dependencies(self, next_dependency_check: int):
if self._system.requires and self.get_timestamp() >= next_dependency_check:
consuming = (
self._request_processor.consumer._consumer_tag
in self._request_processor.consumer._channel.consumer_tags
)
try:
self.await_dependencies(self._system.requires, self._config)
if not consuming:
self._logger.debug(
f"Dependency resolved {self.unique_name} - start consuming"
)
self._request_processor.consumer.start_consuming()
if "RUNNING" != self._instance.status:
self._start()
# Update the plugin status and shutdown the request consumer
except PluginValidationError:
self._logger.debug(
f"Dependency problem {self.unique_name} - stop consuming"
)
self._request_processor.consumer.stop_consuming()
return True

def _startup(self):
"""Plugin startup procedure
Expand Down

0 comments on commit eeadbf5

Please sign in to comment.