Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inform users when their workspace is getting too big ⚠️ #6674

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ DYNAMIC_SIDECAR_LOG_LEVEL=DEBUG
DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS=[]
DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS={}
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=3600
DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD=0.8
# DIRECTOR_V2 ----
DYNAMIC_SCHEDULER_LOGLEVEL=DEBUG
DYNAMIC_SCHEDULER_PROFILING=1
Expand Down
56 changes: 46 additions & 10 deletions packages/service-library/src/servicelib/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import asyncio
import logging
from collections import deque
from collections.abc import Awaitable, Callable
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
from functools import wraps
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Deque
from typing import TYPE_CHECKING, Any, Coroutine, Final, TypeVar

from pydantic import NonNegativeFloat
from servicelib.background_task import cancel_task

from .utils_profiling_middleware import dont_profile, is_profiling, profile_context

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

if TYPE_CHECKING:
Queue = asyncio.Queue
Expand Down Expand Up @@ -51,7 +55,7 @@ async def _safe_cancel(context: Context) -> None:
await context.task
except RuntimeError as e:
if "Event loop is closed" in f"{e}":
logger.warning("event loop is closed and could not cancel %s", context)
_logger.warning("event loop is closed and could not cancel %s", context)
else:
raise

Expand All @@ -62,7 +66,7 @@ async def cancel_sequential_workers() -> None:
await _safe_cancel(context)

_sequential_jobs_contexts.clear()
logger.info("All run_sequentially_in_context pending workers stopped")
_logger.info("All run_sequentially_in_context pending workers stopped")


# NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator.
Expand Down Expand Up @@ -118,22 +122,22 @@ def _get_context(args: Any, kwargs: dict) -> Context:
search_args = dict(zip(arg_names, args))
search_args.update(kwargs)

key_parts: Deque[str] = deque()
key_parts: list[str] = []
for arg in target_args:
sub_args = arg.split(".")
main_arg = sub_args[0]
if main_arg not in search_args:
raise ValueError(
msg = (
f"Expected '{main_arg}' in '{decorated_function.__name__}'"
f" arguments. Got '{search_args}'"
)
raise ValueError(msg)
context_key = search_args[main_arg]
for attribute in sub_args[1:]:
potential_key = getattr(context_key, attribute)
if not potential_key:
raise ValueError(
f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
)
msg = f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
raise ValueError(msg)
context_key = potential_key

key_parts.append(f"{decorated_function.__name__}_{context_key}")
Expand Down Expand Up @@ -200,3 +204,35 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None:
return wrapper

return decorator


# Result type of the coroutine monitored by notify_when_over_threshold
T = TypeVar("T")
# Grace period (seconds) given to the monitor task to acknowledge cancellation
_CANCELLATION_TIMEOUT: Final[NonNegativeFloat] = 0.1


async def _monitor_task(
notification_hook: Callable[[], Awaitable[None]], notify_after: timedelta
) -> None:
await asyncio.sleep(notify_after.total_seconds())
await notification_hook()


async def notify_when_over_threshold(
    task: Coroutine[Any, Any, T],
    *,
    notification_hook: Callable[[], Awaitable[None]],
    notify_after: timedelta,
) -> T:
    """Await *task*, invoking *notification_hook* if it runs longer than *notify_after*.

    A background monitor sleeps for *notify_after* and then calls the hook;
    the monitor is cancelled as soon as *task* finishes, so the hook fires at
    most once and only while the task is still running past the threshold.

    Returns the result of *task*; any exception it raises (including
    cancellation) is propagated unchanged.
    """
    monitor_task = asyncio.create_task(_monitor_task(notification_hook, notify_after))
    try:
        return await task
    finally:
        # Always stop the monitor, no matter how `task` ended: success,
        # Exception, CancelledError or any other BaseException (the previous
        # except-per-case version leaked the monitor on e.g. KeyboardInterrupt)
        await cancel_task(monitor_task, timeout=_CANCELLATION_TIMEOUT)
64 changes: 64 additions & 0 deletions packages/service-library/tests/test_async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import random
from collections import deque
from dataclasses import dataclass
from datetime import timedelta
from time import time
from typing import Any
from unittest.mock import Mock

import pytest
from faker import Faker
from servicelib.async_utils import (
_sequential_jobs_contexts,
notify_when_over_threshold,
run_sequentially_in_context,
)

Expand Down Expand Up @@ -224,3 +227,64 @@ async def test_multiple_context_calls(context_param: int) -> int:
assert i == await test_multiple_context_calls(i)

assert len(_sequential_jobs_contexts) == RETRIES


async def test_notify_when_over_threshold():
    spy = Mock()

    async def _on_threshold() -> None:
        spy()
        print("notified")

    async def _job(
        *, sleep_for: float, raise_error: type[BaseException] | None = None
    ) -> float:
        await asyncio.sleep(sleep_for)
        if raise_error:
            raise raise_error
        return sleep_for

    # case 1: job outlives the threshold -> exactly one notification
    result = await notify_when_over_threshold(
        _job(sleep_for=0.5),
        notification_hook=_on_threshold,
        notify_after=timedelta(seconds=0.1),
    )
    assert isinstance(result, float)
    assert result == 0.5
    assert spy.call_count == 1

    # case 2: job completes before the threshold -> no notification,
    # even after waiting past the would-be deadline
    spy.reset_mock()
    await notify_when_over_threshold(
        _job(sleep_for=0.1),
        notification_hook=_on_threshold,
        notify_after=timedelta(seconds=0.2),
    )
    await asyncio.sleep(0.2)
    assert spy.call_count == 0

    # case 3: job fails before the threshold -> error propagates, no notification
    for error_type in (RuntimeError, asyncio.CancelledError):
        spy.reset_mock()
        with pytest.raises(error_type):
            await notify_when_over_threshold(
                _job(sleep_for=0, raise_error=error_type),
                notification_hook=_on_threshold,
                notify_after=timedelta(seconds=0.2),
            )
        assert spy.call_count == 0

    # case 4: job fails only after the threshold -> error propagates,
    # but the notification was already sent
    for error_type in (RuntimeError, asyncio.CancelledError):
        spy.reset_mock()
        with pytest.raises(error_type):
            await notify_when_over_threshold(
                _job(sleep_for=0.2, raise_error=error_type),
                notification_hook=_on_threshold,
                notify_after=timedelta(seconds=0.1),
            )
        assert spy.call_count == 1
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings):
),
)

# Fraction of DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT after which the
# user is warned that state recovery is taking too long (see the
# states_restore_notification_timeout property)
DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD: float = Field(
    0.8,
    gt=0,
    le=1,
    description=(
        # NOTE: the previous text was missing a trailing space here and
        # rendered as "it'staking" when the fragments were concatenated
        "threshold used to figure out when to send out a notification that it's "
        "taking too much time to recover the states for the service. "
        "The message should inform the user that if they add more "
        "data they will not be able to open the service, since the platform "
        "will time out the recovery operation after a certain amount of time"
    ),
)

DYNAMIC_SIDECAR_API_USER_SERVICES_PULLING_TIMEOUT: PositiveFloat = Field(
60.0 * _MINUTE,
description="before starting the user services pull all the images in parallel",
Expand Down Expand Up @@ -166,3 +179,11 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings):
DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL: timedelta = Field(
timedelta(0), description="time to sleep before removing a container"
)

@property
def states_restore_notification_timeout(self) -> timedelta:
    """Delay after which the frontend is warned that the workspace is getting too big.

    Computed as the notification-threshold fraction of the overall
    save/restore state timeout.
    """
    notify_after_seconds = (
        self.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT
        * self.DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD
    )
    return timedelta(seconds=notify_after_seconds)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from models_library.sidecar_volumes import VolumeCategory, VolumeStatus
from models_library.user_preferences import FrontendUserPreference
from models_library.users import UserID
from servicelib.async_utils import notify_when_over_threshold
from servicelib.fastapi.http_client_thin import BaseHttpClientError
from servicelib.fastapi.long_running_tasks.client import (
ProgressCallback,
Expand Down Expand Up @@ -449,6 +450,7 @@ async def prepare_services_environment(
app: FastAPI, scheduler_data: SchedulerData
) -> None:
app_settings: AppSettings = app.state.settings
dynamic_sidecar_settings = app_settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER
sidecars_client = await get_sidecars_client(app, scheduler_data.node_uuid)
dynamic_sidecar_endpoint = scheduler_data.endpoint

Expand Down Expand Up @@ -502,8 +504,18 @@ async def _pull_user_services_images_with_metrics() -> None:
)

async def _restore_service_state_with_metrics() -> None:
async def notify_frontend() -> None:
# TODO: finish implementation below
_logger.debug(
f"Notify {scheduler_data.node_uuid=}, {scheduler_data.project_id=} went over the time threshold"
)

with track_duration() as duration:
size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint)
size = await notify_when_over_threshold(
sidecars_client.restore_service_state(dynamic_sidecar_endpoint),
notification_hook=notify_frontend,
notify_after=dynamic_sidecar_settings.states_restore_notification_timeout,
)

if size and size > 0:
get_instrumentation(
Expand Down
1 change: 1 addition & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ services:
DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS: ${DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS}
DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS: ${DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS}
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD: ${DYNAMIC_SIDECAR_API_STATES_RESTORE_NOTIFICATION_THRESHOLD}

LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED}
LOG_FILTER_MAPPING : ${LOG_FILTER_MAPPING}
Expand Down
Loading