diff --git a/.env-devel b/.env-devel index 978062f428b..3f621100b31 100644 --- a/.env-devel +++ b/.env-devel @@ -105,6 +105,7 @@ DYNAMIC_SIDECAR_LOG_LEVEL=DEBUG DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS=[] DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS={} DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=3600 +DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD=0.8 # DIRECTOR_V2 ---- DYNAMIC_SCHEDULER_LOGLEVEL=DEBUG DYNAMIC_SCHEDULER_PROFILING=1 diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index 3385ad5820e..b30bbb18df9 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -1,14 +1,18 @@ import asyncio import logging -from collections import deque +from collections.abc import Awaitable, Callable from contextlib import suppress from dataclasses import dataclass +from datetime import timedelta from functools import wraps -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Deque +from typing import TYPE_CHECKING, Any, Coroutine, Final, TypeVar + +from pydantic import NonNegativeFloat +from servicelib.background_task import cancel_task from .utils_profiling_middleware import dont_profile, is_profiling, profile_context -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) if TYPE_CHECKING: Queue = asyncio.Queue @@ -51,7 +55,7 @@ async def _safe_cancel(context: Context) -> None: await context.task except RuntimeError as e: if "Event loop is closed" in f"{e}": - logger.warning("event loop is closed and could not cancel %s", context) + _logger.warning("event loop is closed and could not cancel %s", context) else: raise @@ -62,7 +66,7 @@ async def cancel_sequential_workers() -> None: await _safe_cancel(context) _sequential_jobs_contexts.clear() - logger.info("All run_sequentially_in_context pending workers stopped") + _logger.info("All run_sequentially_in_context pending workers 
stopped") # NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator. @@ -118,22 +122,22 @@ def _get_context(args: Any, kwargs: dict) -> Context: search_args = dict(zip(arg_names, args)) search_args.update(kwargs) - key_parts: Deque[str] = deque() + key_parts: list[str] = [] for arg in target_args: sub_args = arg.split(".") main_arg = sub_args[0] if main_arg not in search_args: - raise ValueError( + msg = ( f"Expected '{main_arg}' in '{decorated_function.__name__}'" f" arguments. Got '{search_args}'" ) + raise ValueError(msg) context_key = search_args[main_arg] for attribute in sub_args[1:]: potential_key = getattr(context_key, attribute) if not potential_key: - raise ValueError( - f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments." - ) + msg = f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments." + raise ValueError(msg) context_key = potential_key key_parts.append(f"{decorated_function.__name__}_{context_key}") @@ -200,3 +204,35 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None: return wrapper return decorator + + +T = TypeVar("T") +_CANCELLATION_TIMEOUT: Final[NonNegativeFloat] = 0.1 + + +async def _monitor_task( + notification_hook: Callable[[], Awaitable[None]], notify_after: timedelta +) -> None: + await asyncio.sleep(notify_after.total_seconds()) + await notification_hook() + + +async def notify_when_over_threshold( + task: Coroutine[Any, Any, T], + *, + notification_hook: Callable[[], Awaitable[None]], + notify_after: timedelta, +) -> T: + monitor_task = asyncio.create_task(_monitor_task(notification_hook, notify_after)) + + try: + result = await task + await cancel_task(monitor_task, timeout=_CANCELLATION_TIMEOUT) + except asyncio.CancelledError: + await cancel_task(monitor_task, timeout=_CANCELLATION_TIMEOUT) + raise + except Exception: + await cancel_task(monitor_task, timeout=_CANCELLATION_TIMEOUT) + raise + + return result diff --git 
a/packages/service-library/tests/test_async_utils.py b/packages/service-library/tests/test_async_utils.py index 902cbcc9b82..80be2057805 100644 --- a/packages/service-library/tests/test_async_utils.py +++ b/packages/service-library/tests/test_async_utils.py @@ -7,13 +7,16 @@ import random from collections import deque from dataclasses import dataclass +from datetime import timedelta from time import time from typing import Any +from unittest.mock import Mock import pytest from faker import Faker from servicelib.async_utils import ( _sequential_jobs_contexts, + notify_when_over_threshold, run_sequentially_in_context, ) @@ -224,3 +227,64 @@ async def test_multiple_context_calls(context_param: int) -> int: assert i == await test_multiple_context_calls(i) assert len(_sequential_jobs_contexts) == RETRIES + + +async def test_notify_when_over_threshold(): + + notification_spy = Mock() + + async def notification() -> None: + notification_spy() + print("notified") + + async def _worker( + *, sleep_for: float, raise_error: type[BaseException] | None = None + ) -> float: + await asyncio.sleep(sleep_for) + + if raise_error: + raise raise_error + + return sleep_for + + # 1. finish after + result = await notify_when_over_threshold( + _worker(sleep_for=0.5), + notification_hook=notification, + notify_after=timedelta(seconds=0.1), + ) + assert isinstance(result, float) + assert result == 0.5 + assert notification_spy.call_count == 1 + + # 2. finish before + notification_spy.reset_mock() + await notify_when_over_threshold( + _worker(sleep_for=0.1), + notification_hook=notification, + notify_after=timedelta(seconds=0.2), + ) + await asyncio.sleep(0.2) + assert notification_spy.call_count == 0 + + # 3. 
raise error before notification + for notification_type in (RuntimeError, asyncio.CancelledError): + notification_spy.reset_mock() + with pytest.raises(notification_type): + await notify_when_over_threshold( + _worker(sleep_for=0, raise_error=notification_type), + notification_hook=notification, + notify_after=timedelta(seconds=0.2), + ) + assert notification_spy.call_count == 0 + + # 4. raise after notification + for notification_type in (RuntimeError, asyncio.CancelledError): + notification_spy.reset_mock() + with pytest.raises(notification_type): + await notify_when_over_threshold( + _worker(sleep_for=0.2, raise_error=notification_type), + notification_hook=notification, + notify_after=timedelta(seconds=0.1), + ) + assert notification_spy.call_count == 1 diff --git a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py index 831c7df2f18..dbee9e80d08 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py @@ -111,6 +111,19 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings): ), ) + DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD: float = Field( + 0.8, + gt=0, + le=1, + description=( + "threshold used to figure out when to send out a notification that it's " + "taking too much time to recover the states for the service. 
" + "The message should inform the user that if adding more " + "data they will not be able to open the service since the platform " + "will timeout the recovery operation after a certain amount of time" + ), + ) + DYNAMIC_SIDECAR_API_USER_SERVICES_PULLING_TIMEOUT: PositiveFloat = Field( 60.0 * _MINUTE, description="before starting the user services pull all the images in parallel", @@ -166,3 +179,11 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings): DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL: timedelta = Field( timedelta(0), description="time to sleep before removing a container" ) + + @property + def states_restore_notification_timeout(self) -> timedelta: + """threshold to notify frontend about the size of the workspace getting too big""" + return timedelta( + seconds=self.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT + * self.DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD + ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 3071cde1060..f9cbd3500ed 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -19,6 +19,7 @@ from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from models_library.user_preferences import FrontendUserPreference from models_library.users import UserID +from servicelib.async_utils import notify_when_over_threshold from servicelib.fastapi.http_client_thin import BaseHttpClientError from servicelib.fastapi.long_running_tasks.client import ( ProgressCallback, @@ -449,6 +450,7 @@ async def prepare_services_environment( app: FastAPI, scheduler_data: SchedulerData ) -> None: app_settings: AppSettings = 
app.state.settings + dynamic_sidecar_settings = app_settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER sidecars_client = await get_sidecars_client(app, scheduler_data.node_uuid) dynamic_sidecar_endpoint = scheduler_data.endpoint @@ -502,8 +504,18 @@ async def _pull_user_services_images_with_metrics() -> None: ) async def _restore_service_state_with_metrics() -> None: + async def notify_frontend() -> None: + # TODO: finish implementation below + _logger.debug( + f"Notify {scheduler_data.node_uuid=}, {scheduler_data.project_id=} went over the time threshold" + ) + with track_duration() as duration: - size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint) + size = await notify_when_over_threshold( + sidecars_client.restore_service_state(dynamic_sidecar_endpoint), + notification_hook=notify_frontend, + notify_after=dynamic_sidecar_settings.states_restore_notification_timeout, + ) if size and size > 0: get_instrumentation( diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 45e843ad712..593fde43a7f 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -325,6 +325,7 @@ services: DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS: ${DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS} DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS: ${DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS} DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT} + DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD: ${DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD} LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED} LOG_FILTER_MAPPING : ${LOG_FILTER_MAPPING}