diff --git a/eb_sqs/management/commands/healthcheck.py b/eb_sqs/management/commands/healthcheck.py new file mode 100644 index 0000000..e996617 --- /dev/null +++ b/eb_sqs/management/commands/healthcheck.py @@ -0,0 +1,34 @@ +import sys + +import logging +from datetime import timedelta + +from django.core.management import BaseCommand +from django.utils import timezone +from django.utils.dateparse import parse_datetime + +from eb_sqs import settings + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = 'Checks the SQS worker is healthy, and if not returns a failure code' + + def add_arguments(self, parser): + pass + + def handle(self, *args, **options): + try: + with open(settings.HEALTHCHECK_FILE_NAME, 'r') as file: + last_healthcheck_date_str = file.readlines()[0] + + if parse_datetime(last_healthcheck_date_str) < timezone.now() - timedelta(seconds=settings.HEALTHCHECK_UNHEALTHY_PERIOD_S): + self._return_failure() + except Exception: + self._return_failure() + + @staticmethod + def _return_failure(): + logger.warning('[django-eb-sqs] Health check failed') + sys.exit(1) diff --git a/eb_sqs/settings.py b/eb_sqs/settings.py index 291ff7f..c7ae840 100644 --- a/eb_sqs/settings.py +++ b/eb_sqs/settings.py @@ -37,3 +37,7 @@ QUEUE_MESSAGE_RETENTION = getattr(settings, 'EB_SQS_QUEUE_MESSAGE_RETENTION', '1209600') # type: str QUEUE_VISIBILITY_TIMEOUT = getattr(settings, 'EB_SQS_QUEUE_VISIBILITY_TIMEOUT', '300') # type: str + +MIN_HEALTHCHECK_WRITE_PERIOD_S = getattr(settings, 'EB_SQS_MIN_HEALTHCHECK_WRITE_PERIOD_S', 10) # type: int +HEALTHCHECK_UNHEALTHY_PERIOD_S = getattr(settings, 'EB_SQS_HEALTHCHECK_UNHEALTHY_PERIOD_S', int(QUEUE_VISIBILITY_TIMEOUT)) # type: int +HEALTHCHECK_FILE_NAME = getattr(settings, 'EB_SQS_HEALTHCHECK_FILE_NAME', 'healthcheck.txt') # type: str diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index db8e34b..ec41c08 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -43,6 +43,9 @@ def process_queues(self, queue_names): static_queues = queues last_update_time = timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) + self.write_healthcheck_file() + last_healthcheck_time = timezone.now() + logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names))) worker = WorkerFactory.default().create() @@ -67,6 +70,10 @@ def process_queues(self, queue_names): logger.debug('[django-eb-sqs] Processing {} queues'.format(len(queues))) self.process_messages(queues, worker, static_queues) + if timezone.now() - timedelta(seconds=settings.MIN_HEALTHCHECK_WRITE_PERIOD_S) > last_healthcheck_time: + self.write_healthcheck_file() + last_healthcheck_time = timezone.now() + def process_messages(self, queues, worker, static_queues): # type: (list, Worker, list) -> None for queue in queues: @@ -144,3 +151,7 @@ def get_queues_by_prefixes(self, sqs, prefixes): queues += sqs.queues.filter(QueueNamePrefix=prefix) return queues + + def write_healthcheck_file(self): + with open(settings.HEALTHCHECK_FILE_NAME, 'w') as file: + file.write(timezone.now().isoformat()) diff --git a/setup.py b/setup.py index 17af45f..3009e02 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='django-eb-sqs', - version='1.16', + version='1.20', package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(),