Skip to content

Commit

Permalink
fix: avoid race condition in max running processes (#675)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex <[email protected]>
  • Loading branch information
Alex-Izquierdo authored Feb 21, 2024
1 parent 9311bbf commit b37fa13
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 33 deletions.
4 changes: 4 additions & 0 deletions src/aap_eda/services/activation/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class ActivationManagerError(Exception):
"""Base class for exceptions for the ActivationManager."""


class MaxRunningProcessesError(ActivationManagerError):
"""Exception raised when the maximum number processes is reached."""


class ActivationStartError(ActivationManagerError):
"""Exception raised when an activation fails to start."""

Expand Down
39 changes: 32 additions & 7 deletions src/aap_eda/services/activation/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,14 +1033,25 @@ def _create_activation_instance(self):
if hasattr(self.db_instance, "git_hash")
else ""
)
try:
args = {
"name": self.db_instance.name,
"status": ActivationStatus.STARTING,
"git_hash": git_hash,
}
args[f"{self.db_instance_type}"] = self.db_instance

if not self.check_new_process_allowed(
self.db_instance_type,
self.db_instance.id,
):
msg = (
"Failed to create rulebook process. "
"Reason: Max running processes reached. "
"Waiting for a free slot."
)
self._set_activation_status(ActivationStatus.PENDING, msg)
raise exceptions.MaxRunningProcessesError
args = {
"name": self.db_instance.name,
"status": ActivationStatus.STARTING,
"git_hash": git_hash,
}
args[f"{self.db_instance_type}"] = self.db_instance
try:
models.RulebookProcess.objects.create(**args)
except IntegrityError as exc:
msg = (
Expand All @@ -1060,3 +1071,17 @@ def _get_container_request(self) -> ContainerRequest:
)
LOGGER.exception(msg)
raise exceptions.ActivationManagerError(msg)

@staticmethod
def check_new_process_allowed(parent_type: str, parent_id: int) -> bool:
"""Check if a new process is allowed."""
num_running_processes = models.RulebookProcess.objects.filter(
status__in=[ActivationStatus.RUNNING, ActivationStatus.STARTING],
).count()
if num_running_processes >= settings.MAX_RUNNING_ACTIVATIONS:
LOGGER.info(
"No capacity to start a new rulebook process. "
f"{parent_type} {parent_id} is postponed",
)
return False
return True
28 changes: 9 additions & 19 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import logging
from typing import Union

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist

import aap_eda.tasks.activation_request_queue as requests_queue
Expand All @@ -27,6 +26,7 @@
)
from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream
from aap_eda.core.tasking import unique_enqueue
from aap_eda.services.activation import exceptions
from aap_eda.services.activation.manager import ActivationManager

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,8 +90,12 @@ def _run_request(
f"{process_parent.id}",
)
start_commands = [ActivationRequest.START, ActivationRequest.AUTO_START]
if request.request in start_commands and not _can_start_new_process(
process_parent,
if (
request.request in start_commands
and not ActivationManager.check_new_process_allowed(
process_parent_type,
process_parent.id,
)
):
return False

Expand All @@ -107,6 +111,8 @@ def _run_request(
manager.restart()
elif request.request == ActivationRequest.DELETE:
manager.delete()
except exceptions.MaxRunningProcessesError:
return False
except Exception as e:
LOGGER.exception(
f"Failed to process request {request.request} for "
Expand All @@ -115,22 +121,6 @@ def _run_request(
return True


def _can_start_new_process(
process_parent: Union[Activation, EventStream]
) -> bool:
num_running_processes = models.RulebookProcess.objects.filter(
status__in=[ActivationStatus.RUNNING, ActivationStatus.STARTING],
).count()
if num_running_processes >= settings.MAX_RUNNING_ACTIVATIONS:
process_parent_type = type(process_parent).__name__
LOGGER.info(
"No capacity to start a new rulebook process. "
f"{process_parent_type} {process_parent.id} is postponed",
)
return False
return True


def _make_user_request(
process_parent_type: ProcessParentType,
id: int,
Expand Down
38 changes: 38 additions & 0 deletions tests/integration/services/activation/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,28 @@ def basic_activation(
)


@pytest.fixture
def new_activation_with_instance(
default_user: models.User,
default_decision_environment: models.DecisionEnvironment,
default_rulebook: models.Rulebook,
) -> models.Activation:
"""Return an activation with an instance."""
activation = models.Activation.objects.create(
name="new_activation_with_instance",
user=default_user,
decision_environment=default_decision_environment,
rulebook=default_rulebook,
# rulebook_rulesets is populated by the serializer
rulebook_rulesets=default_rulebook.rulesets,
)
models.RulebookProcess.objects.create(
activation=activation,
status=ActivationStatus.RUNNING,
)
return activation


@pytest.fixture
def container_engine_mock() -> MagicMock:
return create_autospec(ContainerEngine, instance=True)
Expand Down Expand Up @@ -554,3 +576,19 @@ def test_delete_with_exception(
assert (
models.Activation.objects.filter(id=running_activation.id).count() == 0
)


@pytest.mark.django_db
def test_start_max_running_activations(
new_activation_with_instance: models.Activation,
basic_activation: models.Activation,
settings: SettingsWrapper,
eda_caplog: LogCaptureFixture,
):
"""Test start verb when max running activations is reached."""
apply_settings(settings, MAX_RUNNING_ACTIVATIONS=1)
activation_manager = ActivationManager(basic_activation)

with pytest.raises(exceptions.MaxRunningProcessesError):
activation_manager.start()
assert "No capacity to start a new rulebook process" in eda_caplog.text
81 changes: 74 additions & 7 deletions tests/integration/tasks/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,58 @@
from django.conf import settings

import aap_eda.tasks.activation_request_queue as queue
import aap_eda.tasks.orchestrator as orchestrator
from aap_eda.core import models
from aap_eda.core.enums import (
ActivationRequest,
ActivationStatus,
ProcessParentType,
)
from aap_eda.tasks import orchestrator


@pytest.fixture
def default_rulebook() -> models.Rulebook:
"""Return a default rulebook."""
rulesets = """
---
- name: Hello World
hosts: all
sources:
- ansible.eda.range:
limit: 5
rules:
- name: Say Hello
condition: event.i == 1
action:
debug:
msg: "Hello World!"
"""
return models.Rulebook.objects.create(
name="test-rulebook",
rulesets=rulesets,
)


@pytest.fixture()
def activation():
def activation(default_rulebook):
user = models.User.objects.create_user(
username="luke.skywalker",
first_name="Luke",
last_name="Skywalker",
email="[email protected]",
password="secret",
)
decision_environment = models.DecisionEnvironment.objects.create(
name="test-decision-environment",
image_url="localhost:14000/test-image-url",
)
return models.Activation.objects.create(
name="test1",
user=user,
decision_environment=decision_environment,
rulebook=default_rulebook,
rulebook_rulesets=default_rulebook.rulesets,
)


Expand Down Expand Up @@ -106,17 +137,15 @@ def test_manage_request(manager_mock, activation, verb):


@pytest.mark.django_db
@mock.patch("aap_eda.tasks.orchestrator.ActivationManager")
def test_manage_not_start(manager_mock, activation, max_running_processes):
@mock.patch.object(orchestrator.ActivationManager, "start", autospec=True)
def test_manage_not_start(start_mock, activation, max_running_processes):
queue.push(
ProcessParentType.ACTIVATION, activation.id, ActivationRequest.START
)
manager_instance_mock = mock.Mock()
manager_mock.return_value = manager_instance_mock

orchestrator._manage(ProcessParentType.ACTIVATION, activation.id)

manager_instance_mock.start.assert_not_called()
start_mock.assert_not_called()
assert (
len(queue.peek_all(ProcessParentType.ACTIVATION, activation.id)) == 1
)
Expand Down Expand Up @@ -194,3 +223,41 @@ def test_monitor_rulebook_processes(
orchestrator.monitor_rulebook_processes()

enqueue_mock.assert_has_calls(call_args, any_order=True)


original_start_method = orchestrator.ActivationManager.start


@pytest.mark.django_db
@mock.patch.object(orchestrator.ActivationManager, "start", autospec=True)
def test_max_running_activation_after_start_job(
start_mock,
activation,
max_running_processes,
):
"""Check if the max running processes error is handled correctly
when the limit is reached after the request is started."""

def side_effect(*args, **kwargs):
# Recreate the process and run the original start method
instance = args[0]
models.RulebookProcess.objects.create(
name="running",
activation=max_running_processes[0].activation,
status=ActivationStatus.RUNNING,
)
original_start_method(instance, *args[1:], **kwargs)

start_mock.side_effect = side_effect

max_running_processes[0].delete()

queue.push(
ProcessParentType.ACTIVATION, activation.id, ActivationRequest.START
)
orchestrator._manage(ProcessParentType.ACTIVATION, activation.id)
assert start_mock.call_count == 1
running_processes = models.RulebookProcess.objects.filter(
status__in=[ActivationStatus.STARTING, ActivationStatus.RUNNING]
).count()
assert running_processes == settings.MAX_RUNNING_ACTIVATIONS

0 comments on commit b37fa13

Please sign in to comment.