Skip to content

Commit

Permalink
Allow selecting which celery worker to ping
Browse files Browse the repository at this point in the history
  • Loading branch information
sevdog committed Jul 6, 2023
1 parent c6ca36b commit 0a29784
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
23 changes: 23 additions & 0 deletions docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,26 @@ Using `django.settings` you may exert more fine-grained control over the behavio
- Number
- `3`
- Specifies the maximum total time for a task to complete and return a result, including queue time.


Celery-Ping Health Check
-------------------

Using `django.settings` you may exert more fine-grained control over the behavior of the celery-ping health check

.. list-table:: Additional Settings
:widths: 25 10 10 55
:header-rows: 1

* - Name
- Type
- Default
- Description
* - `HEALTHCHECK_CELERY_PING_TIMEOUT`
- Number
- `1`
- Specifies the maximum total time (in seconds) for which "pong" responses are awaited.
* - `HEALTHCHECK_CELERY_PING_DESTINATION`
- List of Strings
- `None`
- Specifies the list of workers which will receive the "ping" request.
16 changes: 13 additions & 3 deletions health_check/contrib/celery_ping/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ class CeleryPingHealthCheck(BaseHealthCheckBackend):

def check_status(self):
timeout = getattr(settings, "HEALTHCHECK_CELERY_PING_TIMEOUT", 1)
destination = getattr(settings, "HEALTHCHECK_CELERY_PING_DESTINATION", None)

try:
ping_result = app.control.ping(timeout=timeout)
ping_result = app.control.ping(destination=destination, timeout=timeout)
except IOError as e:
self.add_error(ServiceUnavailable("IOError"), e)
except NotImplementedError as exc:
Expand All @@ -30,9 +31,9 @@ def check_status(self):
ServiceUnavailable("Celery workers unavailable"),
)
else:
self._check_ping_result(ping_result)
self._check_ping_result(ping_result, destination)

def _check_ping_result(self, ping_result):
def _check_ping_result(self, ping_result, destination):
active_workers = []

for result in ping_result:
Expand All @@ -46,6 +47,15 @@ def _check_ping_result(self, ping_result):
continue
active_workers.append(worker)

if destination:
inactive_workers = set(destination) - set(active_workers)
if inactive_workers:
self.add_error(
ServiceUnavailable(
f"Celery workers {inactive_workers} did not respond"
)
)

if not self.errors:
self._check_active_queues(active_workers)

Expand Down
46 changes: 44 additions & 2 deletions tests/test_celery_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pytest
from django.apps import apps
from django.conf import settings

from health_check.contrib.celery_ping.apps import HealthCheckConfig
from health_check.contrib.celery_ping.backends import CeleryPingHealthCheck
Expand All @@ -20,7 +19,9 @@ class TestCeleryPingHealthCheck:
def health_check(self):
return CeleryPingHealthCheck()

def test_check_status_doesnt_add_errors_when_ping_successful(self, health_check):
def test_check_status_doesnt_add_errors_when_ping_successful(
self, health_check, settings
):
celery_worker = "celery@4cc150a7b49b"

with patch(
Expand Down Expand Up @@ -59,6 +60,7 @@ def test_check_status_reports_errors_if_ping_responses_are_incorrect(
def test_check_status_adds_errors_when_ping_successfull_but_not_all_defined_queues_have_consumers(
self,
health_check,
settings,
):
celery_worker = "celery@4cc150a7b49b"
queues = list(settings.CELERY_QUEUES)
Expand Down Expand Up @@ -123,6 +125,46 @@ def test_check_status_add_error_when_ping_result_failed(
assert len(health_check.errors) == 1
assert "workers unavailable" in health_check.errors[0].message.lower()

def test_check_status_reports_errors_if_ping_responses_are_missing(
self,
health_check,
settings,
):
settings.HEALTHCHECK_CELERY_PING_DESTINATION = [
"celery1@4cc150a7b49b",
"celery2@4cc150a7b49b",
]
with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[
{"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
],
):
health_check.check_status()

assert len(health_check.errors) == 1

def test_check_status_reports_destinations(
self,
health_check,
settings,
):
settings.HEALTHCHECK_CELERY_PING_DESTINATION = [
"celery1@4cc150a7b49b",
"celery2@4cc150a7b49b",
]
with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[
{"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
{"celery2@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
{"celery3@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
],
):
health_check.check_status()

assert len(health_check.errors) == 1


class TestCeleryPingHealthCheckApps:
def test_apps(self):
Expand Down

0 comments on commit 0a29784

Please sign in to comment.