Skip to content

Commit

Permalink
Merge pull request #22 from bigjools/issue/21
Browse files Browse the repository at this point in the history
Return Jobs when scheduling them
  • Loading branch information
bigjools authored Apr 29, 2022
2 parents 09c7818 + 20ee68f commit 35d3928
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 16 deletions.
22 changes: 16 additions & 6 deletions spinach/engine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime, timezone
from logging import getLogger
import threading
from typing import Type
from typing import Iterable, Type

from .task import Tasks, Batch, Schedulable
from .utils import run_forever, handle_sigterm
Expand Down Expand Up @@ -89,17 +89,21 @@ def attach_tasks(self, tasks: Tasks):
def execute(self, task: Schedulable, *args, **kwargs):
return self._tasks.get(task).func(*args, **kwargs)

def schedule(self, task: Schedulable, *args, **kwargs):
def schedule(self, task: Schedulable, *args, **kwargs) -> Job:
"""Schedule a job to be executed as soon as possible.
:arg task: the task or its name to execute in the background
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
:return: The Job that was created and scheduled.
"""
at = datetime.now(timezone.utc)
return self.schedule_at(task, at, *args, **kwargs)

def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
def schedule_at(
self, task: Schedulable, at: datetime, *args, **kwargs
) -> Job:
"""Schedule a job to be executed in the future.
:arg task: the task or its name to execute in the background
Expand All @@ -109,21 +113,26 @@ def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
contain UTC time.
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
:return: The Job that was created and scheduled.
"""
task = self._tasks.get(task)
job = Job(task.name, task.queue, at, task.max_retries, task_args=args,
task_kwargs=kwargs)
job.task_func = task.func
job.check_signature()
return self._broker.enqueue_jobs([job])
self._broker.enqueue_jobs([job])
return job

def schedule_batch(self, batch: Batch):
def schedule_batch(self, batch: Batch) -> Iterable[Job]:
"""Schedule many jobs at once.
Scheduling jobs in batches allows to enqueue them fast by avoiding
round-trips to the broker.
:arg batch: :class:`Batch` instance containing jobs to schedule
:return: The Jobs that were created and scheduled.
"""
jobs = list()
for task, at, args, kwargs in batch.jobs_to_create:
Expand All @@ -136,7 +145,8 @@ def schedule_batch(self, batch: Batch):
job.check_signature()
jobs.append(job)

return self._broker.enqueue_jobs(jobs)
self._broker.enqueue_jobs(jobs)
return jobs

def _arbiter_func(self, stop_when_queue_empty=False):
logger.debug('Arbiter started')
Expand Down
25 changes: 18 additions & 7 deletions spinach/task.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from datetime import datetime, timezone, timedelta
import functools
import json
from typing import Optional, Callable, List, Union
from typing import Iterable, Optional, Callable, List, TYPE_CHECKING, Union
from numbers import Number

from . import const, exc

if TYPE_CHECKING:
from .job import Job


class Task:

Expand Down Expand Up @@ -207,20 +210,24 @@ def _require_attached_tasks(self):
'a Spinach Engine.'
)

def schedule(self, task: Schedulable, *args, **kwargs):
def schedule(self, task: Schedulable, *args, **kwargs) -> "Job":
"""Schedule a job to be executed as soon as possible.
:arg task: the task or its name to execute in the background
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
:return: The Job that was created and scheduled.
This method can only be used once tasks have been attached to a
Spinach :class:`Engine`.
"""
self._require_attached_tasks()
self._spin.schedule(task, *args, **kwargs)
return self._spin.schedule(task, *args, **kwargs)

def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
def schedule_at(
self, task: Schedulable, at: datetime, *args, **kwargs
) -> "Job":
"""Schedule a job to be executed in the future.
:arg task: the task or its name to execute in the background
Expand All @@ -231,22 +238,26 @@ def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
:return: The Job that was created and scheduled.
This method can only be used once tasks have been attached to a
Spinach :class:`Engine`.
"""
self._require_attached_tasks()
self._spin.schedule_at(task, at, *args, **kwargs)
return self._spin.schedule_at(task, at, *args, **kwargs)

def schedule_batch(self, batch: 'Batch'):
def schedule_batch(self, batch: 'Batch') -> Iterable["Job"]:
"""Schedule many jobs at once.
Scheduling jobs in batches allows to enqueue them fast by avoiding
round-trips to the broker.
:arg batch: :class:`Batch` instance containing jobs to schedule
:return: The Jobs that were created and scheduled.
"""
self._require_attached_tasks()
self._spin.schedule_batch(batch)
return self._spin.schedule_batch(batch)


class Batch:
Expand Down
46 changes: 45 additions & 1 deletion tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,48 @@ def test_attach_tasks(mock_logger, spin, spin_2):
assert spin_2._tasks.tasks == tasks.tasks


def test_schedule_at(patch_now):
now = get_now()

tasks = Tasks()
tasks.add(print, 'bar_task')

broker = Mock()

s = Engine(broker, namespace='tests')
s.attach_tasks(tasks)

job = s.schedule_at('bar_task', now, three=True)

bar_job = broker.enqueue_jobs.call_args[0][0][0]
assert bar_job == job
assert bar_job.task_name == 'bar_task'
assert bar_job.at == now
assert bar_job.task_args == ()
assert bar_job.task_kwargs == {'three': True}


def test_schedule(patch_now):
now = get_now()

tasks = Tasks()
tasks.add(print, 'foo_task')

broker = Mock()

s = Engine(broker, namespace='tests')
s.attach_tasks(tasks)

job1 = s.schedule('foo_task', 1, 2)

foo_job = broker.enqueue_jobs.call_args[0][0][0]
assert foo_job == job1
assert foo_job.task_name == 'foo_task'
assert foo_job.at == now
assert foo_job.task_args == (1, 2)
assert foo_job.task_kwargs == {}


def test_schedule_batch(patch_now):
now = get_now()

Expand All @@ -60,17 +102,19 @@ def test_schedule_batch(patch_now):
batch = Batch()
batch.schedule('foo_task', 1, 2)
batch.schedule_at('bar_task', now, three=True)
s.schedule_batch(batch)
jobs = s.schedule_batch(batch)

broker.enqueue_jobs.assert_called_once_with([ANY, ANY])

foo_job = broker.enqueue_jobs.call_args[0][0][0]
assert foo_job in jobs
assert foo_job.task_name == 'foo_task'
assert foo_job.at == now
assert foo_job.task_args == (1, 2)
assert foo_job.task_kwargs == {}

bar_job = broker.enqueue_jobs.call_args[0][0][1]
assert bar_job in jobs
assert bar_job.task_name == 'bar_task'
assert bar_job.at == now
assert bar_job.task_args == ()
Expand Down
10 changes: 8 additions & 2 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,19 @@ def test_tasks_scheduling(task):
tasks.schedule_batch(batch)

spin = mock.Mock()
job = mock.sentinel
spin.schedule.return_value = job
spin.schedule_at.return_value = job
spin.schedule_batch.return_value = job
tasks._spin = spin

tasks.schedule('write_to_stdout')
retjob = tasks.schedule('write_to_stdout')
spin.schedule.assert_called_once_with('write_to_stdout')
assert retjob == job

tasks.schedule_at('write_to_stdout', get_now())
retjob = tasks.schedule_at('write_to_stdout', get_now())
spin.schedule_at.assert_called_once_with('write_to_stdout', get_now())
assert retjob == job

tasks.schedule_batch(batch)
spin.schedule_batch.assert_called_once_with(batch)
Expand Down

0 comments on commit 35d3928

Please sign in to comment.