Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi committed Oct 24, 2023
1 parent 1b53f06 commit f951507
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
4 changes: 3 additions & 1 deletion api/task_processor/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ def delay(
task = Task.create(
task_identifier=task_identifier,
scheduled_for=delay_until or timezone.now(),
priority=priority,
queue_size=queue_size,
args=args,
kwargs=kwargs,
)
except TaskQueueFullError as e:
logger.warning(e)
return

task.save(priority=priority)
task.save()
return task

def run_in_thread(*, args: typing.Tuple = (), kwargs: typing.Dict = None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
register_task_handler,
)
from task_processor.exceptions import InvalidArgumentsError
from task_processor.models import RecurringTask
from task_processor.models import RecurringTask, Task, TaskPriority
from task_processor.task_registry import get_task
from task_processor.task_run_method import TaskRunMethod

Expand Down Expand Up @@ -131,3 +131,43 @@ class NonSerializableObj:
# When
with pytest.raises(InvalidArgumentsError):
my_function.delay(args=(NonSerializableObj(),))


def test_delay_returns_none_if_task_queue_is_full(settings, db):
# Given
settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR

@register_task_handler(queue_size=1)
def my_function(*args, **kwargs):
pass

for _ in range(10):
Task.objects.create(
task_identifier="test_unit_task_processor_decorators.my_function"
)

# When
task = my_function.delay()

# Then
assert task is None


def test_can_create_task_with_priority(settings, db):
# Given
settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR

@register_task_handler(priority=TaskPriority.HIGH)
def my_function(*args, **kwargs):
pass

for _ in range(10):
Task.objects.create(
task_identifier="test_unit_task_processor_decorators.my_function"
)

# When
task = my_function.delay()

# Then
assert task.priority == TaskPriority.HIGH

0 comments on commit f951507

Please sign in to comment.