🎨 expose service_run_id as an env var for both comp and new style dynamic services #6942

Merged
28 commits (this diff shows changes from the first 8 commits):
24ff119  run_id can now be requested as an env var on request by a service (Dec 11, 2024)
df34a54  Merge branch 'master' into pr-osparc-allow-user-services-to-request-d… (GitHK, Dec 11, 2024)
47c4eb8  Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u… (Dec 12, 2024)
daccfdc  added context for run_id in comp tasks (Dec 13, 2024)
8d9b796  Merge branch 'pr-osparc-allow-user-services-to-request-dy-sidecar-run… (Dec 13, 2024)
d6b89a9  added fixture and refactor tests (Dec 13, 2024)
4204990  fixed broken tests (Dec 13, 2024)
ff3298c  moved fixture where it is used (Dec 13, 2024)
2d1b08e  refactor with proper values (Dec 13, 2024)
13d56de  refactor (Dec 13, 2024)
dfdacbb  fixed test (Dec 13, 2024)
486fb33  refactor to use new RunID (Dec 13, 2024)
978f407  refactor types (Dec 13, 2024)
0cb9f1f  fixed type (Dec 13, 2024)
af0bed3  repalced (Dec 13, 2024)
a72b6b5  fixed imports (Dec 13, 2024)
98de753  Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u… (Dec 13, 2024)
b0d715c  Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u… (Dec 16, 2024)
5b67d46  renamed run_id to service_run_id and RunID to ServiceRunID (Dec 16, 2024)
e9be2ee  rename (Dec 16, 2024)
40ac24e  refactor (Dec 16, 2024)
fe7ee55  fixed naming (Dec 16, 2024)
2ded5fe  Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u… (Dec 17, 2024)
e726e78  fixed failing test (Dec 17, 2024)
dc53e45  Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u… (Dec 19, 2024)
f82d8b0  refactor (Dec 19, 2024)
e652c83  fixed imports (Dec 19, 2024)
1c44aec  fixed specs (Dec 19, 2024)
@@ -129,6 +129,7 @@ async def _start_tasks(
hardware_info=task.hardware_info,
callback=wake_up_callback,
metadata=comp_run.metadata,
run_id=comp_run.run_id,
)
for node_id, task in scheduled_tasks.items()
),
@@ -48,7 +48,7 @@
from models_library.projects_nodes_io import NodeID
from models_library.resource_tracker import HardwareInfo
from models_library.users import UserID
- from pydantic import TypeAdapter, ValidationError
+ from pydantic import PositiveInt, TypeAdapter, ValidationError
from pydantic.networks import AnyUrl
from servicelib.logging_utils import log_catch
from settings_library.s3 import S3Settings
@@ -293,6 +293,7 @@ async def send_computation_tasks(
remote_fct: ContainerRemoteFct | None = None,
metadata: RunMetadataDict,
hardware_info: HardwareInfo,
run_id: PositiveInt,
) -> list[PublishedComputationTask]:
"""actually sends the function remote_fct to be remotely executed. if None is kept then the default
function that runs container will be started.
@@ -396,6 +397,7 @@ async def send_computation_tasks(
node_id=node_id,
node_image=node_image,
metadata=metadata,
run_id=run_id,
)
task_owner = dask_utils.compute_task_owner(
user_id, project_id, node_id, metadata.get("project_metadata", {})
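For orientation, here is a condensed sketch of the updated scheduler-side call, keeping only the arguments visible in the _start_tasks hunk above; the remaining arguments are folded into **kwargs (this is not a verbatim excerpt):

# Sketch: how the computational scheduler now invokes the dask client,
# forwarding the run id of the current comp_runs row.
async def _submit_one(dask_client, comp_run, task, wake_up_callback, **kwargs):
    return await dask_client.send_computation_tasks(
        hardware_info=task.hardware_info,
        callback=wake_up_callback,
        metadata=comp_run.metadata,
        run_id=comp_run.run_id,  # new argument: the comp run's PositiveInt id
        **kwargs,
    )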
@@ -20,6 +20,7 @@
ResourceValue,
ServiceResourcesDict,
)
from models_library.services_types import RunID
from models_library.users import UserID
from models_library.utils.docker_compose import replace_env_vars_in_compose_spec
from pydantic import ByteSize
@@ -278,6 +279,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913
node_id: NodeID,
simcore_user_agent: str,
swarm_stack_name: str,
run_id: RunID,
) -> str:
"""
returns a docker-compose spec used by
@@ -350,6 +352,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
)

add_egress_configuration(
@@ -388,6 +391,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
)

stringified_service_spec: str = replace_env_vars_in_compose_spec(
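On the dynamic-service side, run_id now rides through assemble_spec into the session-variable resolution, so a compose spec that references the variable gets the value substituted before the stack is submitted. A stripped-down illustration of the effect, using string.Template as a stand-in for the project's own replace_env_vars_in_compose_spec machinery (service name and value are invented):

from string import Template

# Stand-in illustration: the real code resolves OSPARC_VARIABLE_* names via
# resolve_and_substitute_session_variables_in_specs before stringifying the
# compose spec.
compose_spec = """
services:
  jupyter:
    environment:
      - SERVICE_RUN_ID=$OSPARC_VARIABLE_RUN_ID
"""
print(Template(compose_spec).safe_substitute(OSPARC_VARIABLE_RUN_ID="run_abc123"))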
@@ -103,6 +103,7 @@ async def submit_compose_sepc(app: FastAPI, scheduler_data: SchedulerData) -> None:
node_id=scheduler_data.node_uuid,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
run_id=scheduler_data.run_id,
)

_logger.debug(
@@ -18,9 +18,10 @@
from models_library.projects_nodes_io import NodeID
from models_library.service_settings_labels import ComposeSpecLabelDict
from models_library.services import ServiceKey, ServiceVersion
from models_library.services_types import RunID
from models_library.users import UserID
from models_library.utils.specs_substitution import SpecsSubstitutionsResolver
- from pydantic import BaseModel
+ from pydantic import BaseModel, PositiveInt
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.logging_utils import log_context

@@ -120,6 +121,7 @@ def create(cls, app: FastAPI):
("OSPARC_VARIABLE_NODE_ID", "node_id"),
("OSPARC_VARIABLE_PRODUCT_NAME", "product_name"),
("OSPARC_VARIABLE_STUDY_UUID", "project_id"),
("OSPARC_VARIABLE_RUN_ID", "run_id"),
("OSPARC_VARIABLE_USER_ID", "user_id"),
("OSPARC_VARIABLE_API_HOST", "api_server_base_url"),
]:
@@ -181,6 +183,7 @@ async def resolve_and_substitute_session_variables_in_model(
product_name: str,
project_id: ProjectID,
node_id: NodeID,
run_id: RunID,
) -> TBaseModel:
result: TBaseModel = model
try:
@@ -200,6 +203,7 @@
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
api_server_base_url=app.state.settings.DIRECTOR_V2_PUBLIC_API_BASE_URL,
),
)
@@ -221,6 +225,7 @@ async def resolve_and_substitute_session_variables_in_specs(
product_name: str,
project_id: ProjectID,
node_id: NodeID,
run_id: RunID | PositiveInt,
) -> dict[str, Any]:
table = OsparcSessionVariablesTable.get_from_app_state(app)
resolver = SpecsSubstitutionsResolver(specs, upgrade=False)
@@ -241,6 +246,7 @@
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=f"{run_id}",
api_server_base_url=app.state.settings.DIRECTOR_V2_PUBLIC_API_BASE_URL,
),
)
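The table registration above is what makes OSPARC_VARIABLE_RUN_ID resolvable: each entry binds a session-variable name to a parameter that must be present in the substitution context, and the caller stringifies the value with f"{run_id}" because either a RunID (dynamic services) or a PositiveInt (computational runs) may arrive here. A stripped-down sketch of that lookup (not the actual OsparcSessionVariablesTable implementation; ValueError stands in for the project's own error type):

from typing import Any

ContextDict = dict[str, Any]

# Minimal model of the session-variables table: each OSPARC_VARIABLE_* name
# resolves by pulling a named parameter out of the context dict.
_TABLE = {
    "OSPARC_VARIABLE_RUN_ID": "run_id",
    "OSPARC_VARIABLE_USER_ID": "user_id",
}

def resolve(variable_name: str, context: ContextDict) -> str:
    parameter_name = _TABLE[variable_name]
    try:
        return f"{context[parameter_name]}"
    except KeyError as err:
        msg = f"{parameter_name=} missing from substitution context"
        raise ValueError(msg) from err

assert resolve("OSPARC_VARIABLE_RUN_ID", {"run_id": 42}) == "42"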
@@ -27,7 +27,7 @@
from models_library.projects_nodes_io import NodeID, NodeIDStr
from models_library.services import ServiceKey, ServiceVersion
from models_library.users import UserID
- from pydantic import AnyUrl, ByteSize, TypeAdapter, ValidationError
+ from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter, ValidationError
from servicelib.logging_utils import log_catch, log_context
from simcore_sdk import node_ports_v2
from simcore_sdk.node_ports_common.exceptions import (
@@ -342,6 +342,7 @@ async def compute_task_envs(
node_id: NodeID,
node_image: Image,
metadata: RunMetadataDict,
run_id: PositiveInt,
) -> ContainerEnvsDict:
product_name = metadata.get("product_name", UNDEFINED_DOCKER_LABEL)
task_envs = node_image.envs
Expand All @@ -360,6 +361,7 @@ async def compute_task_envs(
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
)
# NOTE: see https://github.com/ITISFoundation/osparc-simcore/issues/3638
# we currently do not validate as we are using illegal docker key names with underscores
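Net effect on the computational path: a task image env that references the session variable is resolved before the task is sent to dask. Illustratively (the env name is invented; run_id=42 matches the test fixture further below):

# Invented example of the substitution outcome for an image-declared env,
# assuming compute_task_envs(..., run_id=42) on the updated code path:
declared_envs = {"MY_RUN_ID": "$OSPARC_VARIABLE_RUN_ID"}
resolved_envs = {"MY_RUN_ID": "42"}  # run_id is stringified via f"{run_id}"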
@@ -25,7 +25,7 @@ def _get_or_raise(context: ContextDict) -> Any:
try:
return context[parameter_name]
except KeyError as err:
msg = "Parameter {keyname} missing from substitution context"
msg = f"{parameter_name=} missing from substitution context"
raise CaptureError(msg) from err

# For context["foo"] -> return operator.methodcaller("__getitem__", keyname)
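The message fix above repairs a bug as well as the wording: the old string had no f-prefix, so "{keyname}" was emitted literally instead of the missing parameter's name. The replacement uses f-string debug syntax, where f"{parameter_name=}" expands to both the expression text and its value:

parameter_name = "run_id"
msg = f"{parameter_name=} missing from substitution context"
print(msg)  # parameter_name='run_id' missing from substitution context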
services/director-v2/tests/unit/conftest.py (6 additions & 1 deletion)
@@ -26,7 +26,7 @@
from models_library.services import RunID, ServiceKey, ServiceKeyVersion, ServiceVersion
from models_library.services_enums import ServiceState
from models_library.utils._original_fastapi_encoders import jsonable_encoder
- from pydantic import TypeAdapter
+ from pydantic import PositiveInt, TypeAdapter
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.s3 import S3Settings
@@ -338,3 +338,8 @@ def mock_docker_api(mocker: MockerFixture) -> None:
async def async_docker_client() -> AsyncIterable[aiodocker.Docker]:
async with aiodocker.Docker() as docker_client:
yield docker_client


@pytest.fixture
def comp_task_run_id() -> PositiveInt:
return 42
services/director-v2/tests/unit/test_modules_dask_client.py (25 additions & 1 deletion)
@@ -46,7 +46,7 @@
from models_library.projects_nodes_io import NodeID
from models_library.resource_tracker import HardwareInfo
from models_library.users import UserID
- from pydantic import AnyUrl, ByteSize, TypeAdapter
+ from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.s3 import S3Settings
@@ -442,6 +442,7 @@ async def test_send_computation_task(
task_labels: ContainerLabelsDict,
empty_hardware_info: HardwareInfo,
faker: Faker,
comp_task_run_id: PositiveInt,
):
_DASK_EVENT_NAME = faker.pystr()

@@ -503,6 +504,7 @@ def fake_sidecar_fct(
),
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert node_id_to_job_ids
assert len(node_id_to_job_ids) == 1
@@ -559,6 +561,7 @@ async def test_computation_task_is_persisted_on_dask_scheduler(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
"""rationale:
When a task is submitted to the dask backend, a dask future is returned.
@@ -594,6 +597,7 @@ def fake_sidecar_fct(
remote_fct=fake_sidecar_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
@@ -649,6 +653,7 @@ async def test_abort_computation_tasks(
faker: Faker,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
_DASK_EVENT_NAME = faker.pystr()

@@ -687,6 +692,7 @@ def fake_remote_fct(
remote_fct=fake_remote_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
@@ -738,6 +744,7 @@ async def test_failed_task_returns_exceptions(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# NOTE: this must be inlined so that the test works,
# the dask-worker must be able to import the function
@@ -758,6 +765,7 @@ def fake_failing_sidecar_fct(
remote_fct=fake_failing_sidecar_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
@@ -800,6 +808,7 @@ async def test_send_computation_task_with_missing_resources_raises(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# remove the workers that can handle gpu
scheduler_info = dask_client.backend.client.scheduler_info()
@@ -826,6 +835,7 @@
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
mocked_user_completed_cb.assert_not_called()

@@ -844,6 +854,7 @@ async def test_send_computation_task_with_hardware_info_raises(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# NOTE: running on the default cluster will raise missing resources
with pytest.raises(MissingComputationalResourcesError):
@@ -855,6 +866,7 @@
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=hardware_info,
run_id=comp_task_run_id,
)
mocked_user_completed_cb.assert_not_called()

@@ -872,6 +884,7 @@ async def test_too_many_resources_send_computation_task(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# create an image that needs a huge amount of CPU
image = Image(
@@ -895,6 +908,7 @@
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)

mocked_user_completed_cb.assert_not_called()
@@ -911,6 +925,7 @@ async def test_disconnected_backend_raises_exception(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# DISCONNECT THE CLUSTER
await dask_spec_local_cluster.close() # type: ignore
@@ -923,6 +938,7 @@
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
mocked_user_completed_cb.assert_not_called()

@@ -942,6 +958,7 @@ async def test_changed_scheduler_raises_exception(
unused_tcp_port_factory: Callable,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# change the scheduler (stop the current one and start another at the same address)
scheduler_address = URL(dask_spec_local_cluster.scheduler_address)
@@ -971,6 +988,7 @@
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
mocked_user_completed_cb.assert_not_called()

@@ -988,6 +1006,7 @@ async def test_get_tasks_status(
fail_remote_fct: bool,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
# NOTE: this must be inlined so that the test works,
# the dask-worker must be able to import the function
@@ -1015,6 +1034,7 @@ def fake_remote_fct(
remote_fct=fake_remote_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
@@ -1069,6 +1089,7 @@ async def test_dask_sub_handlers(
fake_task_handlers: TaskHandlers,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
):
dask_client.register_handlers(fake_task_handlers)
_DASK_START_EVENT = "start"
@@ -1098,6 +1119,7 @@ def fake_remote_fct(
remote_fct=fake_remote_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
@@ -1142,6 +1164,7 @@ async def test_get_cluster_details(
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
faker: Faker,
comp_task_run_id: PositiveInt,
):
cluster_details = await dask_client.get_cluster_details()
assert cluster_details
@@ -1178,6 +1201,7 @@ def fake_sidecar_fct(
),
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1