From b37fa13ca156862dba13b680552c3e94405eb371 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 21 Feb 2024 15:38:32 +0100 Subject: [PATCH] fix: avoid race condition in max running processes (#675) Signed-off-by: Alex --- src/aap_eda/services/activation/exceptions.py | 4 + src/aap_eda/services/activation/manager.py | 39 +++++++-- src/aap_eda/tasks/orchestrator.py | 28 +++---- .../services/activation/test_manager.py | 38 +++++++++ tests/integration/tasks/test_orchestrator.py | 81 +++++++++++++++++-- 5 files changed, 157 insertions(+), 33 deletions(-) diff --git a/src/aap_eda/services/activation/exceptions.py b/src/aap_eda/services/activation/exceptions.py index a11a55170..353b7b04e 100644 --- a/src/aap_eda/services/activation/exceptions.py +++ b/src/aap_eda/services/activation/exceptions.py @@ -21,6 +21,10 @@ class ActivationManagerError(Exception): """Base class for exceptions for the ActivationManager.""" +class MaxRunningProcessesError(ActivationManagerError): + """Exception raised when the maximum number of processes is reached.""" + + class ActivationStartError(ActivationManagerError): """Exception raised when an activation fails to start.""" diff --git a/src/aap_eda/services/activation/manager.py b/src/aap_eda/services/activation/manager.py index 28453fd11..a8dfe761d 100644 --- a/src/aap_eda/services/activation/manager.py +++ b/src/aap_eda/services/activation/manager.py @@ -1033,14 +1033,25 @@ def _create_activation_instance(self): if hasattr(self.db_instance, "git_hash") else "" ) - try: - args = { - "name": self.db_instance.name, - "status": ActivationStatus.STARTING, - "git_hash": git_hash, - } - args[f"{self.db_instance_type}"] = self.db_instance + if not self.check_new_process_allowed( + self.db_instance_type, + self.db_instance.id, + ): + msg = ( + "Failed to create rulebook process. " + "Reason: Max running processes reached. " + "Waiting for a free slot." 
+ ) + self._set_activation_status(ActivationStatus.PENDING, msg) + raise exceptions.MaxRunningProcessesError + args = { + "name": self.db_instance.name, + "status": ActivationStatus.STARTING, + "git_hash": git_hash, + } + args[f"{self.db_instance_type}"] = self.db_instance + try: models.RulebookProcess.objects.create(**args) except IntegrityError as exc: msg = ( @@ -1060,3 +1071,17 @@ def _get_container_request(self) -> ContainerRequest: ) LOGGER.exception(msg) raise exceptions.ActivationManagerError(msg) + + @staticmethod + def check_new_process_allowed(parent_type: str, parent_id: int) -> bool: + """Check if a new process is allowed.""" + num_running_processes = models.RulebookProcess.objects.filter( + status__in=[ActivationStatus.RUNNING, ActivationStatus.STARTING], + ).count() + if num_running_processes >= settings.MAX_RUNNING_ACTIVATIONS: + LOGGER.info( + "No capacity to start a new rulebook process. " + f"{parent_type} {parent_id} is postponed", + ) + return False + return True diff --git a/src/aap_eda/tasks/orchestrator.py b/src/aap_eda/tasks/orchestrator.py index d53dc0663..01da7b6ff 100644 --- a/src/aap_eda/tasks/orchestrator.py +++ b/src/aap_eda/tasks/orchestrator.py @@ -15,7 +15,6 @@ import logging from typing import Union -from django.conf import settings from django.core.exceptions import ObjectDoesNotExist import aap_eda.tasks.activation_request_queue as requests_queue @@ -27,6 +26,7 @@ ) from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream from aap_eda.core.tasking import unique_enqueue +from aap_eda.services.activation import exceptions from aap_eda.services.activation.manager import ActivationManager LOGGER = logging.getLogger(__name__) @@ -90,8 +90,12 @@ def _run_request( f"{process_parent.id}", ) start_commands = [ActivationRequest.START, ActivationRequest.AUTO_START] - if request.request in start_commands and not _can_start_new_process( - process_parent, + if ( + request.request in start_commands + and not 
ActivationManager.check_new_process_allowed( + process_parent_type, + process_parent.id, + ) ): return False @@ -107,6 +111,8 @@ def _run_request( manager.restart() elif request.request == ActivationRequest.DELETE: manager.delete() + except exceptions.MaxRunningProcessesError: + return False except Exception as e: LOGGER.exception( f"Failed to process request {request.request} for " @@ -115,22 +121,6 @@ def _run_request( return True -def _can_start_new_process( - process_parent: Union[Activation, EventStream] -) -> bool: - num_running_processes = models.RulebookProcess.objects.filter( - status__in=[ActivationStatus.RUNNING, ActivationStatus.STARTING], - ).count() - if num_running_processes >= settings.MAX_RUNNING_ACTIVATIONS: - process_parent_type = type(process_parent).__name__ - LOGGER.info( - "No capacity to start a new rulebook process. " - f"{process_parent_type} {process_parent.id} is postponed", - ) - return False - return True - - def _make_user_request( process_parent_type: ProcessParentType, id: int, diff --git a/tests/integration/services/activation/test_manager.py b/tests/integration/services/activation/test_manager.py index e185a5e0a..5ab1be670 100644 --- a/tests/integration/services/activation/test_manager.py +++ b/tests/integration/services/activation/test_manager.py @@ -111,6 +111,28 @@ def basic_activation( ) +@pytest.fixture +def new_activation_with_instance( + default_user: models.User, + default_decision_environment: models.DecisionEnvironment, + default_rulebook: models.Rulebook, +) -> models.Activation: + """Return an activation with an instance.""" + activation = models.Activation.objects.create( + name="new_activation_with_instance", + user=default_user, + decision_environment=default_decision_environment, + rulebook=default_rulebook, + # rulebook_rulesets is populated by the serializer + rulebook_rulesets=default_rulebook.rulesets, + ) + models.RulebookProcess.objects.create( + activation=activation, + status=ActivationStatus.RUNNING, + ) + 
return activation + + @pytest.fixture def container_engine_mock() -> MagicMock: return create_autospec(ContainerEngine, instance=True) @@ -554,3 +576,19 @@ def test_delete_with_exception( assert ( models.Activation.objects.filter(id=running_activation.id).count() == 0 ) + + +@pytest.mark.django_db +def test_start_max_running_activations( + new_activation_with_instance: models.Activation, + basic_activation: models.Activation, + settings: SettingsWrapper, + eda_caplog: LogCaptureFixture, +): + """Test start verb when max running activations is reached.""" + apply_settings(settings, MAX_RUNNING_ACTIVATIONS=1) + activation_manager = ActivationManager(basic_activation) + + with pytest.raises(exceptions.MaxRunningProcessesError): + activation_manager.start() + assert "No capacity to start a new rulebook process" in eda_caplog.text diff --git a/tests/integration/tasks/test_orchestrator.py b/tests/integration/tasks/test_orchestrator.py index 69e21f00a..919288fdf 100644 --- a/tests/integration/tasks/test_orchestrator.py +++ b/tests/integration/tasks/test_orchestrator.py @@ -18,17 +18,41 @@ from django.conf import settings import aap_eda.tasks.activation_request_queue as queue -import aap_eda.tasks.orchestrator as orchestrator from aap_eda.core import models from aap_eda.core.enums import ( ActivationRequest, ActivationStatus, ProcessParentType, ) +from aap_eda.tasks import orchestrator + + +@pytest.fixture +def default_rulebook() -> models.Rulebook: + """Return a default rulebook.""" + rulesets = """ +--- +- name: Hello World + hosts: all + sources: + - ansible.eda.range: + limit: 5 + rules: + - name: Say Hello + condition: event.i == 1 + action: + debug: + msg: "Hello World!" 
+ +""" + return models.Rulebook.objects.create( + name="test-rulebook", + rulesets=rulesets, + ) @pytest.fixture() -def activation(): +def activation(default_rulebook): user = models.User.objects.create_user( username="luke.skywalker", first_name="Luke", @@ -36,9 +60,16 @@ def activation(): email="luke.skywalker@example.com", password="secret", ) + decision_environment = models.DecisionEnvironment.objects.create( + name="test-decision-environment", + image_url="localhost:14000/test-image-url", + ) return models.Activation.objects.create( name="test1", user=user, + decision_environment=decision_environment, + rulebook=default_rulebook, + rulebook_rulesets=default_rulebook.rulesets, ) @@ -106,17 +137,15 @@ def test_manage_request(manager_mock, activation, verb): @pytest.mark.django_db -@mock.patch("aap_eda.tasks.orchestrator.ActivationManager") -def test_manage_not_start(manager_mock, activation, max_running_processes): +@mock.patch.object(orchestrator.ActivationManager, "start", autospec=True) +def test_manage_not_start(start_mock, activation, max_running_processes): queue.push( ProcessParentType.ACTIVATION, activation.id, ActivationRequest.START ) - manager_instance_mock = mock.Mock() - manager_mock.return_value = manager_instance_mock orchestrator._manage(ProcessParentType.ACTIVATION, activation.id) - manager_instance_mock.start.assert_not_called() + start_mock.assert_not_called() assert ( len(queue.peek_all(ProcessParentType.ACTIVATION, activation.id)) == 1 ) @@ -194,3 +223,41 @@ def test_monitor_rulebook_processes( orchestrator.monitor_rulebook_processes() enqueue_mock.assert_has_calls(call_args, any_order=True) + + +original_start_method = orchestrator.ActivationManager.start + + +@pytest.mark.django_db +@mock.patch.object(orchestrator.ActivationManager, "start", autospec=True) +def test_max_running_activation_after_start_job( + start_mock, + activation, + max_running_processes, +): + """Check if the max running processes error is handled correctly + when the 
limit is reached after the request is started.""" + + def side_effect(*args, **kwargs): + # Recreate the process and run the original start method + instance = args[0] + models.RulebookProcess.objects.create( + name="running", + activation=max_running_processes[0].activation, + status=ActivationStatus.RUNNING, + ) + original_start_method(instance, *args[1:], **kwargs) + + start_mock.side_effect = side_effect + + max_running_processes[0].delete() + + queue.push( + ProcessParentType.ACTIVATION, activation.id, ActivationRequest.START + ) + orchestrator._manage(ProcessParentType.ACTIVATION, activation.id) + assert start_mock.call_count == 1 + running_processes = models.RulebookProcess.objects.filter( + status__in=[ActivationStatus.STARTING, ActivationStatus.RUNNING] + ).count() + assert running_processes == settings.MAX_RUNNING_ACTIVATIONS