Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ reroute user services restart via dynamic-scheduler #6943

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ async def stop_dynamic_service(
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def restart_user_services(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    timeout_s: NonNegativeInt,
) -> None:
    """Request a restart of a node's user services over RabbitMQ RPC.

    Calls the ``restart_user_services`` method in the
    ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` namespace.

    Args:
        rabbitmq_rpc_client: client used to issue the RPC request.
        node_id: identifier of the node whose user services are restarted.
        timeout_s: forwarded to ``request`` as ``timeout_s``
            (presumably the time to wait for the RPC reply -- confirm
            against ``RabbitMQRPCClient.request``).
    """
    rpc_method_name = _RPC_METHOD_NAME_ADAPTER.validate_python(
        "restart_user_services"
    )
    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        rpc_method_name,
        node_id=node_id,
        timeout_s=timeout_s,
    )
    # the remote handler is declared to return nothing
    assert reply is None  # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def retrieve_inputs(
rabbitmq_rpc_client: RabbitMQRPCClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ async def stop_dynamic_service(
)


@router.expose()
async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
    """RPC-exposed handler: restart the user services of ``node_id``.

    Delegates to ``scheduler_interface.restart_user_services``.
    """
    await scheduler_interface.restart_user_services(
        app,
        node_id=node_id,
    )


@router.expose()
async def retrieve_inputs(
app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ async def list_tracked_dynamic_services(
)
return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())

async def restart_user_services(self, *, node_id: NodeID) -> None:
    """Restart the user services of ``node_id`` via the thin client.

    The response returned by ``post_restart`` is discarded.
    """
    thin_client = self.thin_client
    await thin_client.post_restart(node_id=node_id)

async def update_projects_networks(self, *, project_id: ProjectID) -> None:
    """Call ``patch_projects_networks`` on the thin client for ``project_id``.

    The response returned by the thin client is discarded.
    """
    thin_client = self.thin_client
    await thin_client.patch_projects_networks(project_id=project_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ async def get_dynamic_services(
params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
)

@retry_on_errors()
@expect_status(status.HTTP_204_NO_CONTENT)
async def post_restart(self, *, node_id: NodeID) -> Response:
    """POST to ``/dynamic_services/{node_id}:restart`` and return the response.

    Decorated with ``expect_status(204)`` and ``retry_on_errors``.
    """
    restart_path = f"/dynamic_services/{node_id}:restart"
    return await self.client.post(restart_path)

@retry_on_errors()
@expect_status(status.HTTP_204_NO_CONTENT)
async def patch_projects_networks(self, *, project_id: ProjectID) -> Response:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ async def stop_dynamic_service(
await set_request_as_stopped(app, dynamic_service_stop)


async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
    """Restart the user services of ``node_id`` through director-v2.

    Raises:
        NotImplementedError: when ``DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER``
            is enabled in the application settings.
    """
    app_settings: ApplicationSettings = app.state.settings
    if app_settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
        # no internal-scheduler code path exists for this operation
        raise NotImplementedError

    await DirectorV2Client.get_from_app_state(app).restart_user_services(
        node_id=node_id
    )


async def retrieve_inputs(
app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,15 +494,34 @@ async def test_stop_dynamic_service_serializes_generic_errors(


@pytest.fixture
def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]:
def mock_director_v2_restart_user_services(node_id: NodeID) -> Iterator[None]:
with respx.mock(
base_url="http://director-v2:8000/v2",
assert_all_called=False,
assert_all_mocked=True, # IMPORTANT: KEEP always True!
) as mock:
request_ok = mock.post(f"/dynamic_services/{node_id}:retrieve")
mock.post(f"/dynamic_services/{node_id}:restart").respond(
status.HTTP_204_NO_CONTENT
)
yield None


async def test_restart_user_services(
    mock_director_v2_restart_user_services: None,
    rpc_client: RabbitMQRPCClient,
    node_id: NodeID,
):
    """The restart RPC call completes without raising against the mocked director-v2."""
    await services.restart_user_services(
        rpc_client,
        node_id=node_id,
        timeout_s=5,
    )

request_ok.respond(

@pytest.fixture
def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]:
with respx.mock(
base_url="http://director-v2:8000/v2",
assert_all_called=False,
assert_all_mocked=True, # IMPORTANT: KEEP always True!
) as mock:
mock.post(f"/dynamic_services/{node_id}:retrieve").respond(
status.HTTP_200_OK,
text=TypeAdapter(RetrieveDataOutEnveloped)
.validate_python(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,6 @@
_log = logging.getLogger(__name__)


@log_decorator(logger=_log)
async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None:
    """Restart, on user request, the dynamic service started in ``node_uuid``.

    NOTE: this operation will NOT restart all sidecar services
    (``simcore-service-dynamic-sidecar`` or ``reverse-proxy caddy`` services) but
    ONLY those containers in the compose-spec (i.e. the ones exposed to the user)
    """
    settings: DirectorV2Settings = get_plugin_settings(app)
    restart_url = settings.base_url / f"dynamic_services/{node_uuid}:restart"
    await request_director_v2(
        app,
        "POST",
        url=restart_url,
        expected_status=web.HTTPOk,
        timeout=settings.DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT,
    )


@log_decorator(logger=_log)
async def get_project_inactivity(
app: web.Application,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
is_pipeline_running,
stop_pipeline,
)
from ._core_dynamic_services import get_project_inactivity, restart_dynamic_service
from ._core_dynamic_services import get_project_inactivity
from ._core_utils import is_healthy
from .exceptions import DirectorServiceError

Expand All @@ -32,7 +32,6 @@
"get_project_run_policy",
"is_healthy",
"is_pipeline_running",
"restart_dynamic_service",
"set_project_run_policy",
"stop_pipeline",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ def base_url(self) -> URL:
# - Mostly in floats (aiohttp.Client/) but sometimes in ints
# - Typically in seconds but occasionally in ms

# Timeout for restarting a dynamic service's containers (see description).
# Default is 1 * _MINUTE -- presumably expressed in seconds; confirm against
# the definition of _MINUTE, which is not visible here.
DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT: PositiveInt = Field(
1 * _MINUTE,
description="timeout of containers restart",
validation_alias=AliasChoices(
"DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT",
),
)

DIRECTOR_V2_STORAGE_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: PositiveInt = Field(
_HOUR,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,23 @@ async def stop_dynamic_services_in_project(
await logged_gather(*services_to_stop)


async def restart_user_services(app: web.Application, *, node_id: NodeID) -> None:
    """Restart the user service(s) started by the sidecar of node ``node_id``.

    NOTE: this operation will NOT restart
    sidecar services (``dy-sidecar`` or ``dy-proxy`` services),
    but ONLY user services (the ones defined by the compose spec).
    """
    settings: DynamicSchedulerSettings = get_plugin_settings(app)
    restart_timeout = settings.DYNAMIC_SCHEDULER_RESTART_USER_SERVICES_TIMEOUT
    await services.restart_user_services(
        get_rabbitmq_rpc_client(app),
        node_id=node_id,
        # the RPC client takes whole seconds; any fractional part is truncated
        timeout_s=int(restart_timeout.total_seconds()),
    )


async def retrieve_inputs(
app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class DynamicSchedulerSettings(BaseCustomSettings, MixinServiceSettings):
),
)

# Timeout for restarting user services (see description); defaults to one minute.
DYNAMIC_SCHEDULER_RESTART_USER_SERVICES_TIMEOUT: datetime.timedelta = Field(
datetime.timedelta(minutes=1), description="timeout for user services restart"
)

DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
datetime.timedelta(hours=1),
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@

from .._meta import API_VTAG as VTAG
from ..catalog import client as catalog_client
from ..director_v2 import api as director_v2_api
from ..dynamic_scheduler import api as dynamic_scheduler_api
from ..groups.api import get_group_from_gid, list_all_user_groups_ids
from ..groups.exceptions import GroupNotFoundError
Expand Down Expand Up @@ -411,7 +410,9 @@ async def restart_node(request: web.Request) -> web.Response:

path_params = parse_request_path_parameters_as(NodePathParams, request)

await director_v2_api.restart_dynamic_service(request.app, f"{path_params.node_id}")
await dynamic_scheduler_api.restart_user_services(
request.app, node_id=path_params.node_id
)

return web.json_response(status=status.HTTP_204_NO_CONTENT)

Expand Down
Loading