Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 expose service_run_id as an env var for both comp and new style dynamic services #6942

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
24ff119
run_id can now be requested as an env var on request by a service
Dec 11, 2024
df34a54
Merge branch 'master' into pr-osparc-allow-user-services-to-request-d…
GitHK Dec 11, 2024
47c4eb8
Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u…
Dec 12, 2024
daccfdc
added context for run_id in comp tasks
Dec 13, 2024
8d9b796
Merge branch 'pr-osparc-allow-user-services-to-request-dy-sidecar-run…
Dec 13, 2024
d6b89a9
added fixture and refactor tests
Dec 13, 2024
4204990
fixed broken tests
Dec 13, 2024
ff3298c
moved fixture where it is used
Dec 13, 2024
2d1b08e
refactor with proper values
Dec 13, 2024
13d56de
refactor
Dec 13, 2024
dfdacbb
fixed test
Dec 13, 2024
486fb33
refactor to use new RunID
Dec 13, 2024
978f407
refactor types
Dec 13, 2024
0cb9f1f
fixed type
Dec 13, 2024
af0bed3
repalced
Dec 13, 2024
a72b6b5
fixed imports
Dec 13, 2024
98de753
Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u…
Dec 13, 2024
b0d715c
Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u…
Dec 16, 2024
5b67d46
renamed run_id to service_run_id and RunID to ServiceRunID
Dec 16, 2024
e9be2ee
rename
Dec 16, 2024
40ac24e
refactor
Dec 16, 2024
fe7ee55
fixed naming
Dec 16, 2024
2ded5fe
Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u…
Dec 17, 2024
e726e78
fixed failing test
Dec 17, 2024
dc53e45
Merge remote-tracking branch 'upstream/master' into pr-osparc-allow-u…
Dec 19, 2024
f82d8b0
refactor
Dec 19, 2024
e652c83
fixed imports
Dec 19, 2024
1c44aec
fixed specs
Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@

from ..projects import ProjectID
from ..projects_nodes_io import NodeID
from ..resource_tracker import CreditTransactionStatus, ServiceRunId, ServiceRunStatus
from ..resource_tracker import CreditTransactionStatus, ServiceRunStatus
from ..services import ServiceKey, ServiceVersion
from ..services_types import RunID
from ..users import UserID
from ..wallets import WalletID


class ServiceRunGet(BaseModel):
service_run_id: ServiceRunId
service_run_id: RunID
wallet_id: WalletID | None
wallet_name: str | None
user_id: UserID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
PricingPlanId,
PricingUnitCostUpdate,
PricingUnitId,
ServiceRunId,
ServiceRunStatus,
SpecificInfo,
UnitExtraInfo,
)
from ..services import ServiceKey, ServiceVersion
from ..services_types import RunID
from ..users import UserID
from ..wallets import WalletID
from ._base import InputSchema, OutputSchema
Expand All @@ -27,7 +27,7 @@
class ServiceRunGet(
BaseModel
): # NOTE: this is already in use so I didn't modify inheritance from OutputSchema
service_run_id: ServiceRunId
service_run_id: RunID
wallet_id: WalletID | None
wallet_name: str | None
user_id: UserID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .projects_state import RunningState
from .services import ServiceKey, ServiceType, ServiceVersion
from .services_resources import ServiceResourcesDict
from .services_types import RunID
from .users import UserID
from .utils.enums import StrAutoEnum
from .wallets import WalletID
Expand Down Expand Up @@ -178,7 +179,7 @@ class RabbitResourceTrackingMessageType(StrAutoEnum):
class RabbitResourceTrackingBaseMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.service.tracking"] = "io.simcore.service.tracking"

service_run_id: str = Field(
service_run_id: RunID = Field(
..., description="uniquely identitifies the service run"
)
created_at: datetime.datetime = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

_logger = logging.getLogger(__name__)

ServiceRunId: TypeAlias = str
PricingPlanId: TypeAlias = PositiveInt
PricingUnitId: TypeAlias = PositiveInt
PricingUnitCostId: TypeAlias = PositiveInt
Expand Down
30 changes: 27 additions & 3 deletions packages/models-library/src/models_library/services_types.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
from typing import Annotated, Any, TypeAlias
from typing import TYPE_CHECKING, Annotated, Any, Self, TypeAlias
from uuid import uuid4

import arrow
from pydantic import GetCoreSchemaHandler, StringConstraints, ValidationInfo
from pydantic import (
GetCoreSchemaHandler,
PositiveInt,
StringConstraints,
ValidationInfo,
)
from pydantic_core import CoreSchema, core_schema

from .basic_regex import PROPERTY_KEY_RE, SIMPLE_VERSION_RE
from .projects_nodes_io import NodeID
from .services_regex import (
COMPUTATIONAL_SERVICE_KEY_RE,
DYNAMIC_SERVICE_KEY_RE,
FILENAME_RE,
SERVICE_ENCODED_KEY_RE,
SERVICE_KEY_RE,
)
from .users import UserID

if TYPE_CHECKING:
from .projects import ProjectID

GitHK marked this conversation as resolved.
Show resolved Hide resolved
ServicePortKey: TypeAlias = Annotated[str, StringConstraints(pattern=PROPERTY_KEY_RE)]

Expand Down Expand Up @@ -44,12 +54,15 @@ class RunID(str):
and old volumes for different runs.
Avoids overwriting data that was left on the node (due to an error)
and gives the osparc-agent an opportunity to back it up.
The resource-usage-tracker uses these RunIDs to keep track of
resource usage from comp and dynamic services.
"""
GitHK marked this conversation as resolved.
Show resolved Hide resolved

__slots__ = ()

@classmethod
def create(cls) -> "RunID":
def create_for_dynamic_sidecar(cls) -> Self:
"""used for dynamic services"""
# NOTE: there was a legacy version of this RunID
# legacy version:
# '0ac3ed64-665b-42d2-95f7-e59e0db34242'
Expand All @@ -59,6 +72,17 @@ def create(cls) -> "RunID":
run_id_format = f"{utc_int_timestamp}_{uuid4()}"
return cls(run_id_format)

@classmethod
def get_resource_tracking_run_id(
cls,
GitHK marked this conversation as resolved.
Show resolved Hide resolved
user_id: UserID,
project_id: "ProjectID",
node_id: NodeID,
iteration: PositiveInt,
) -> Self:
"""used by computational services"""
return cls(f"comp_{user_id}_{project_id}_{node_id}_{iteration}")

@classmethod
def __get_pydantic_core_schema__(
cls,
Expand Down
36 changes: 36 additions & 0 deletions packages/models-library/tests/test_services_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pytest
from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.services_types import RunID
from models_library.users import UserID
from pydantic import PositiveInt


@pytest.mark.parametrize(
    "user_id, project_id, node_id, iteration, expected_result",
    [
        (
            2,
            ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"),
            NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"),
            5,
            "comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5",
        )
    ],
)
def test_run_id_get_resource_tracking_run_id(
    user_id: UserID,
    project_id: ProjectID,
    node_id: NodeID,
    iteration: PositiveInt,
    expected_result: str,
):
    """Check the identifier built for a computational task.

    The value returned by ``RunID.get_resource_tracking_run_id`` must be a
    ``RunID`` instance and must compare equal to the parametrized
    ``expected_result`` string.
    """
    run_id = RunID.get_resource_tracking_run_id(
        user_id, project_id, node_id, iteration
    )

    # the type is asserted separately: comparing against a plain ``str``
    # alone would not tell a ``RunID`` apart from a bare string
    assert isinstance(run_id, RunID)
    assert run_id == expected_result


def test_run_id_create_for_dynamic_sidecar():
    """Check that ``RunID.create_for_dynamic_sidecar`` returns a ``RunID``."""
    created_run_id = RunID.create_for_dynamic_sidecar()
    assert isinstance(created_run_id, RunID)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from models_library.licensed_items import LicensedItemID
from models_library.products import ProductName
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.resource_tracker import ServiceRunId
from models_library.services_types import RunID
from models_library.users import UserID
from models_library.wallets import WalletID
from pydantic import TypeAdapter
Expand Down Expand Up @@ -65,7 +65,7 @@ async def checkout_licensed_item_for_wallet(
wallet_id: WalletID,
licensed_item_id: LicensedItemID,
num_of_seats: int,
service_run_id: ServiceRunId,
service_run_id: RunID,
) -> None:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
Expand All @@ -89,7 +89,7 @@ async def release_licensed_item_for_wallet(
wallet_id: WalletID,
licensed_item_id: LicensedItemID,
num_of_seats: int,
service_run_id: ServiceRunId,
service_run_id: RunID,
) -> None:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
Expand Down
2 changes: 1 addition & 1 deletion services/agent/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_client(initialized_app: FastAPI) -> TestClient:

@pytest.fixture
def run_id() -> RunID:
return RunID.create()
return RunID.create_for_dynamic_sidecar()


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ class SchedulerData(CommonServiceDetails, DynamicSidecarServiceLabels):
description="Name of the current dynamic-sidecar being observed",
)
run_id: RunID = Field(
default_factory=RunID.create,
default_factory=RunID.create_for_dynamic_sidecar,
description=(
"Uniquely identify the dynamic sidecar session (a.k.a. 2 "
"subsequent exact same services will have a different run_id)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from models_library.projects_nodes_io import NodeID, NodeIDStr
from models_library.projects_state import RunningState
from models_library.services import ServiceType
from models_library.services_types import RunID
GitHK marked this conversation as resolved.
Show resolved Hide resolved
from models_library.users import UserID
from networkx.classes.reportviews import InDegreeView
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
Expand Down Expand Up @@ -66,7 +67,6 @@
TASK_TO_START_STATES,
WAITING_FOR_START_STATES,
create_service_resources_from_task,
get_resource_tracking_run_id,
)

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -295,7 +295,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
*(
publish_service_resource_tracking_heartbeat(
self.rabbitmq_client,
get_resource_tracking_run_id(
RunID.get_resource_tracking_run_id(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
user_id, t.project_id, t.node_id, iteration
),
)
Expand Down Expand Up @@ -348,7 +348,7 @@ async def _process_started_tasks(
*(
publish_service_resource_tracking_started(
self.rabbitmq_client,
service_run_id=get_resource_tracking_run_id(
service_run_id=RunID.get_resource_tracking_run_id(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
user_id, t.project_id, t.node_id, iteration
),
wallet_id=run_metadata.get("wallet_id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import RunningState
from models_library.rabbitmq_messages import SimcorePlatformStatus
from models_library.services_types import RunID
GitHK marked this conversation as resolved.
Show resolved Hide resolved
from models_library.users import UserID
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.logging_utils import log_catch
Expand Down Expand Up @@ -48,7 +49,6 @@
from ..db.repositories.comp_runs import CompRunsRepository
from ..db.repositories.comp_tasks import CompTasksRepository
from ._scheduler_base import BaseCompScheduler
from ._utils import get_resource_tracking_run_id

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -129,6 +129,9 @@ async def _start_tasks(
hardware_info=task.hardware_info,
callback=wake_up_callback,
metadata=comp_run.metadata,
resource_tracking_run_id=RunID.get_resource_tracking_run_id(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
user_id, project_id, node_id, comp_run.iteration
),
)
for node_id, task in scheduled_tasks.items()
),
Expand Down Expand Up @@ -319,7 +322,9 @@ async def _process_task_result(
# resource tracking
await publish_service_resource_tracking_stopped(
self.rabbitmq_client,
get_resource_tracking_run_id(user_id, project_id, node_id, iteration),
RunID.get_resource_tracking_run_id(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
user_id, project_id, node_id, iteration
),
simcore_platform_status=simcore_platform_status,
)
# instrumentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@

from fastapi import FastAPI
from models_library.docker import DockerGenericTag
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import RunningState
from models_library.services_resources import (
ResourceValue,
ServiceResourcesDict,
ServiceResourcesDictHelpers,
)
from models_library.users import UserID
from servicelib.redis import RedisClientSDK
from settings_library.redis import RedisDatabase

from ...models.comp_runs import Iteration
from ...models.comp_tasks import CompTaskAtDB
from ..redis import get_redis_client_manager

Expand Down Expand Up @@ -55,12 +51,6 @@
}


def get_resource_tracking_run_id(
user_id: UserID, project_id: ProjectID, node_id: NodeID, iteration: Iteration
) -> str:
return f"comp_{user_id}_{project_id}_{node_id}_{iteration}"


def create_service_resources_from_task(task: CompTaskAtDB) -> ServiceResourcesDict:
assert task.image.node_requirements # nosec
return ServiceResourcesDictHelpers.create_from_single_service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.resource_tracker import HardwareInfo
from models_library.services import RunID
GitHK marked this conversation as resolved.
Show resolved Hide resolved
from models_library.users import UserID
from pydantic import TypeAdapter, ValidationError
from pydantic.networks import AnyUrl
Expand Down Expand Up @@ -293,6 +294,7 @@ async def send_computation_tasks(
remote_fct: ContainerRemoteFct | None = None,
metadata: RunMetadataDict,
hardware_info: HardwareInfo,
resource_tracking_run_id: RunID,
) -> list[PublishedComputationTask]:
"""actually sends the function remote_fct to be remotely executed. if None is kept then the default
function that runs container will be started.
Expand Down Expand Up @@ -396,6 +398,7 @@ async def send_computation_tasks(
node_id=node_id,
node_image=node_image,
metadata=metadata,
resource_tracking_run_id=resource_tracking_run_id,
)
task_owner = dask_utils.compute_task_owner(
user_id, project_id, node_id, metadata.get("project_metadata", {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ResourceValue,
ServiceResourcesDict,
)
from models_library.services_types import RunID
from models_library.users import UserID
from models_library.utils.docker_compose import replace_env_vars_in_compose_spec
from pydantic import ByteSize
Expand Down Expand Up @@ -278,6 +279,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913
node_id: NodeID,
simcore_user_agent: str,
swarm_stack_name: str,
run_id: RunID,
) -> str:
"""
returns a docker-compose spec used by
Expand Down Expand Up @@ -350,6 +352,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
)

add_egress_configuration(
Expand Down Expand Up @@ -388,6 +391,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
)

stringified_service_spec: str = replace_env_vars_in_compose_spec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:

# Each time a new dynamic-sidecar service is created
# generate a new `run_id` to avoid resource collisions
scheduler_data.run_id = RunID.create()
scheduler_data.run_id = RunID.create_for_dynamic_sidecar()

rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ async def submit_compose_sepc(app: FastAPI, scheduler_data: SchedulerData) -> No
node_id=scheduler_data.node_uuid,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
run_id=scheduler_data.run_id,
)

_logger.debug(
Expand Down
Loading
Loading