Skip to content

Commit

Permalink
Added support for requests that contain an ETA in the future. (#59)
Browse files Browse the repository at this point in the history
Allow Batches tasks to be called with `apply_async(..., eta=...)` or
`apply_async(..., countdown=...)`.
  • Loading branch information
weetster authored Apr 27, 2022
1 parent 75b5ef3 commit 49b6ee3
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Improvements
feature for ``Batches`` tasks. (`#39 <https://github.com/clokep/celery-batches/pull/39>`_)
* Support |using a custom Request class|_ for ``Batches`` tasks.
(`#63 <https://github.com/clokep/celery-batches/pull/63>`_)
* Support calling tasks with an ``eta`` or ``countdown`` specified. Contributed by
`@weetster <https://github.com/weetster>`_.
(`#59 <https://github.com/clokep/celery-batches/pull/59>`_)

Bugfixes
--------
Expand Down
61 changes: 51 additions & 10 deletions celery_batches/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from itertools import count, filterfalse, tee
from queue import Empty, Queue
from time import monotonic
from typing import (
Any,
Callable,
Expand All @@ -22,10 +23,11 @@
from celery.utils.imports import symbol_by_name
from celery.utils.log import get_logger
from celery.utils.nodenames import gethostname
from celery.utils.time import timezone
from celery.worker.consumer import Consumer
from celery.worker.request import Request, create_request_cls
from celery.worker.strategy import hybrid_to_proto2, proto1_to_proto2
from kombu.asynchronous.timer import Timer
from kombu.asynchronous.timer import Timer, to_timestamp
from kombu.message import Message
from kombu.utils.uuid import uuid
from vine import promise
Expand Down Expand Up @@ -171,6 +173,7 @@ class Batches(Task):

def __init__(self) -> None:
self._buffer: Queue[Request] = Queue()
self._pending: Queue[Request] = Queue()
self._count = count(1)
self._tref: Optional[Timer] = None
self._pool: BasePool = None
Expand All @@ -183,6 +186,8 @@ def Strategy(self, task: "Batches", app: Celery, consumer: Consumer) -> Callable
#
# This adds to a buffer at the end, instead of executing the task as
# the default strategy does.
#
# See Batches._do_flush for ETA handling.
self._pool = consumer.pool

hostname = consumer.hostname
Expand Down Expand Up @@ -285,15 +290,51 @@ def apply(
return super().apply(([request],), {}, *_args, **options)

def _do_flush(self) -> None:
logger.debug("Batches: Wake-up to flush buffer...")
requests = None
if self._buffer.qsize():
requests = list(consume_queue(self._buffer))
if requests:
logger.debug("Batches: Buffer complete: %s", len(requests))
self.flush(requests)
if not requests:
logger.debug("Batches: Canceling timer: Nothing in buffer.")
logger.debug("Batches: Wake-up to flush buffers...")

ready_requests = []
app = self.app
to_system_tz = timezone.to_system
now = monotonic()

all_requests = list(consume_queue(self._buffer)) + list(
consume_queue(self._pending)
)
for req in all_requests:
# Similar to logic in celery.worker.strategy.default.
if req.eta:
try:
if req.utc:
eta = to_timestamp(to_system_tz(req.eta))
else:
eta = to_timestamp(req.eta, app.timezone)
except (OverflowError, ValueError) as exc:
logger.error(
"Couldn't convert ETA %r to timestamp: %r. Task: %r",
req.eta,
exc,
req.info(safe=True),
exc_info=True,
)
req.reject(requeue=False)
continue

if eta <= now:
# ETA has elapsed, request is ready.
ready_requests.append(req)
else:
# ETA has not elapsed, add to pending queue.
self._pending.put(req)
else:
# Request does not have an ETA, ready immediately
ready_requests.append(req)

if len(ready_requests) > 0:
logger.debug("Batches: Ready buffer complete: %s", len(ready_requests))
self.flush(ready_requests)

if not ready_requests and self._pending.qsize() == 0:
logger.debug("Batches: Canceling timer: Nothing in buffers.")
if self._tref:
self._tref.cancel() # cancel timer.
self._tref = None
Expand Down
30 changes: 30 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,36 @@ used to provide values to signals and does not populate into the results backend
from celery import current_app
current_app.backend.mark_as_done(request.id, response, request=request)
Retrying tasks
##############

To retry a failed task, re-execute it with the original ``task_id``, as shown in
the example below:

.. code-block:: python
@app.task(base=Batches, flush_every=100, flush_interval=10)
def flaky_task(requests):
for request in requests:
# Do something that might fail.
try:
response = might_fail(*request.args, **request.kwargs)
except TemporaryError:
# Retry the task 10 seconds from now with the same arguments and task_id.
flaky_task.apply_async(
args=request.args,
kwargs=request.kwargs,
countdown=10,
task_id=request.id,
)
else:
app.backend.mark_as_done(request.id, response, request=request)
Note that the retried task is still bound by the flush rules of the ``Batches``
task: the countdown acts as a lower bound, so the task will not run *before* it
elapses. In the example above the task will run between 10 and 20 seconds from
now, assuming no other tasks are in the queue.

.. toctree::
:hidden:

Expand Down
59 changes: 58 additions & 1 deletion t/integration/test_batches.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from datetime import datetime, timedelta
from time import sleep
from typing import Any, Callable, List, Optional, Union

from celery_batches import Batches, SimpleRequest

from celery import Celery, signals
from celery import Celery, signals, states
from celery.app.task import Task
from celery.contrib.testing.tasks import ping
from celery.contrib.testing.worker import TestWorkController
Expand Down Expand Up @@ -270,3 +271,59 @@ def acks(requests: List[SimpleRequest]) -> None:

# After the tasks are done, both results are acked.
assert acked == [result_1.id, result_2.id]


def test_countdown(celery_app: Celery, celery_worker: TestWorkController) -> None:
    """Check that a request sent with a countdown is held back until it is due.

    Two requests are submitted: one without a delay and one with a 3 second
    countdown. Only the undelayed request should be handled at first; the
    delayed request stays pending and is handled by a later flush.
    """
    broker_url = celery_app.conf.broker_url
    if not broker_url.startswith("memory"):
        raise pytest.skip("Flaky on live brokers")

    immediate = add.apply_async(args=(1,))
    # 3 seconds exceeds the flush interval plus the first sleep below, yet is
    # less than the flush interval plus both sleeps combined.
    delayed = add.apply_async(args=(2,), countdown=3)

    # Wait longer than both the 0.1 second flush interval and the 0.5 second
    # retry interval.
    sleep(1)

    # Give the worker a chance to process whatever is ready.
    _wait_for_ping()

    # The undelayed request has finished; the delayed one has not started.
    assert immediate.get() == 1
    assert delayed.state == states.PENDING

    # Wait out the remainder of the countdown.
    sleep(3)

    assert delayed.get() == 2


def test_eta(celery_app: Celery, celery_worker: TestWorkController) -> None:
    """Check that a request sent with an explicit ETA is held back until it is due.

    Two requests are submitted: one without a delay and one whose ETA is
    3 seconds in the future. Only the undelayed request should be handled at
    first; the other stays pending and is handled by a later flush.
    """
    broker_url = celery_app.conf.broker_url
    if not broker_url.startswith("memory"):
        raise pytest.skip("Flaky on live brokers")

    immediate = add.apply_async(args=(1,))
    # The ETA lies beyond the flush interval plus the first sleep below, yet
    # before the flush interval plus both sleeps combined.
    due_at = datetime.utcnow() + timedelta(seconds=3)
    delayed = add.apply_async(args=(2,), eta=due_at)

    # Wait longer than both the 0.1 second flush interval and the 0.5 second
    # retry interval.
    sleep(1)

    # Give the worker a chance to process whatever is ready.
    _wait_for_ping()

    # The undelayed request has finished; the delayed one has not started.
    assert immediate.get() == 1
    assert delayed.state == states.PENDING

    # Wait until the ETA has passed.
    sleep(3)

    assert delayed.get() == 2

0 comments on commit 49b6ee3

Please sign in to comment.