diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py index f52120966f2..55867fd42ff 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py @@ -15,6 +15,7 @@ from ..api.rpc.routes import setup_rpc_routes from ..services.background_tasks_setup import setup as setup_background_tasks from ..services.efs_manager_setup import setup as setup_efs_manager +from ..services.fire_and_forget_setup import setup as setup_fire_and_forget from ..services.modules.db import setup as setup_db from ..services.modules.rabbitmq import setup as setup_rabbitmq from ..services.modules.redis import setup as setup_redis @@ -56,6 +57,8 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI: setup_background_tasks(app) # requires Redis, DB setup_process_messages(app) # requires Rabbit + setup_fire_and_forget(app) + # EVENTS async def _on_startup() -> None: print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201 diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py index 0c7e7267954..c0e2e625760 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py @@ -83,6 +83,31 @@ async def get_project_node_data_size( return await efs_manager_utils.get_size_bash_async(_dir_path) + async def list_project_node_state_names( + self, project_id: ProjectID, node_id: NodeID + ) -> list[str]: + """ + These are currently state volumes that are mounted via docker volume to dynamic sidecar and user services + (ex. 
".data_assets" and "home_user_workspace") + """ + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + project_node_states = [] + for child in _dir_path.iterdir(): + if child.is_dir(): + project_node_states.append(child.name) + else: + _logger.error( + "This is not a directory. This should not happen! %s", + _dir_path / child.name, + ) + return project_node_states + async def remove_project_node_data_write_permissions( self, project_id: ProjectID, node_id: NodeID ) -> None: diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py new file mode 100644 index 00000000000..379e46753ae --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/fire_and_forget_setup.py @@ -0,0 +1,37 @@ +import logging +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI +from servicelib.logging_utils import log_catch, log_context + +_logger = logging.getLogger(__name__) + + +def _on_app_startup(_app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + with log_context( + _logger, logging.INFO, msg="Efs Guardian setup fire and forget tasks.." + ), log_catch(_logger, reraise=False): + _app.state.efs_guardian_fire_and_forget_tasks = set() + + return _startup + + +def _on_app_shutdown( + _app: FastAPI, +) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + with log_context( + _logger, logging.INFO, msg="Efs Guardian fire and forget tasks shutdown.." 
+ ), log_catch(_logger, reraise=False): + assert _app # nosec + if _app.state.efs_guardian_fire_and_forget_tasks: + for task in _app.state.efs_guardian_fire_and_forget_tasks: + task.cancel() + + return _stop + + +def setup(app: FastAPI) -> None: + app.add_event_handler("startup", _on_app_startup(app)) + app.add_event_handler("shutdown", _on_app_shutdown(app)) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/rabbitmq.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/rabbitmq.py index 82ef1aae84c..f94c5dbf418 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/rabbitmq.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/rabbitmq.py @@ -29,12 +29,17 @@ async def on_startup() -> None: app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( client_name="efs_guardian_rpc_server", settings=settings ) + app.state.rabbitmq_rpc_client = await RabbitMQRPCClient.create( + client_name="efs_guardian_rpc_client", settings=settings + ) async def on_shutdown() -> None: if app.state.rabbitmq_client: await app.state.rabbitmq_client.close() if app.state.rabbitmq_rpc_server: await app.state.rabbitmq_rpc_server.close() + if app.state.rabbitmq_rpc_client: + await app.state.rabbitmq_rpc_client.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -53,4 +58,9 @@ def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient: return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server) +def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient: + assert app.state.rabbitmq_rpc_client # nosec + return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_client) + + __all__ = ("RabbitMQClient",) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py index 11c7781bbae..b4e26a9e73a 100644 
--- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -1,13 +1,20 @@ import logging from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage from models_library.rabbitmq_messages import DynamicServiceRunningMessage from pydantic import parse_raw_as from servicelib.logging_utils import log_context -from simcore_service_efs_guardian.services.modules.redis import get_redis_lock_client +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar.disk_usage import ( + update_disk_usage, +) +from servicelib.utils import fire_and_forget_task from ..core.settings import get_application_settings from ..services.efs_manager import EfsManager +from ..services.modules.rabbitmq import get_rabbitmq_rpc_client +from ..services.modules.redis import get_redis_lock_client _logger = logging.getLogger(__name__) @@ -50,6 +57,23 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> rabbit_message.user_id, ) + project_node_state_names = await efs_manager.list_project_node_state_names( + rabbit_message.project_id, node_id=rabbit_message.node_id + ) + rpc_client: RabbitMQRPCClient = get_rabbitmq_rpc_client(app) + _used = min(size, settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES) + usage: dict[str, DiskUsage] = {} + for name in project_node_state_names: + usage[name] = DiskUsage.from_efs_guardian( + used=_used, total=settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES + ) + + fire_and_forget_task( + update_disk_usage(rpc_client, node_id=rabbit_message.node_id, usage=usage), + task_suffix_name=f"update_disk_usage_efs_user_id{rabbit_message.user_id}_node_id{rabbit_message.node_id}", + fire_and_forget_tasks_collection=app.state.efs_guardian_fire_and_forget_tasks, + ) + if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: msg = f"Removing write 
permissions inside of EFS starts for project ID: {rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}" with log_context(_logger, logging.WARNING, msg=msg): diff --git a/services/efs-guardian/tests/conftest.py b/services/efs-guardian/tests/conftest.py index 4309ea7b078..260e5a74026 100644 --- a/services/efs-guardian/tests/conftest.py +++ b/services/efs-guardian/tests/conftest.py @@ -21,6 +21,7 @@ "pytest_simcore.environment_configs", "pytest_simcore.faker_projects_data", "pytest_simcore.faker_users_data", + "pytest_simcore.faker_products_data", "pytest_simcore.faker_projects_data", "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", diff --git a/services/efs-guardian/tests/unit/test_efs_manager.py b/services/efs-guardian/tests/unit/test_efs_manager.py index fad03fcaa44..35c2535de94 100644 --- a/services/efs-guardian/tests/unit/test_efs_manager.py +++ b/services/efs-guardian/tests/unit/test_efs_manager.py @@ -91,6 +91,11 @@ async def test_remove_write_access_rights( is False ) + with pytest.raises(FileNotFoundError): + await efs_manager.list_project_node_state_names( + project_id=project_id, node_id=node_id + ) + with patch( "simcore_service_efs_guardian.services.efs_manager.os.chown" ) as mocked_chown: @@ -108,6 +113,11 @@ async def test_remove_write_access_rights( is True ) + project_node_state_names = await efs_manager.list_project_node_state_names( + project_id=project_id, node_id=node_id + ) + assert project_node_state_names == [_storage_directory_name] + size_before = await efs_manager.get_project_node_data_size( project_id=project_id, node_id=node_id ) diff --git a/services/efs-guardian/tests/unit/test_process_messages.py b/services/efs-guardian/tests/unit/test_process_messages.py new file mode 100644 index 00000000000..32b439777f0 --- /dev/null +++ b/services/efs-guardian/tests/unit/test_process_messages.py @@ 
-0,0 +1,111 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +from unittest.mock import AsyncMock, patch + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.products import ProductName +from models_library.rabbitmq_messages import DynamicServiceRunningMessage +from models_library.users import UserID +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict +from simcore_service_efs_guardian.services.efs_manager import NodeID, ProjectID +from simcore_service_efs_guardian.services.process_messages import ( + process_dynamic_service_running_message, +) + +pytest_simcore_core_services_selection = ["rabbit"] +pytest_simcore_ops_services_selection = [] + + +@pytest.fixture +def app_environment( + monkeypatch: pytest.MonkeyPatch, + app_environment: EnvVarsDict, + rabbit_env_vars_dict: EnvVarsDict, + with_disabled_redis_and_background_tasks: None, + with_disabled_postgres: None, +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + **app_environment, + **rabbit_env_vars_dict, + "EFS_DEFAULT_USER_SERVICE_SIZE_BYTES": "10000", + }, + ) + + +@patch("simcore_service_efs_guardian.services.process_messages.update_disk_usage") +async def test_process_msg( + mock_update_disk_usage, + faker: Faker, + app: FastAPI, + efs_cleanup: None, + project_id: ProjectID, + node_id: NodeID, + user_id: UserID, + product_name: ProductName, +): + # Create mock data for the message + model_instance = DynamicServiceRunningMessage( + project_id=project_id, + node_id=node_id, + user_id=user_id, + product_name=product_name, + ) + json_str = model_instance.json() + model_bytes = json_str.encode("utf-8") + + _expected_project_node_states = [".data_assets", "home_user_workspace"] + # Mock efs_manager and its methods + mock_efs_manager = 
AsyncMock() + app.state.efs_manager = mock_efs_manager + mock_efs_manager.check_project_node_data_directory_exits.return_value = True + mock_efs_manager.get_project_node_data_size.return_value = 4000 + mock_efs_manager.list_project_node_state_names.return_value = ( + _expected_project_node_states + ) + + result = await process_dynamic_service_running_message(app, data=model_bytes) + + # Check the actual arguments passed to update_disk_usage + _, kwargs = mock_update_disk_usage.call_args + assert kwargs["usage"] + assert len(kwargs["usage"]) == 2 + for key, value in kwargs["usage"].items(): + assert key in _expected_project_node_states + assert value.used == 4000 + assert value.free == 6000 + assert value.total == 10000 + assert value.used_percent == 40.0 + + assert result is True + + +async def test_process_msg__dir_not_exists( + app: FastAPI, + efs_cleanup: None, + project_id: ProjectID, + node_id: NodeID, + user_id: UserID, + product_name: ProductName, +): + # Create mock data for the message + model_instance = DynamicServiceRunningMessage( + project_id=project_id, + node_id=node_id, + user_id=user_id, + product_name=product_name, + ) + json_str = model_instance.json() + model_bytes = json_str.encode("utf-8") + + result = await process_dynamic_service_running_message(app, data=model_bytes) + assert result is True