diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py index b92b608dfb7..f58500d771e 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py @@ -99,6 +99,22 @@ async def stop_dynamic_service( assert result is None # nosec +@log_decorator(_logger, level=logging.DEBUG) +async def restart_user_services( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + node_id: NodeID, + timeout_s: NonNegativeInt, +) -> None: + result = await rabbitmq_rpc_client.request( + DYNAMIC_SCHEDULER_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python("restart_user_services"), + node_id=node_id, + timeout_s=timeout_s, + ) + assert result is None # nosec + + @log_decorator(_logger, level=logging.DEBUG) async def retrieve_inputs( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index 63e0fce8391..a0bbed6b110 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -62,6 +62,11 @@ async def stop_dynamic_service( ) +@router.expose() +async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None: + await scheduler_interface.restart_user_services(app, node_id=node_id) + + @router.expose() async def retrieve_inputs( app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py index a9267bf0402..2841eb5f467 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py @@ -125,6 +125,9 @@ async def list_tracked_dynamic_services( ) return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json()) + async def restart_user_services(self, *, node_id: NodeID) -> None: + await self.thin_client.post_restart(node_id=node_id) + async def update_projects_networks(self, *, project_id: ProjectID) -> None: await self.thin_client.patch_projects_networks(project_id=project_id) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py index 9c57dfaaca6..1b86d604148 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py @@ -142,6 +142,11 @@ async def get_dynamic_services( params=as_dict_exclude_unset(user_id=user_id, project_id=project_id), ) + @retry_on_errors() + @expect_status(status.HTTP_204_NO_CONTENT) + async def post_restart(self, *, node_id: NodeID) -> Response: + return await self.client.post(f"/dynamic_services/{node_id}:restart") + @retry_on_errors() @expect_status(status.HTTP_204_NO_CONTENT) async def patch_projects_networks(self, *, project_id: ProjectID) -> Response: diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py index 1cfd3638229..27e854ae6db 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py @@ -79,6 +79,15 @@ async def stop_dynamic_service( await set_request_as_stopped(app, dynamic_service_stop) +async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + await director_v2_client.restart_user_services(node_id=node_id) + + async def retrieve_inputs( app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] ) -> RetrieveDataOutEnveloped: diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index 08c9692c06d..6455ca367f5 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -494,15 +494,34 @@ async def test_stop_dynamic_service_serializes_generic_errors( @pytest.fixture -def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]: +def mock_director_v2_restart_user_services(node_id: NodeID) -> Iterator[None]: with respx.mock( base_url="http://director-v2:8000/v2", assert_all_called=False, assert_all_mocked=True, # IMPORTANT: KEEP always True! ) as mock: - request_ok = mock.post(f"/dynamic_services/{node_id}:retrieve") + mock.post(f"/dynamic_services/{node_id}:restart").respond( + status.HTTP_204_NO_CONTENT + ) + yield None + + +async def test_restart_user_services( + mock_director_v2_restart_user_services: None, + rpc_client: RabbitMQRPCClient, + node_id: NodeID, +): + await services.restart_user_services(rpc_client, node_id=node_id, timeout_s=5) - request_ok.respond( + +@pytest.fixture +def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]: + with respx.mock( + base_url="http://director-v2:8000/v2", + assert_all_called=False, + assert_all_mocked=True, # IMPORTANT: KEEP always True! + ) as mock: + mock.post(f"/dynamic_services/{node_id}:retrieve").respond( status.HTTP_200_OK, text=TypeAdapter(RetrieveDataOutEnveloped) .validate_python( diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py index ef25109628d..5f9f3f3aad8 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py @@ -18,24 +18,6 @@ _log = logging.getLogger(__name__) -@log_decorator(logger=_log) -async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None: - """User restart the dynamic dynamic service started in the node_uuid - - NOTE that this operation will NOT restart all sidecar services - (``simcore-service-dynamic-sidecar`` or ``reverse-proxy caddy`` services) but - ONLY those containers in the compose-spec (i.e. the ones exposed to the user) - """ - settings: DirectorV2Settings = get_plugin_settings(app) - await request_director_v2( - app, - "POST", - url=settings.base_url / f"dynamic_services/{node_uuid}:restart", - expected_status=web.HTTPOk, - timeout=settings.DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT, - ) - - @log_decorator(logger=_log) async def get_project_inactivity( app: web.Application, diff --git a/services/web/server/src/simcore_service_webserver/director_v2/api.py b/services/web/server/src/simcore_service_webserver/director_v2/api.py index 1eb7721b336..9c070e7cae9 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/api.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/api.py @@ -16,7 +16,7 @@ is_pipeline_running, stop_pipeline, ) -from ._core_dynamic_services import get_project_inactivity, restart_dynamic_service +from ._core_dynamic_services import get_project_inactivity from ._core_utils import is_healthy from .exceptions import DirectorServiceError @@ -32,7 +32,6 @@ "get_project_run_policy", "is_healthy", "is_pipeline_running", - "restart_dynamic_service", "set_project_run_policy", "stop_pipeline", ) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/settings.py b/services/web/server/src/simcore_service_webserver/director_v2/settings.py index 21cb368ff50..31fc096a5dd 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/settings.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/settings.py @@ -33,14 +33,6 @@ def base_url(self) -> URL: # - Mostly in floats (aiohttp.Client/) but sometimes in ints # - Typically in seconds but occasionally in ms - DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT: PositiveInt = Field( - 1 * _MINUTE, - description="timeout of containers restart", - validation_alias=AliasChoices( - "DIRECTOR_V2_RESTART_DYNAMIC_SERVICE_TIMEOUT", - ), - ) - DIRECTOR_V2_STORAGE_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: PositiveInt = Field( _HOUR, description=( diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index 2a2c5b8c1da..bdfd22c3c1d 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -154,6 +154,23 @@ async def stop_dynamic_services_in_project( await logged_gather(*services_to_stop) +async def restart_user_services(app: web.Application, *, node_id: NodeID) -> None: + """Restarts the user service(s) started by the the node_uuid's sidecar + + NOTE: this operation will NOT restart + sidecar services (``dy-sidecar`` or ``dy-proxy`` services), + but ONLY user services (the ones defined by the compose spec). + """ + settings: DynamicSchedulerSettings = get_plugin_settings(app) + await services.restart_user_services( + get_rabbitmq_rpc_client(app), + node_id=node_id, + timeout_s=int( + settings.DYNAMIC_SCHEDULER_RESTART_USER_SERVICES_TIMEOUT.total_seconds() + ), + ) + + async def retrieve_inputs( app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey] ) -> RetrieveDataOutEnveloped: diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py index 9d34fa378cc..5f33995a89e 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py @@ -26,6 +26,10 @@ class DynamicSchedulerSettings(BaseCustomSettings, MixinServiceSettings): ), ) + DYNAMIC_SCHEDULER_RESTART_USER_SERVICES_TIMEOUT: datetime.timedelta = Field( + datetime.timedelta(minutes=1), description="timeout for user services restart" + ) + DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field( datetime.timedelta(hours=1), description=( diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index 4aaf99e3ba5..9ddd88c0df1 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -62,7 +62,6 @@ from .._meta import API_VTAG as VTAG from ..catalog import client as catalog_client -from ..director_v2 import api as director_v2_api from ..dynamic_scheduler import api as dynamic_scheduler_api from ..groups.api import get_group_from_gid, list_all_user_groups_ids from ..groups.exceptions import GroupNotFoundError @@ -411,7 +410,9 @@ async def restart_node(request: web.Request) -> web.Response: path_params = parse_request_path_parameters_as(NodePathParams, request) - await director_v2_api.restart_dynamic_service(request.app, f"{path_params.node_id}") + await dynamic_scheduler_api.restart_user_services( + request.app, node_id=path_params.node_id + ) return web.json_response(status=status.HTTP_204_NO_CONTENT)