Skip to content

Commit

Permalink
fix(task-processor): implement grace period for deleting old recurrin…
Browse files Browse the repository at this point in the history
…g task (#3169)
  • Loading branch information
gagantrivedi authored Dec 21, 2023
1 parent e699912 commit 00f0552
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
12 changes: 10 additions & 2 deletions api/task_processor/processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import traceback
import typing
from datetime import timedelta

from django.utils import timezone

Expand All @@ -14,6 +15,8 @@

logger = logging.getLogger(__name__)

UNREGISTERED_RECURRING_TASK_GRACE_PERIOD = timedelta(minutes=30)


def run_tasks(num_tasks: int = 1) -> typing.List[TaskRun]:
if num_tasks < 1:
Expand Down Expand Up @@ -57,9 +60,14 @@ def run_recurring_tasks(num_tasks: int = 1) -> typing.List[RecurringTaskRun]:
task_runs = []

for task in tasks:
# Remove the task if it's not registered anymore
if not task.is_task_registered:
task.delete()
# This is necessary to ensure that old instances of the task processor,
# which may still be running during deployment, do not remove tasks added by new instances.
# Reference: https://github.com/Flagsmith/flagsmith/issues/2551
if (
timezone.now() - task.created_at
) > UNREGISTERED_RECURRING_TASK_GRACE_PERIOD:
task.delete()
continue

if task.should_execute:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging
import time
import uuid
from datetime import timedelta
from threading import Thread

import pytest
from django.utils import timezone
from freezegun import freeze_time

from organisations.models import Organisation
from task_processor.decorators import (
Expand All @@ -19,7 +21,11 @@
TaskResult,
TaskRun,
)
from task_processor.processor import run_recurring_tasks, run_tasks
from task_processor.processor import (
UNREGISTERED_RECURRING_TASK_GRACE_PERIOD,
run_recurring_tasks,
run_tasks,
)
from task_processor.task_registry import registered_tasks


Expand Down Expand Up @@ -150,10 +156,13 @@ def _create_organisation(organisation_name):
assert RecurringTaskRun.objects.filter(task=task).count() == 1


def test_run_recurring_tasks_deletes_the_task_if_it_is_not_registered(
db, run_by_processor
):
def test_run_recurring_tasks_does_nothing_if_unregistered_task_is_new(
db: None, run_by_processor: None, caplog: pytest.LogCaptureFixture
) -> None:
# Given
task_processor_logger = logging.getLogger("task_processor")
task_processor_logger.propagate = True

task_identifier = "test_unit_task_processor_processor._a_task"

@register_recurring_task(run_every=timedelta(milliseconds=100))
Expand All @@ -168,7 +177,35 @@ def _a_task():

# Then
assert len(task_runs) == 0
assert not RecurringTask.objects.filter(task_identifier=task_identifier).exists()
assert RecurringTask.objects.filter(task_identifier=task_identifier).exists()


def test_run_recurring_tasks_deletes_the_task_if_unregistered_task_is_old(
db: None, run_by_processor: None, caplog: pytest.LogCaptureFixture
) -> None:
# Given
task_processor_logger = logging.getLogger("task_processor")
task_processor_logger.propagate = True

task_identifier = "test_unit_task_processor_processor._a_task"

with freeze_time(timezone.now() - UNREGISTERED_RECURRING_TASK_GRACE_PERIOD):

@register_recurring_task(run_every=timedelta(milliseconds=100))
def _a_task():
pass

# now - remove the task from the registry
registered_tasks.pop(task_identifier)

# When
task_runs = run_recurring_tasks()

# Then
assert len(task_runs) == 0
assert (
RecurringTask.objects.filter(task_identifier=task_identifier).exists() is False
)


def test_run_task_runs_task_and_creates_task_run_object_when_failure(db):
Expand Down

3 comments on commit 00f0552

@vercel
Copy link

@vercel vercel bot commented on 00f0552 Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 00f0552 Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 00f0552 Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

docs – ./docs

docs-flagsmith.vercel.app
docs-git-main-flagsmith.vercel.app
docs.bullet-train.io
docs.flagsmith.com

Please sign in to comment.