From 20ee68fca2b36501b1fa5f0559d50f406da88671 Mon Sep 17 00:00:00 2001 From: Julian Edwards Date: Thu, 28 Apr 2022 15:01:54 +1000 Subject: [PATCH] Return Jobs when scheduling them Engine.schedule* and Task.schedule* used to return None, now they will return the Job(s) actually scheduled. Fixes https://github.com/NicolasLM/spinach/issues/21 --- spinach/engine.py | 22 +++++++++++++++------ spinach/task.py | 25 +++++++++++++++++------- tests/test_engine.py | 46 +++++++++++++++++++++++++++++++++++++++++++- tests/test_task.py | 10 ++++++++-- 4 files changed, 87 insertions(+), 16 deletions(-) diff --git a/spinach/engine.py b/spinach/engine.py index 64e4bd2..d331a6f 100644 --- a/spinach/engine.py +++ b/spinach/engine.py @@ -1,7 +1,7 @@ from datetime import datetime, timezone from logging import getLogger import threading -from typing import Type +from typing import Iterable, Type from .task import Tasks, Batch, Schedulable from .utils import run_forever, handle_sigterm @@ -89,17 +89,21 @@ def attach_tasks(self, tasks: Tasks): def execute(self, task: Schedulable, *args, **kwargs): return self._tasks.get(task).func(*args, **kwargs) - def schedule(self, task: Schedulable, *args, **kwargs): + def schedule(self, task: Schedulable, *args, **kwargs) -> Job: """Schedule a job to be executed as soon as possible. :arg task: the task or its name to execute in the background :arg args: args to be passed to the task function :arg kwargs: kwargs to be passed to the task function + + :return: The Job that was created and scheduled. """ at = datetime.now(timezone.utc) return self.schedule_at(task, at, *args, **kwargs) - def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs): + def schedule_at( + self, task: Schedulable, at: datetime, *args, **kwargs + ) -> Job: """Schedule a job to be executed in the future. :arg task: the task or its name to execute in the background @@ -109,21 +113,26 @@ def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs): contain UTC time. :arg args: args to be passed to the task function :arg kwargs: kwargs to be passed to the task function + + :return: The Job that was created and scheduled. """ task = self._tasks.get(task) job = Job(task.name, task.queue, at, task.max_retries, task_args=args, task_kwargs=kwargs) job.task_func = task.func job.check_signature() - return self._broker.enqueue_jobs([job]) + self._broker.enqueue_jobs([job]) + return job - def schedule_batch(self, batch: Batch): + def schedule_batch(self, batch: Batch) -> Iterable[Job]: """Schedule many jobs at once. Scheduling jobs in batches allows to enqueue them fast by avoiding round-trips to the broker. :arg batch: :class:`Batch` instance containing jobs to schedule + + :return: The Jobs that were created and scheduled. """ jobs = list() for task, at, args, kwargs in batch.jobs_to_create: @@ -136,7 +145,8 @@ def schedule_batch(self, batch: Batch): job.check_signature() jobs.append(job) - return self._broker.enqueue_jobs(jobs) + self._broker.enqueue_jobs(jobs) + return jobs def _arbiter_func(self, stop_when_queue_empty=False): logger.debug('Arbiter started') diff --git a/spinach/task.py b/spinach/task.py index 2fd4f3a..5d12fab 100644 --- a/spinach/task.py +++ b/spinach/task.py @@ -1,11 +1,14 @@ from datetime import datetime, timezone, timedelta import functools import json -from typing import Optional, Callable, List, Union +from typing import Iterable, Optional, Callable, List, TYPE_CHECKING, Union from numbers import Number from . import const, exc +if TYPE_CHECKING: + from .job import Job + class Task: @@ -207,20 +210,24 @@ def _require_attached_tasks(self): 'a Spinach Engine.' ) - def schedule(self, task: Schedulable, *args, **kwargs): + def schedule(self, task: Schedulable, *args, **kwargs) -> "Job": """Schedule a job to be executed as soon as possible. :arg task: the task or its name to execute in the background :arg args: args to be passed to the task function :arg kwargs: kwargs to be passed to the task function + :return: The Job that was created and scheduled. + This method can only be used once tasks have been attached to a Spinach :class:`Engine`. """ self._require_attached_tasks() - self._spin.schedule(task, *args, **kwargs) + return self._spin.schedule(task, *args, **kwargs) - def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs): + def schedule_at( + self, task: Schedulable, at: datetime, *args, **kwargs + ) -> "Job": """Schedule a job to be executed in the future. :arg task: the task or its name to execute in the background @@ -231,22 +238,26 @@ def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs): :arg args: args to be passed to the task function :arg kwargs: kwargs to be passed to the task function + :return: The Job that was created and scheduled. + This method can only be used once tasks have been attached to a Spinach :class:`Engine`. """ self._require_attached_tasks() - self._spin.schedule_at(task, at, *args, **kwargs) + return self._spin.schedule_at(task, at, *args, **kwargs) - def schedule_batch(self, batch: 'Batch'): + def schedule_batch(self, batch: 'Batch') -> Iterable["Job"]: """Schedule many jobs at once. Scheduling jobs in batches allows to enqueue them fast by avoiding round-trips to the broker. :arg batch: :class:`Batch` instance containing jobs to schedule + + :return: The Jobs that were created and scheduled. """ self._require_attached_tasks() - self._spin.schedule_batch(batch) + return self._spin.schedule_batch(batch) class Batch: diff --git a/tests/test_engine.py b/tests/test_engine.py index a6705be..e5d6b47 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -45,6 +45,48 @@ def test_attach_tasks(mock_logger, spin, spin_2): assert spin_2._tasks.tasks == tasks.tasks +def test_schedule_at(patch_now): + now = get_now() + + tasks = Tasks() + tasks.add(print, 'bar_task') + + broker = Mock() + + s = Engine(broker, namespace='tests') + s.attach_tasks(tasks) + + job = s.schedule_at('bar_task', now, three=True) + + bar_job = broker.enqueue_jobs.call_args[0][0][0] + assert bar_job == job + assert bar_job.task_name == 'bar_task' + assert bar_job.at == now + assert bar_job.task_args == () + assert bar_job.task_kwargs == {'three': True} + + +def test_schedule(patch_now): + now = get_now() + + tasks = Tasks() + tasks.add(print, 'foo_task') + + broker = Mock() + + s = Engine(broker, namespace='tests') + s.attach_tasks(tasks) + + job1 = s.schedule('foo_task', 1, 2) + + foo_job = broker.enqueue_jobs.call_args[0][0][0] + assert foo_job == job1 + assert foo_job.task_name == 'foo_task' + assert foo_job.at == now + assert foo_job.task_args == (1, 2) + assert foo_job.task_kwargs == {} + + def test_schedule_batch(patch_now): now = get_now() @@ -60,17 +102,19 @@ def test_schedule_batch(patch_now): batch = Batch() batch.schedule('foo_task', 1, 2) batch.schedule_at('bar_task', now, three=True) - s.schedule_batch(batch) + jobs = s.schedule_batch(batch) broker.enqueue_jobs.assert_called_once_with([ANY, ANY]) foo_job = broker.enqueue_jobs.call_args[0][0][0] + assert foo_job in jobs assert foo_job.task_name == 'foo_task' assert foo_job.at == now assert foo_job.task_args == (1, 2) assert foo_job.task_kwargs == {} bar_job = broker.enqueue_jobs.call_args[0][0][1] + assert bar_job in jobs assert bar_job.task_name == 'bar_task' assert bar_job.at == now assert bar_job.task_args == () diff --git a/tests/test_task.py b/tests/test_task.py index bf24376..0dc14b9 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -218,13 +218,19 @@ def test_tasks_scheduling(task): tasks.schedule_batch(batch) spin = mock.Mock() + job = mock.sentinel + spin.schedule.return_value = job + spin.schedule_at.return_value = job + spin.schedule_batch.return_value = job tasks._spin = spin - tasks.schedule('write_to_stdout') + retjob = tasks.schedule('write_to_stdout') spin.schedule.assert_called_once_with('write_to_stdout') + assert retjob == job - tasks.schedule_at('write_to_stdout', get_now()) + retjob = tasks.schedule_at('write_to_stdout', get_now()) spin.schedule_at.assert_called_once_with('write_to_stdout', get_now()) + assert retjob == job tasks.schedule_batch(batch) spin.schedule_batch.assert_called_once_with(batch)