From d75b0a377896ca9365fd7838cd80114eb286e53d Mon Sep 17 00:00:00 2001 From: Matus Drobuliak <60785969+matusdrobuliak66@users.noreply.github.com> Date: Thu, 19 Dec 2024 13:11:15 +0100 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=8E=A8=20release=20license=20seats=20?= =?UTF-8?q?on=20issues=20(#6980)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ackground_task_periodic_heartbeat_check.py | 15 +- .../modules/db/licensed_items_checkouts_db.py | 43 ++++++ .../process_message_running_service.py | 15 +- .../test_licensed_items_checkouts_db.py | 139 ++++++++++++++++++ 4 files changed, 207 insertions(+), 5 deletions(-) create mode 100644 services/resource-usage-tracker/tests/unit/with_dbs/test_licensed_items_checkouts_db.py diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py index 70abcb8f5a6..98a18522e9e 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py @@ -1,6 +1,6 @@ import asyncio import logging -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta from fastapi import FastAPI from models_library.resource_tracker import ( @@ -15,7 +15,11 @@ from ..core.settings import ApplicationSettings from ..models.credit_transactions import CreditTransactionCreditsAndStatusUpdate from ..models.service_runs import ServiceRunStoppedAtUpdate -from .modules.db import credit_transactions_db, service_runs_db +from .modules.db import ( + credit_transactions_db, + licensed_items_checkouts_db, + service_runs_db, +) from .utils import compute_service_run_credit_costs, make_negative _logger = logging.getLogger(__name__) @@ -116,6 +120,11 @@ async def _close_unhealthy_service( db_engine, data=update_credit_transaction ) + # 3. Release license seats in case some were checked out but not properly released. + await licensed_items_checkouts_db.force_release_license_seats_by_run_id( + db_engine, service_run_id=service_run_id + ) + async def periodic_check_of_running_services_task(app: FastAPI) -> None: _logger.info("Periodic check started") @@ -124,7 +133,7 @@ async def periodic_check_of_running_services_task(app: FastAPI) -> None: app_settings: ApplicationSettings = app.state.settings _db_engine = app.state.engine - base_start_timestamp = datetime.now(tz=timezone.utc) + base_start_timestamp = datetime.now(tz=UTC) # Get all current running services (across all products) total_count: PositiveInt = await service_runs_db.total_service_runs_with_running_status_across_all_products( diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/licensed_items_checkouts_db.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/licensed_items_checkouts_db.py index 2402a8c52be..5035a637199 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/licensed_items_checkouts_db.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/licensed_items_checkouts_db.py @@ -1,3 +1,4 @@ +import logging from datetime import datetime from typing import cast @@ -8,6 +9,7 @@ LicensedItemCheckoutID, ) from models_library.rest_ordering import OrderBy, OrderDirection +from models_library.services_types import ServiceRunID from models_library.wallets import WalletID from pydantic import NonNegativeInt from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import ( @@ -27,6 +29,9 @@ LicensedItemCheckoutDB, ) +_logger = logging.getLogger(__name__) + + _SELECTION_ARGS = ( resource_tracker_licensed_items_checkouts.c.licensed_item_checkout_id, resource_tracker_licensed_items_checkouts.c.licensed_item_id, @@ -214,3 +219,41 @@ async def get_currently_used_seats_for_item_and_wallet( if total_sum is None: return 0 return cast(int, total_sum) + + +async def force_release_license_seats_by_run_id( + engine: AsyncEngine, + connection: AsyncConnection | None = None, + *, + service_run_id: ServiceRunID, +) -> None: + """ + Purpose: This function is utilized by a periodic heartbeat check task that monitors whether running services are + sending heartbeat signals. If heartbeat signals are not received within a specified timeframe and a service is + deemed unhealthy, this function ensures the proper release of any licensed seats that were not correctly released by + the unhealthy service. + Currently, this functionality is primarily used to handle the release of a single seat allocated to the VIP model. + """ + update_stmt = ( + resource_tracker_licensed_items_checkouts.update() + .values( + modified=sa.func.now(), + stopped_at=sa.func.now(), + ) + .where( + ( + resource_tracker_licensed_items_checkouts.c.service_run_id + == service_run_id + ) + & (resource_tracker_licensed_items_checkouts.c.stopped_at.is_(None)) + ) + .returning(sa.literal_column("*")) + ) + + async with transaction_context(engine, connection) as conn: + result = await conn.execute(update_stmt) + released_seats = result.fetchall() + if released_seats: + _logger.error( + "Force release of %s seats: %s", len(released_seats), released_seats + ) diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py index e9234f65435..88553f51705 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py @@ -33,7 +33,12 @@ ServiceRunLastHeartbeatUpdate, ServiceRunStoppedAtUpdate, ) -from .modules.db import credit_transactions_db, pricing_plans_db, service_runs_db +from .modules.db import ( + credit_transactions_db, + licensed_items_checkouts_db, + pricing_plans_db, + service_runs_db, +) from .modules.rabbitmq import RabbitMQClient, get_rabbitmq_client from .utils import ( compute_service_run_credit_costs, @@ -269,9 +274,15 @@ async def _process_stop_event( running_service = await service_runs_db.update_service_run_stopped_at( db_engine, data=update_service_run_stopped_at ) + await licensed_items_checkouts_db.force_release_license_seats_by_run_id( + db_engine, service_run_id=msg.service_run_id + ) if running_service is None: - _logger.error("Nothing to update. This should not happen investigate.") + _logger.error( + "Nothing to update. This should not happen investigate. service_run_id: %s", + msg.service_run_id, + ) return if running_service.wallet_id and running_service.pricing_unit_cost is not None: diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/test_licensed_items_checkouts_db.py b/services/resource-usage-tracker/tests/unit/with_dbs/test_licensed_items_checkouts_db.py new file mode 100644 index 00000000000..5f0fc5a1f5b --- /dev/null +++ b/services/resource-usage-tracker/tests/unit/with_dbs/test_licensed_items_checkouts_db.py @@ -0,0 +1,139 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name +# pylint:disable=too-many-arguments + + +from datetime import UTC, datetime +from typing import Generator +from unittest import mock + +import pytest +import sqlalchemy as sa +from models_library.basic_types import IDStr +from models_library.rest_ordering import OrderBy +from simcore_postgres_database.models.resource_tracker_licensed_items_checkouts import ( + resource_tracker_licensed_items_checkouts, +) +from simcore_postgres_database.models.resource_tracker_service_runs import ( + resource_tracker_service_runs, +) +from simcore_service_resource_usage_tracker.models.licensed_items_checkouts import ( + CreateLicensedItemCheckoutDB, +) +from simcore_service_resource_usage_tracker.services.modules.db import ( + licensed_items_checkouts_db, +) + +pytest_simcore_core_services_selection = [ + "postgres", +] +pytest_simcore_ops_services_selection = [ + "adminer", +] + + +_USER_ID_1 = 1 +_WALLET_ID = 6 + + +@pytest.fixture() +def resource_tracker_service_run_id( + postgres_db: sa.engine.Engine, random_resource_tracker_service_run +) -> Generator[str, None, None]: + with postgres_db.connect() as con: + result = con.execute( + resource_tracker_service_runs.insert() + .values( + **random_resource_tracker_service_run( + user_id=_USER_ID_1, wallet_id=_WALLET_ID + ) + ) + .returning(resource_tracker_service_runs.c.service_run_id) + ) + row = result.first() + assert row + + yield row[0] + + con.execute(resource_tracker_licensed_items_checkouts.delete()) + con.execute(resource_tracker_service_runs.delete()) + + +async def test_licensed_items_checkouts_db__force_release_license_seats_by_run_id( + mocked_redis_server: None, + mocked_setup_rabbitmq: mock.Mock, + resource_tracker_service_run_id, + initialized_app, +): + engine = initialized_app.state.engine + + # SETUP + _create_license_item_checkout_db_1 = CreateLicensedItemCheckoutDB( + licensed_item_id="beb16d18-d57d-44aa-a638-9727fa4a72ef", + wallet_id=_WALLET_ID, + user_id=_USER_ID_1, + user_email="test@test.com", + product_name="osparc", + service_run_id=resource_tracker_service_run_id, + started_at=datetime.now(tz=UTC), + num_of_seats=1, + ) + await licensed_items_checkouts_db.create( + engine, data=_create_license_item_checkout_db_1 + ) + + _create_license_item_checkout_db_2 = _create_license_item_checkout_db_1.model_dump() + _create_license_item_checkout_db_2[ + "licensed_item_id" + ] = "b1b96583-333f-44d6-b1e0-5c0a8af555bf" + await licensed_items_checkouts_db.create( + engine, + data=CreateLicensedItemCheckoutDB.model_construct( + **_create_license_item_checkout_db_2 + ), + ) + + _create_license_item_checkout_db_3 = _create_license_item_checkout_db_1.model_dump() + _create_license_item_checkout_db_3[ + "licensed_item_id" + ] = "38a5ce59-876f-482a-ace1-d3b2636feac6" + checkout = await licensed_items_checkouts_db.create( + engine, + data=CreateLicensedItemCheckoutDB.model_construct( + **_create_license_item_checkout_db_3 + ), + ) + + _helper_time = datetime.now(UTC) + await licensed_items_checkouts_db.update( + engine, + licensed_item_checkout_id=checkout.licensed_item_checkout_id, + product_name="osparc", + stopped_at=_helper_time, + ) + + # TEST FORCE RELEASE LICENSE SEATS + await licensed_items_checkouts_db.force_release_license_seats_by_run_id( + engine, service_run_id=resource_tracker_service_run_id + ) + + # ASSERT + total, items = await licensed_items_checkouts_db.list_( + engine, + product_name="osparc", + filter_wallet_id=_WALLET_ID, + offset=0, + limit=5, + order_by=OrderBy(field=IDStr("started_at")), + ) + assert total == 3 + assert len(items) == 3 + + _helper_count = 0 + for item in items: + assert isinstance(item.stopped_at, datetime) + if item.stopped_at > _helper_time: + _helper_count += 1 + + assert _helper_count == 2 From 68f0b243954bbe607ad5152fd5919785ca4c5c35 Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Fri, 20 Dec 2024 08:47:05 +0100 Subject: [PATCH 2/2] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20reroute=20`get=20proje?= =?UTF-8?q?ct=20inactivity`=20via=20`dynamic-scheduler`=20(#6949)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrei Neagu --- .../dynamic_services.py | 2 ++ .../dynamic_scheduler/services.py | 19 ++++++++++ services/director-v2/openapi.json | 5 ++- .../api/rpc/_services.py | 11 ++++++ .../services/director_v2/_public_client.py | 13 ++++++- .../services/director_v2/_thin_client.py | 10 ++++++ .../services/scheduler_interface.py | 18 ++++++++++ .../unit/api_rpc/test_api_rpc__services.py | 35 +++++++++++++++++++ .../api/v0/openapi.yaml | 2 ++ .../director_v2/_core_dynamic_services.py | 35 ------------------- .../director_v2/api.py | 2 -- .../dynamic_scheduler/api.py | 15 ++++++++ .../projects/projects_api.py | 5 ++- .../02/test_projects_crud_handlers.py | 24 ++++++------- 14 files changed, 140 insertions(+), 56 deletions(-) delete mode 100644 services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/dynamic_services.py b/packages/models-library/src/models_library/api_schemas_directorv2/dynamic_services.py index 151611271a4..d26acac0490 100644 --- a/packages/models-library/src/models_library/api_schemas_directorv2/dynamic_services.py +++ b/packages/models-library/src/models_library/api_schemas_directorv2/dynamic_services.py @@ -79,3 +79,5 @@ class DynamicServiceCreate(ServiceDetails): class GetProjectInactivityResponse(BaseModel): is_inactive: bool + + model_config = ConfigDict(json_schema_extra={"example": {"is_inactive": "false"}}) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py index f58500d771e..fb3276ae670 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py @@ -3,6 +3,7 @@ from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, + GetProjectInactivityResponse, RetrieveDataOutEnveloped, ) from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE @@ -99,6 +100,24 @@ async def stop_dynamic_service( assert result is None # nosec +@log_decorator(_logger, level=logging.DEBUG) +async def get_project_inactivity( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + project_id: ProjectID, + max_inactivity_seconds: NonNegativeInt, +) -> GetProjectInactivityResponse: + result = await rabbitmq_rpc_client.request( + DYNAMIC_SCHEDULER_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python("get_project_inactivity"), + project_id=project_id, + max_inactivity_seconds=max_inactivity_seconds, + timeout_s=_RPC_DEFAULT_TIMEOUT_S, + ) + assert isinstance(result, GetProjectInactivityResponse) # nosec + return result + + @log_decorator(_logger, level=logging.DEBUG) async def restart_user_services( rabbitmq_rpc_client: RabbitMQRPCClient, diff --git a/services/director-v2/openapi.json b/services/director-v2/openapi.json index 51e5f191b27..c769aff191a 100644 --- a/services/director-v2/openapi.json +++ b/services/director-v2/openapi.json @@ -2058,7 +2058,10 @@ "required": [ "is_inactive" ], - "title": "GetProjectInactivityResponse" + "title": "GetProjectInactivityResponse", + "example": { + "is_inactive": "false" + } }, "HTTPValidationError": { "properties": { diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index a0bbed6b110..b90ed821bfa 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -1,6 +1,7 @@ from fastapi import FastAPI from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, + GetProjectInactivityResponse, RetrieveDataOutEnveloped, ) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( @@ -12,6 +13,7 @@ from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from models_library.users import UserID +from pydantic import NonNegativeInt from servicelib.rabbitmq import RPCRouter from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import ( ServiceWaitingForManualInterventionError, @@ -62,6 +64,15 @@ async def stop_dynamic_service( ) +@router.expose() +async def get_project_inactivity( + app: FastAPI, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt +) -> GetProjectInactivityResponse: + return await scheduler_interface.get_project_inactivity( + app, project_id=project_id, max_inactivity_seconds=max_inactivity_seconds + ) + + @router.expose() async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None: await scheduler_interface.restart_user_services(app, node_id=node_id) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py index 2841eb5f467..cc1e8851684 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py @@ -4,6 +4,7 @@ from fastapi import FastAPI, status from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, + GetProjectInactivityResponse, RetrieveDataOutEnveloped, ) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( @@ -14,7 +15,7 @@ from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from models_library.users import UserID -from pydantic import TypeAdapter +from pydantic import NonNegativeInt, TypeAdapter from servicelib.fastapi.app_state import SingletonInAppStateMixin from servicelib.fastapi.http_client import AttachLifespanMixin, HasClientSetupInterface from servicelib.fastapi.http_client_thin import UnexpectedStatusError @@ -125,6 +126,16 @@ async def list_tracked_dynamic_services( ) return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json()) + async def get_project_inactivity( + self, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt + ) -> GetProjectInactivityResponse: + response = await self.thin_client.get_projects_inactivity( + project_id=project_id, max_inactivity_seconds=max_inactivity_seconds + ) + return TypeAdapter(GetProjectInactivityResponse).validate_python( + response.json() + ) + async def restart_user_services(self, *, node_id: NodeID) -> None: await self.thin_client.post_restart(node_id=node_id) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py index 1b86d604148..412e7377d19 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py @@ -13,6 +13,7 @@ from models_library.services_resources import ServiceResourcesDictHelpers from models_library.services_types import ServicePortKey from models_library.users import UserID +from pydantic import NonNegativeInt from servicelib.common_headers import ( X_DYNAMIC_SIDECAR_REQUEST_DNS, X_DYNAMIC_SIDECAR_REQUEST_SCHEME, @@ -143,6 +144,15 @@ async def get_dynamic_services( ) @retry_on_errors() + @expect_status(status.HTTP_200_OK) + async def get_projects_inactivity( + self, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt + ) -> Response: + return await self.client.get( + f"/dynamic_services/projects/{project_id}/inactivity", + params={"max_inactivity_seconds": max_inactivity_seconds}, + ) + @expect_status(status.HTTP_204_NO_CONTENT) async def post_restart(self, *, node_id: NodeID) -> Response: return await self.client.post(f"/dynamic_services/{node_id}:restart") diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py index 27e854ae6db..ff279fb75c9 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py @@ -1,6 +1,7 @@ from fastapi import FastAPI from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, + GetProjectInactivityResponse, RetrieveDataOutEnveloped, ) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( @@ -12,6 +13,7 @@ from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from models_library.users import UserID +from pydantic import NonNegativeInt from ..core.settings import ApplicationSettings from .director_v2 import DirectorV2Client @@ -79,6 +81,22 @@ async def stop_dynamic_service( await set_request_as_stopped(app, dynamic_service_stop) +async def get_project_inactivity( + app: FastAPI, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt +) -> GetProjectInactivityResponse: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + response: GetProjectInactivityResponse = ( + await director_v2_client.get_project_inactivity( + project_id=project_id, max_inactivity_seconds=max_inactivity_seconds + ) + ) + return response + + async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None: settings: ApplicationSettings = app.state.settings if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index 6455ca367f5..f3380bbb2f5 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -11,6 +11,7 @@ from fastapi.encoders import jsonable_encoder from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, + GetProjectInactivityResponse, RetrieveDataOutEnveloped, ) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( @@ -493,6 +494,40 @@ async def test_stop_dynamic_service_serializes_generic_errors( ) +@pytest.fixture +def inactivity_response() -> GetProjectInactivityResponse: + return TypeAdapter(GetProjectInactivityResponse).validate_python( + GetProjectInactivityResponse.model_json_schema()["example"] + ) + + +@pytest.fixture +def mock_director_v2_get_project_inactivity( + project_id: ProjectID, inactivity_response: GetProjectInactivityResponse +) -> Iterator[None]: + with respx.mock( + base_url="http://director-v2:8000/v2", + assert_all_called=False, + assert_all_mocked=True, # IMPORTANT: KEEP always True! + ) as mock: + mock.get(f"/dynamic_services/projects/{project_id}/inactivity").respond( + status.HTTP_200_OK, text=inactivity_response.model_dump_json() + ) + yield None + + +async def test_get_project_inactivity( + mock_director_v2_get_project_inactivity: None, + rpc_client: RabbitMQRPCClient, + project_id: ProjectID, + inactivity_response: GetProjectInactivityResponse, +): + result = await services.get_project_inactivity( + rpc_client, project_id=project_id, max_inactivity_seconds=5 + ) + assert result == inactivity_response + + @pytest.fixture def mock_director_v2_restart_user_services(node_id: NodeID) -> Iterator[None]: with respx.mock( diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index 27bdb777e6f..5e20d12a5ab 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -9952,6 +9952,8 @@ components: required: - is_inactive title: GetProjectInactivityResponse + example: + is_inactive: 'false' GetWalletAutoRecharge: properties: enabled: diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py deleted file mode 100644 index 5f9f3f3aad8..00000000000 --- a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py +++ /dev/null @@ -1,35 +0,0 @@ -""" Operations on dynamic-services - -- This interface HIDES request/responses/exceptions to the director-v2 API service - -""" - -import logging - -from aiohttp import web -from models_library.projects import ProjectID -from pydantic import NonNegativeInt -from servicelib.logging_utils import log_decorator -from yarl import URL - -from ._core_base import DataType, request_director_v2 -from .settings import DirectorV2Settings, get_plugin_settings - -_log = logging.getLogger(__name__) - - -@log_decorator(logger=_log) -async def get_project_inactivity( - app: web.Application, - project_id: ProjectID, - max_inactivity_seconds: NonNegativeInt, -) -> DataType: - settings: DirectorV2Settings = get_plugin_settings(app) - backend_url = ( - URL(settings.base_url) / f"dynamic_services/projects/{project_id}/inactivity" - ).update_query(max_inactivity_seconds=max_inactivity_seconds) - result = await request_director_v2( - app, "GET", backend_url, expected_status=web.HTTPOk - ) - assert isinstance(result, dict) # nosec - return result diff --git a/services/web/server/src/simcore_service_webserver/director_v2/api.py b/services/web/server/src/simcore_service_webserver/director_v2/api.py index 9c070e7cae9..2ecbb1446fd 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/api.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/api.py @@ -16,7 +16,6 @@ is_pipeline_running, stop_pipeline, ) -from ._core_dynamic_services import get_project_inactivity from ._core_utils import is_healthy from .exceptions import DirectorServiceError @@ -28,7 +27,6 @@ "DirectorServiceError", "get_batch_tasks_outputs", "get_computation_task", - "get_project_inactivity", "get_project_run_policy", "is_healthy", "is_pipeline_running", diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index bdfd22c3c1d..5773052010b 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -5,6 +5,7 @@ from aiohttp import web from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, + GetProjectInactivityResponse, RetrieveDataOutEnveloped, ) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( @@ -23,6 +24,7 @@ from models_library.rabbitmq_messages import ProgressRabbitMessageProject, ProgressType from models_library.services import ServicePortKey from models_library.users import UserID +from pydantic import NonNegativeInt from pydantic.types import PositiveInt from servicelib.progress_bar import ProgressBarData from servicelib.rabbitmq import RabbitMQClient, RPCServerError @@ -154,6 +156,19 @@ async def stop_dynamic_services_in_project( await logged_gather(*services_to_stop) +async def get_project_inactivity( + app: web.Application, + *, + project_id: ProjectID, + max_inactivity_seconds: NonNegativeInt, +) -> GetProjectInactivityResponse: + return await services.get_project_inactivity( + get_rabbitmq_rpc_client(app), + project_id=project_id, + max_inactivity_seconds=max_inactivity_seconds, + ) + + async def restart_user_services(app: web.Application, *, node_id: NodeID) -> None: """Restarts the user service(s) started by the the node_uuid's sidecar diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index fc41203434a..3edd4c50e39 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -1895,13 +1895,12 @@ async def get_project_inactivity( app: web.Application, project_id: ProjectID ) -> GetProjectInactivityResponse: project_settings: ProjectsSettings = get_plugin_settings(app) - project_inactivity = await director_v2_api.get_project_inactivity( + return await dynamic_scheduler_api.get_project_inactivity( app, - project_id, + project_id=project_id, # NOTE: project is considered inactive if all services exposing an /inactivity # endpoint were inactive since at least PROJECTS_INACTIVITY_INTERVAL max_inactivity_seconds=int( project_settings.PROJECTS_INACTIVITY_INTERVAL.total_seconds() ), ) - return GetProjectInactivityResponse.model_validate(project_inactivity) diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers.py b/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers.py index b15a9fb9e45..dcf954d2b54 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_crud_handlers.py @@ -4,7 +4,6 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -import re import uuid as uuidlib from collections.abc import Awaitable, Callable, Iterator from http import HTTPStatus @@ -16,9 +15,13 @@ from aiohttp.test_utils import TestClient from aioresponses import aioresponses from faker import Faker +from models_library.api_schemas_directorv2.dynamic_services import ( + GetProjectInactivityResponse, +) from models_library.products import ProductName from models_library.projects_state import ProjectState from pydantic import TypeAdapter +from pytest_mock import MockerFixture from pytest_simcore.helpers.assert_checks import assert_status from pytest_simcore.helpers.webserver_login import UserInfoDict from pytest_simcore.helpers.webserver_parametrizations import ( @@ -656,18 +659,11 @@ async def test_new_template_from_project( @pytest.fixture -def mock_director_v2_inactivity( - aioresponses_mocker: aioresponses, is_inactive: bool -) -> None: - aioresponses_mocker.clear() - get_services_pattern = re.compile( - r"^http://[a-z\-_]*director-v2:[0-9]+/v2/dynamic_services/projects/.*/inactivity.*$" - ) - aioresponses_mocker.get( - get_services_pattern, - status=status.HTTP_200_OK, - repeat=True, - payload={"is_inactive": is_inactive}, +def mock_dynamic_scheduler_inactivity(mocker: MockerFixture, is_inactive: bool) -> None: + mocker.patch( + "simcore_service_webserver.dynamic_scheduler.api.get_project_inactivity", + autospec=True, + return_value=GetProjectInactivityResponse(is_inactive=is_inactive), ) @@ -680,7 +676,7 @@ def mock_director_v2_inactivity( ) @pytest.mark.parametrize("is_inactive", [True, False]) async def test_get_project_inactivity( - mock_director_v2_inactivity: None, + mock_dynamic_scheduler_inactivity: None, logged_user: UserInfoDict, client: TestClient, faker: Faker,