diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py index 62cc42d91ba..b92b608dfb7 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py @@ -1,7 +1,10 @@ import logging from typing import Final -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, @@ -11,6 +14,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.services_types import ServicePortKey from models_library.users import UserID from pydantic import NonNegativeInt, TypeAdapter from servicelib.logging_utils import log_decorator @@ -95,6 +99,25 @@ async def stop_dynamic_service( assert result is None # nosec +@log_decorator(_logger, level=logging.DEBUG) +async def retrieve_inputs( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + node_id: NodeID, + port_keys: list[ServicePortKey], + timeout_s: NonNegativeInt, +) -> RetrieveDataOutEnveloped: + result = await rabbitmq_rpc_client.request( + DYNAMIC_SCHEDULER_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs"), + node_id=node_id, + port_keys=port_keys, + timeout_s=timeout_s, + ) + assert isinstance(result, RetrieveDataOutEnveloped) # nosec + return result + + @log_decorator(_logger, level=logging.DEBUG) async def update_projects_networks( rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID diff --git a/services/director/src/simcore_service_director/producer.py b/services/director/src/simcore_service_director/producer.py index 907e7a8e04e..81d1accf23d 100644 --- a/services/director/src/simcore_service_director/producer.py +++ b/services/director/src/simcore_service_director/producer.py @@ -1078,7 +1078,7 @@ async def _get_node_details( async def get_services_details( - app: FastAPI, user_id: str | None, study_id: str | None + app: FastAPI, user_id: str | None, project_id: str | None ) -> list[dict]: app_settings = get_application_settings(app) async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager @@ -1091,9 +1091,10 @@ async def get_services_details( filters.append( f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id ) - if study_id: + if project_id: filters.append( - f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id + f"{_to_simcore_runtime_docker_label_key('project_id')}=" + + project_id ) list_running_services = await client.services.list( filters={"label": filters} @@ -1104,7 +1105,7 @@ async def get_services_details( for service in list_running_services ] except aiodocker.DockerError as err: - msg = f"Error while accessing container for {user_id=}, {study_id=}" + msg = f"Error while accessing container for {user_id=}, {project_id=}" raise GenericDockerError(err=msg) from err diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 265d29e56ed..766c117244d 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -568,6 +568,7 @@ services: DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING} DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING} DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET} + DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT} TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT} TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT} static-webserver: diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index 9bba60a19f5..63e0fce8391 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -1,5 +1,8 @@ from fastapi import FastAPI -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, @@ -7,6 +10,7 @@ from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from models_library.users import UserID from servicelib.rabbitmq import RPCRouter from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import ( @@ -58,6 +62,15 @@ async def stop_dynamic_service( ) +@router.expose() +async def retrieve_inputs( + app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] +) -> RetrieveDataOutEnveloped: + return await scheduler_interface.retrieve_inputs( + app, node_id=node_id, port_keys=port_keys + ) + + @router.expose() async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None: await scheduler_interface.update_projects_networks(app, project_id=project_id) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py index 36be9f4b587..e44ec885ed3 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py @@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field( default=datetime.timedelta(minutes=60), + validation_alias=AliasChoices( + "DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT", + "DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT", + ), description=( "Time to wait before timing out when stopping a dynamic service. " "Since services require data to be stopped, this operation is timed out after 1 hour" ), ) + DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field( + default=datetime.timedelta(minutes=60), + description=( + "When dynamic services upload and download data from storage, " + "sometimes very big payloads are involved. In order to handle " + "such payloads it is required to have long timeouts which " + "allow the service to finish the operation." + ), + ) + DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field( default=False, description=( diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py index 52134837775..a9267bf0402 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py @@ -2,13 +2,17 @@ from typing import Any from fastapi import FastAPI, status -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, ) from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from models_library.users import UserID from pydantic import TypeAdapter from servicelib.fastapi.app_state import SingletonInAppStateMixin @@ -75,7 +79,7 @@ async def stop_dynamic_service( node_id: NodeID, simcore_user_agent: str, save_state: bool, - timeout: datetime.timedelta, # noqa: ASYNC109 + timeout: datetime.timedelta # noqa: ASYNC109 ) -> None: try: await self.thin_client.delete_dynamic_service( @@ -100,6 +104,19 @@ async def stop_dynamic_service( raise + async def retrieve_inputs( + self, + *, + node_id: NodeID, + port_keys: list[ServicePortKey], + timeout: datetime.timedelta # noqa: ASYNC109 + ) -> RetrieveDataOutEnveloped: + response = await self.thin_client.dynamic_service_retrieve( + node_id=node_id, port_keys=port_keys, timeout=timeout + ) + dict_response: dict[str, Any] = response.json() + return TypeAdapter(RetrieveDataOutEnveloped).validate_python(dict_response) + async def list_tracked_dynamic_services( self, *, user_id: UserID | None = None, project_id: ProjectID | None = None ) -> list[DynamicServiceGet]: diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py index 39fce3741f3..9c57dfaaca6 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py @@ -11,6 +11,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_resources import ServiceResourcesDictHelpers +from models_library.services_types import ServicePortKey from models_library.users import UserID from servicelib.common_headers import ( X_DYNAMIC_SIDECAR_REQUEST_DNS, @@ -91,7 +92,7 @@ async def delete_dynamic_service( node_id: NodeID, simcore_user_agent: str, save_state: bool, - timeout: datetime.timedelta, + timeout: datetime.timedelta, # noqa: ASYNC109 ) -> Response: @retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds()) @expect_status(status.HTTP_204_NO_CONTENT) @@ -112,6 +113,22 @@ async def _( return await _(self) + @retry_on_errors() + @expect_status(status.HTTP_200_OK) + async def dynamic_service_retrieve( + self, + *, + node_id: NodeID, + port_keys: list[ServicePortKey], + timeout: datetime.timedelta, # noqa: ASYNC109 + ) -> Response: + post_data = {"port_keys": port_keys} + return await self.client.post( + f"/dynamic_services/{node_id}:retrieve", + content=json_dumps(post_data), + timeout=timeout.total_seconds(), + ) + @retry_on_errors() @expect_status(status.HTTP_200_OK) async def get_dynamic_services( diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py index 382030a0489..1cfd3638229 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py @@ -1,5 +1,8 @@ from fastapi import FastAPI -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, @@ -7,6 +10,7 @@ from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from models_library.users import UserID from ..core.settings import ApplicationSettings @@ -75,6 +79,21 @@ async def stop_dynamic_service( await set_request_as_stopped(app, dynamic_service_stop) +async def retrieve_inputs( + app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] +) -> RetrieveDataOutEnveloped: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + return await director_v2_client.retrieve_inputs( + node_id=node_id, + port_keys=port_keys, + timeout=settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT, + ) + + async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None: settings: ApplicationSettings = app.state.settings if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index cb4dd981d19..08c9692c06d 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -9,7 +9,10 @@ from faker import Faker from fastapi import FastAPI, status from fastapi.encoders import jsonable_encoder -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, @@ -55,14 +58,14 @@ def node_not_found(faker: Faker) -> NodeID: @pytest.fixture def service_status_new_style() -> DynamicServiceGet: return TypeAdapter(DynamicServiceGet).validate_python( - DynamicServiceGet.model_config["json_schema_extra"]["examples"][1] + DynamicServiceGet.model_json_schema()["examples"][1] ) @pytest.fixture def service_status_legacy() -> NodeGet: return TypeAdapter(NodeGet).validate_python( - NodeGet.model_config["json_schema_extra"]["examples"][1] + NodeGet.model_json_schema()["examples"][1] ) @@ -112,9 +115,7 @@ def mock_director_v2_service_state( mock.get("/dynamic_services").respond( status.HTTP_200_OK, text=json.dumps( - jsonable_encoder( - DynamicServiceGet.model_config["json_schema_extra"]["examples"] - ) + jsonable_encoder(DynamicServiceGet.model_json_schema()["examples"]) ), ) @@ -193,7 +194,7 @@ async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient): assert len(results) == 2 assert results == [ TypeAdapter(DynamicServiceGet).validate_python(x) - for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"] + for x in DynamicServiceGet.model_json_schema()["examples"] ] @@ -223,7 +224,7 @@ async def test_get_state( def dynamic_service_start() -> DynamicServiceStart: # one for legacy and one for new style? return TypeAdapter(DynamicServiceStart).validate_python( - DynamicServiceStart.model_config["json_schema_extra"]["example"] + DynamicServiceStart.model_json_schema()["example"] ) @@ -492,6 +493,41 @@ async def test_stop_dynamic_service_serializes_generic_errors( ) +@pytest.fixture +def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]: + with respx.mock( + base_url="http://director-v2:8000/v2", + assert_all_called=False, + assert_all_mocked=True, # IMPORTANT: KEEP always True! + ) as mock: + request_ok = mock.post(f"/dynamic_services/{node_id}:retrieve") + + request_ok.respond( + status.HTTP_200_OK, + text=TypeAdapter(RetrieveDataOutEnveloped) + .validate_python( + RetrieveDataOutEnveloped.model_json_schema()["examples"][0] + ) + .model_dump_json(), + ) + + yield None + + +async def test_retrieve_inputs( + mock_director_v2_service_retrieve_inputs: None, + rpc_client: RabbitMQRPCClient, + node_id: NodeID, +): + results = await services.retrieve_inputs( + rpc_client, node_id=node_id, port_keys=[], timeout_s=10 + ) + assert ( + results.model_dump(mode="python") + == RetrieveDataOutEnveloped.model_json_schema()["examples"][0] + ) + + @pytest.fixture def mock_director_v2_update_projects_networks(project_id: ProjectID) -> Iterator[None]: with respx.mock( diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py index 3c0cac4120e..ef25109628d 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py @@ -8,63 +8,16 @@ from aiohttp import web from models_library.projects import ProjectID -from models_library.services import ServicePortKey from pydantic import NonNegativeInt from servicelib.logging_utils import log_decorator from yarl import URL from ._core_base import DataType, request_director_v2 -from .exceptions import DirectorServiceError from .settings import DirectorV2Settings, get_plugin_settings _log = logging.getLogger(__name__) -# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191 -@log_decorator(logger=_log) -async def retrieve( - app: web.Application, service_uuid: str, port_keys: list[ServicePortKey] -) -> DataType: - """Pulls data from connections to the dynamic service inputs""" - settings: DirectorV2Settings = get_plugin_settings(app) - result = await request_director_v2( - app, - "POST", - url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve", - data={"port_keys": port_keys}, - timeout=settings.get_service_retrieve_timeout(), - ) - assert isinstance(result, dict) # nosec - return result - - -# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191 -# notice that this function is identical to retrieve except that it does NOT raises -@log_decorator(logger=_log) -async def request_retrieve_dyn_service( - app: web.Application, service_uuid: str, port_keys: list[str] -) -> None: - settings: DirectorV2Settings = get_plugin_settings(app) - body = {"port_keys": port_keys} - - try: - await request_director_v2( - app, - "POST", - url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve", - data=body, - timeout=settings.get_service_retrieve_timeout(), - ) - except DirectorServiceError as exc: - _log.warning( - "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s:%s]", - service_uuid, - port_keys, - exc.status, - exc.reason, - ) - - @log_decorator(logger=_log) async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None: """User restart the dynamic dynamic service started in the node_uuid diff --git a/services/web/server/src/simcore_service_webserver/director_v2/api.py b/services/web/server/src/simcore_service_webserver/director_v2/api.py index a4d9c351082..1eb7721b336 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/api.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/api.py @@ -16,12 +16,7 @@ is_pipeline_running, stop_pipeline, ) -from ._core_dynamic_services import ( - get_project_inactivity, - request_retrieve_dyn_service, - restart_dynamic_service, - retrieve, -) +from ._core_dynamic_services import get_project_inactivity, restart_dynamic_service from ._core_utils import is_healthy from .exceptions import DirectorServiceError @@ -37,9 +32,7 @@ "get_project_run_policy", "is_healthy", "is_pipeline_running", - "request_retrieve_dyn_service", "restart_dynamic_service", - "retrieve", "set_project_run_policy", "stop_pipeline", ) diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index f34b90e1c2b..2a2c5b8c1da 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -3,7 +3,10 @@ from functools import partial from aiohttp import web -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, @@ -18,6 +21,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_messages import ProgressRabbitMessageProject, ProgressType +from models_library.services import ServicePortKey from models_library.users import UserID from pydantic.types import PositiveInt from servicelib.progress_bar import ProgressBarData @@ -150,6 +154,20 @@ async def stop_dynamic_services_in_project( await logged_gather(*services_to_stop) +async def retrieve_inputs( + app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey] +) -> RetrieveDataOutEnveloped: + settings: DynamicSchedulerSettings = get_plugin_settings(app) + return await services.retrieve_inputs( + get_rabbitmq_rpc_client(app), + node_id=node_id, + port_keys=port_keys, + timeout_s=int( + settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT.total_seconds() + ), + ) + + async def update_projects_networks( app: web.Application, *, project_id: ProjectID ) -> None: diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py index 91dac1317b6..9d34fa378cc 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py @@ -26,6 +26,16 @@ class DynamicSchedulerSettings(BaseCustomSettings, MixinServiceSettings): ), ) + DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field( + datetime.timedelta(hours=1), + description=( + "When dynamic services upload and download data from storage, " + "sometimes very big payloads are involved. In order to handle " + "such payloads it is required to have long timeouts which " + "allow the service to finish the operation." + ), + ) + def get_plugin_settings(app: web.Application) -> DynamicSchedulerSettings: settings = app[APP_SETTINGS_KEY].WEBSERVER_DYNAMIC_SCHEDULER diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index 7e4b35bab5d..4aaf99e3ba5 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -279,8 +279,8 @@ async def retrieve_node(request: web.Request) -> web.Response: retrieve = await parse_request_body_as(NodeRetrieve, request) return web.json_response( - await director_v2_api.retrieve( - request.app, f"{path_params.node_id}", retrieve.port_keys + await dynamic_scheduler_api.retrieve_inputs( + request.app, path_params.node_id, retrieve.port_keys ), dumps=json_dumps, ) diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index 80195446249..d4bec765a9e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -1050,7 +1050,7 @@ async def patch_project_node( # 5. Updates project states for user, if inputs have been changed if "inputs" in _node_patch_exclude_unset: updated_project = await add_project_states_for_user( - user_id=user_id, project=updated_project, is_template=False, app=app + user_id=user_id, project=updated_project, is_template=False, app=app ) # 6. Notify project node update @@ -1132,6 +1132,20 @@ async def is_node_id_present_in_any_project_workbench( return await db.node_id_exists(node_id) +async def _safe_retrieve( + app: web.Application, node_id: NodeID, port_keys: list[str] +) -> None: + try: + await dynamic_scheduler_api.retrieve_inputs(app, node_id, port_keys) + except RPCServerError as exc: + log.warning( + "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s]", + node_id, + port_keys, + exc, + ) + + async def _trigger_connected_service_retrieve( app: web.Application, project: dict, updated_node_uuid: str, changed_keys: list[str] ) -> None: @@ -1172,7 +1186,7 @@ async def _trigger_connected_service_retrieve( # call /retrieve on the nodes update_tasks = [ - director_v2_api.request_retrieve_dyn_service(app, node, keys) + _safe_retrieve(app, NodeID(node), keys) for node, keys in nodes_keys_to_update.items() ] await logged_gather(*update_tasks)