Skip to content

Commit

Permalink
♻️ Redirecting inputs retrieval via dynamic-scheduler ⚠️ (#6908)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Dec 17, 2024
1 parent 75aed81 commit acd677a
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from typing import Final

from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
Expand All @@ -11,6 +14,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
Expand Down Expand Up @@ -95,6 +99,25 @@ async def stop_dynamic_service(
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def retrieve_inputs(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout_s: NonNegativeInt,
) -> RetrieveDataOutEnveloped:
    """Invoke the ``retrieve_inputs`` RPC method in the dynamic-scheduler namespace.

    Args:
        rabbitmq_rpc_client: client used to issue the RPC request.
        node_id: forwarded to the request as ``node_id``.
        port_keys: forwarded to the request as ``port_keys``.
        timeout_s: forwarded to ``rabbitmq_rpc_client.request`` as ``timeout_s``.

    Returns:
        The reply of the request, which is expected to be a
        ``RetrieveDataOutEnveloped`` instance.
    """
    rpc_method_name = _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs")
    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        rpc_method_name,
        node_id=node_id,
        port_keys=port_keys,
        timeout_s=timeout_s,
    )
    # sanity check on the reply type (same assert-based convention as the
    # sibling wrappers in this module)
    assert isinstance(reply, RetrieveDataOutEnveloped)  # nosec
    return reply


@log_decorator(_logger, level=logging.DEBUG)
async def update_projects_networks(
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
Expand Down
9 changes: 5 additions & 4 deletions services/director/src/simcore_service_director/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ async def _get_node_details(


async def get_services_details(
app: FastAPI, user_id: str | None, study_id: str | None
app: FastAPI, user_id: str | None, project_id: str | None
) -> list[dict]:
app_settings = get_application_settings(app)
async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager
Expand All @@ -1091,9 +1091,10 @@ async def get_services_details(
filters.append(
f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id
)
if study_id:
if project_id:
filters.append(
f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id
f"{_to_simcore_runtime_docker_label_key('project_id')}="
+ project_id
)
list_running_services = await client.services.list(
filters={"label": filters}
Expand All @@ -1104,7 +1105,7 @@ async def get_services_details(
for service in list_running_services
]
except aiodocker.DockerError as err:
msg = f"Error while accessing container for {user_id=}, {study_id=}"
msg = f"Error while accessing container for {user_id=}, {project_id=}"
raise GenericDockerError(err=msg) from err


Expand Down
1 change: 1 addition & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ services:
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING}
DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET}
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
static-webserver:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from servicelib.rabbitmq import RPCRouter
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
Expand Down Expand Up @@ -58,6 +62,15 @@ async def stop_dynamic_service(
)


@router.expose()
async def retrieve_inputs(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """RPC handler that delegates to ``scheduler_interface.retrieve_inputs``.

    Args:
        app: the FastAPI application, passed through to the scheduler interface.
        node_id: passed through as ``node_id``.
        port_keys: passed through as ``port_keys``.

    Returns:
        Whatever ``scheduler_interface.retrieve_inputs`` returns.
    """
    enveloped_result = await scheduler_interface.retrieve_inputs(
        app,
        node_id=node_id,
        port_keys=port_keys,
    )
    return enveloped_result


@router.expose()
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
    """RPC handler that delegates to ``scheduler_interface.update_projects_networks``.

    Args:
        app: the FastAPI application, passed through to the scheduler interface.
        project_id: passed through as ``project_id``.
    """
    await scheduler_interface.update_projects_networks(
        app,
        project_id=project_id,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):

# Timeout applied when stopping a dynamic service; defaults to 60 minutes.
# The value may be provided under either of the two names listed in
# `validation_alias` below.
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field(
default=datetime.timedelta(minutes=60),
validation_alias=AliasChoices(
"DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
"DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
),
description=(
"Time to wait before timing out when stopping a dynamic service. "
"Since services require data to be stopped, this operation is timed out after 1 hour"
),
)

# Timeout for upload/download operations of dynamic services against storage;
# defaults to 60 minutes to accommodate very large payloads (see description).
DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
default=datetime.timedelta(minutes=60),
description=(
"When dynamic services upload and download data from storage, "
"sometimes very big payloads are involved. In order to handle "
"such payloads it is required to have long timeouts which "
"allow the service to finish the operation."
),
)

DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
default=False,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
from typing import Any

from fastapi import FastAPI, status
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin
Expand Down Expand Up @@ -75,7 +79,7 @@ async def stop_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta, # noqa: ASYNC109
timeout: datetime.timedelta # noqa: ASYNC109
) -> None:
try:
await self.thin_client.delete_dynamic_service(
Expand All @@ -100,6 +104,19 @@ async def stop_dynamic_service(

raise

async def retrieve_inputs(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta,  # noqa: ASYNC109
) -> RetrieveDataOutEnveloped:
    """Ask the thin client to retrieve a node's inputs and parse the reply.

    Args:
        node_id: passed to ``thin_client.dynamic_service_retrieve``.
        port_keys: passed to ``thin_client.dynamic_service_retrieve``.
        timeout: passed to ``thin_client.dynamic_service_retrieve``.

    Returns:
        The JSON body of the response validated as ``RetrieveDataOutEnveloped``.
    """
    http_response = await self.thin_client.dynamic_service_retrieve(
        node_id=node_id,
        port_keys=port_keys,
        timeout=timeout,
    )
    payload: dict[str, Any] = http_response.json()
    adapter = TypeAdapter(RetrieveDataOutEnveloped)
    return adapter.validate_python(payload)

async def list_tracked_dynamic_services(
self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import ServiceResourcesDictHelpers
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from servicelib.common_headers import (
X_DYNAMIC_SIDECAR_REQUEST_DNS,
Expand Down Expand Up @@ -91,7 +92,7 @@ async def delete_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta,
timeout: datetime.timedelta, # noqa: ASYNC109
) -> Response:
@retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds())
@expect_status(status.HTTP_204_NO_CONTENT)
Expand All @@ -112,6 +113,22 @@ async def _(

return await _(self)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def dynamic_service_retrieve(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta,  # noqa: ASYNC109
) -> Response:
    """POST the given port keys to ``/dynamic_services/{node_id}:retrieve``.

    Args:
        node_id: interpolated into the request path.
        port_keys: sent in the JSON body under the ``port_keys`` key.
        timeout: converted to seconds and used as the request timeout.

    Returns:
        The response of the POST request.
    """
    # NOTE(review): unlike `delete_dynamic_service`, the retry decorator is used
    # here without `total_retry_timeout_overwrite` -- confirm this is intended.
    endpoint = f"/dynamic_services/{node_id}:retrieve"
    serialized_body = json_dumps({"port_keys": port_keys})
    return await self.client.post(
        endpoint,
        content=serialized_body,
        timeout=timeout.total_seconds(),
    )

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_dynamic_services(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID

from ..core.settings import ApplicationSettings
Expand Down Expand Up @@ -75,6 +79,21 @@ async def stop_dynamic_service(
await set_request_as_stopped(app, dynamic_service_stop)


async def retrieve_inputs(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """Retrieve the inputs of a node through the director-v2 client.

    Args:
        app: application whose ``state.settings`` and director-v2 client are used.
        node_id: passed to ``DirectorV2Client.retrieve_inputs``.
        port_keys: passed to ``DirectorV2Client.retrieve_inputs``.

    Returns:
        The value returned by ``DirectorV2Client.retrieve_inputs``.

    Raises:
        NotImplementedError: when ``DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER``
            is enabled in the application settings.
    """
    app_settings: ApplicationSettings = app.state.settings
    if app_settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
        # the internal-scheduler path is not available for this operation
        raise NotImplementedError

    upload_download_timeout = (
        app_settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT
    )
    client = DirectorV2Client.get_from_app_state(app)
    return await client.retrieve_inputs(
        node_id=node_id, port_keys=port_keys, timeout=upload_download_timeout
    )


async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from faker import Faker
from fastapi import FastAPI, status
from fastapi.encoders import jsonable_encoder
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
Expand Down Expand Up @@ -55,14 +58,14 @@ def node_not_found(faker: Faker) -> NodeID:
@pytest.fixture
def service_status_new_style() -> DynamicServiceGet:
return TypeAdapter(DynamicServiceGet).validate_python(
DynamicServiceGet.model_config["json_schema_extra"]["examples"][1]
DynamicServiceGet.model_json_schema()["examples"][1]
)


@pytest.fixture
def service_status_legacy() -> NodeGet:
return TypeAdapter(NodeGet).validate_python(
NodeGet.model_config["json_schema_extra"]["examples"][1]
NodeGet.model_json_schema()["examples"][1]
)


Expand Down Expand Up @@ -112,9 +115,7 @@ def mock_director_v2_service_state(
mock.get("/dynamic_services").respond(
status.HTTP_200_OK,
text=json.dumps(
jsonable_encoder(
DynamicServiceGet.model_config["json_schema_extra"]["examples"]
)
jsonable_encoder(DynamicServiceGet.model_json_schema()["examples"])
),
)

Expand Down Expand Up @@ -193,7 +194,7 @@ async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient):
assert len(results) == 2
assert results == [
TypeAdapter(DynamicServiceGet).validate_python(x)
for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"]
for x in DynamicServiceGet.model_json_schema()["examples"]
]


Expand Down Expand Up @@ -223,7 +224,7 @@ async def test_get_state(
def dynamic_service_start() -> DynamicServiceStart:
# one for legacy and one for new style?
return TypeAdapter(DynamicServiceStart).validate_python(
DynamicServiceStart.model_config["json_schema_extra"]["example"]
DynamicServiceStart.model_json_schema()["example"]
)


Expand Down Expand Up @@ -492,6 +493,41 @@ async def test_stop_dynamic_service_serializes_generic_errors(
)


@pytest.fixture
def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]:
    """Mock the director-v2 ``:retrieve`` endpoint for ``node_id``.

    The mocked POST route answers 200 with the first schema example of
    ``RetrieveDataOutEnveloped`` serialized as JSON.
    """
    with respx.mock(
        base_url="http://director-v2:8000/v2",
        assert_all_called=False,
        assert_all_mocked=True,  # IMPORTANT: KEEP always True!
    ) as mock:
        first_example = RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
        validated_example = TypeAdapter(RetrieveDataOutEnveloped).validate_python(
            first_example
        )

        retrieve_route = mock.post(f"/dynamic_services/{node_id}:retrieve")
        retrieve_route.respond(
            status.HTTP_200_OK,
            text=validated_example.model_dump_json(),
        )

        yield None


async def test_retrieve_inputs(
    mock_director_v2_service_retrieve_inputs: None,
    rpc_client: RabbitMQRPCClient,
    node_id: NodeID,
):
    """The RPC ``retrieve_inputs`` call returns the mocked director-v2 payload."""
    results = await services.retrieve_inputs(
        rpc_client,
        node_id=node_id,
        port_keys=[],
        timeout_s=10,
    )

    expected_payload = RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
    assert results.model_dump(mode="python") == expected_payload


@pytest.fixture
def mock_director_v2_update_projects_networks(project_id: ProjectID) -> Iterator[None]:
with respx.mock(
Expand Down
Loading

0 comments on commit acd677a

Please sign in to comment.