Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 notify frontend about current efs disk space #6520

Merged
merged 14 commits into from
Oct 29, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..api.rpc.routes import setup_rpc_routes
from ..services.background_tasks_setup import setup as setup_background_tasks
from ..services.efs_manager_setup import setup as setup_efs_manager
from ..services.fire_and_forget_setup import setup as setup_fire_and_forget
from ..services.modules.db import setup as setup_db
from ..services.modules.rabbitmq import setup as setup_rabbitmq
from ..services.modules.redis import setup as setup_redis
Expand Down Expand Up @@ -56,6 +57,8 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
setup_background_tasks(app) # requires Redis, DB
setup_process_messages(app) # requires Rabbit

setup_fire_and_forget(app)

# EVENTS
async def _on_startup() -> None:
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,31 @@ async def get_project_node_data_size(

return await efs_manager_utils.get_size_bash_async(_dir_path)

async def list_project_node_state_names(
    self, project_id: ProjectID, node_id: NodeID
) -> list[str]:
    """
    Return the names of the sub-directories found in a project node's EFS directory.

    These are currently state volumes that are mounted via docker volume to the
    dynamic sidecar and user services (ex. ".data_assets" and "home_user_workspace").

    Entries that are not directories are reported through the module logger and
    left out of the result. Errors raised while listing the directory (for example
    ``FileNotFoundError`` when it does not exist) are not handled here.
    """
    node_dir = (
        self._efs_mounted_path
        / self._project_specific_data_base_directory
        / f"{project_id}"
        / f"{node_id}"
    )

    state_names: list[str] = []
    for entry in node_dir.iterdir():
        if not entry.is_dir():
            # Only directories are expected at this level; report and skip
            _logger.error(
                "This is not a directory. This should not happen! %s",
                node_dir / entry.name,
            )
            continue
        state_names.append(entry.name)
    return state_names

async def remove_project_node_data_write_permissions(
self, project_id: ProjectID, node_id: NodeID
) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from collections.abc import Awaitable, Callable

from fastapi import FastAPI
from servicelib.logging_utils import log_catch, log_context

_logger = logging.getLogger(__name__)


def _on_app_startup(_app: FastAPI) -> Callable[[], Awaitable[None]]:
    """Build the startup handler that creates the fire-and-forget task registry.

    The returned coroutine function stores an empty ``set`` on
    ``_app.state.efs_guardian_fire_and_forget_tasks``.
    """

    async def _startup() -> None:
        with log_context(
            _logger, logging.INFO, msg="Efs Guardian setup fire and forget tasks.."
        ):
            # NOTE(review): reraise=False presumably means errors are logged
            # and not propagated -- confirm against servicelib.logging_utils
            with log_catch(_logger, reraise=False):
                _app.state.efs_guardian_fire_and_forget_tasks = set()

    return _startup


def _on_app_shutdown(
    _app: FastAPI,
) -> Callable[[], Awaitable[None]]:
    """Build the shutdown handler that cancels the registered fire-and-forget tasks.

    The returned coroutine function calls ``cancel()`` on every entry of
    ``_app.state.efs_guardian_fire_and_forget_tasks``; it does not await them.
    """

    async def _stop() -> None:
        with log_context(
            _logger, logging.INFO, msg="Efs Guardian fire and forget tasks shutdown.."
        ):
            # NOTE(review): reraise=False presumably means errors are logged
            # and not propagated -- confirm against servicelib.logging_utils
            with log_catch(_logger, reraise=False):
                assert _app  # nosec
                pending_tasks = _app.state.efs_guardian_fire_and_forget_tasks
                # A falsy registry (e.g. an empty set) leaves nothing to cancel
                for task in pending_tasks or ():
                    task.cancel()

    return _stop


def setup(app: FastAPI) -> None:
    """Register the fire-and-forget startup and shutdown handlers on ``app``."""
    handlers = {
        "startup": _on_app_startup(app),
        "shutdown": _on_app_shutdown(app),
    }
    # dicts keep insertion order, so "startup" is registered before "shutdown"
    for event_name, handler in handlers.items():
        app.add_event_handler(event_name, handler)
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ async def on_startup() -> None:
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
client_name="efs_guardian_rpc_server", settings=settings
)
app.state.rabbitmq_rpc_client = await RabbitMQRPCClient.create(
client_name="efs_guardian_rpc_client", settings=settings
)

async def on_shutdown() -> None:
    """Close each RabbitMQ client stored on ``app.state`` that is set.

    ``app`` is not a parameter; it is resolved from the surrounding scope.
    """
    # Closed in this order: plain client, RPC server, RPC client
    for state_attr in (
        "rabbitmq_client",
        "rabbitmq_rpc_server",
        "rabbitmq_rpc_client",
    ):
        client = getattr(app.state, state_attr)
        if client:
            await client.close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)
Expand All @@ -53,4 +58,9 @@ def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)


def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient:
    """Return the RabbitMQ RPC client stored on ``app.state.rabbitmq_rpc_client``.

    An ``assert`` guards against a falsy value (the check disappears when Python
    runs with assertions disabled).
    """
    rpc_client = app.state.rabbitmq_rpc_client
    assert rpc_client  # nosec
    return cast(RabbitMQRPCClient, rpc_client)


__all__ = ("RabbitMQClient",)
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import logging

from fastapi import FastAPI
from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage
from models_library.rabbitmq_messages import DynamicServiceRunningMessage
from pydantic import parse_raw_as
from servicelib.logging_utils import log_context
from simcore_service_efs_guardian.services.modules.redis import get_redis_lock_client
from servicelib.rabbitmq import RabbitMQRPCClient
from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar.disk_usage import (
update_disk_usage,
)
from servicelib.utils import fire_and_forget_task

from ..core.settings import get_application_settings
from ..services.efs_manager import EfsManager
from ..services.modules.rabbitmq import get_rabbitmq_rpc_client
from ..services.modules.redis import get_redis_lock_client

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,6 +57,23 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) ->
rabbit_message.user_id,
)

project_node_state_names = await efs_manager.list_project_node_state_names(
rabbit_message.project_id, node_id=rabbit_message.node_id
)
rpc_client: RabbitMQRPCClient = get_rabbitmq_rpc_client(app)
_used = min(size, settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES)
usage: dict[str, DiskUsage] = {}
for name in project_node_state_names:
usage[name] = DiskUsage.from_efs_guardian(
used=_used, total=settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES
)

fire_and_forget_task(
update_disk_usage(rpc_client, node_id=rabbit_message.node_id, usage=usage),
task_suffix_name=f"update_disk_usage_efs_user_id{rabbit_message.user_id}_node_id{rabbit_message.node_id}",
fire_and_forget_tasks_collection=app.state.efs_guardian_fire_and_forget_tasks,
)

if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES:
msg = f"Removing write permissions inside of EFS starts for project ID: {rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}"
with log_context(_logger, logging.WARNING, msg=msg):
Expand Down
1 change: 1 addition & 0 deletions services/efs-guardian/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"pytest_simcore.environment_configs",
"pytest_simcore.faker_projects_data",
"pytest_simcore.faker_users_data",
"pytest_simcore.faker_products_data",
"pytest_simcore.faker_projects_data",
"pytest_simcore.pydantic_models",
"pytest_simcore.pytest_global_environs",
Expand Down
10 changes: 10 additions & 0 deletions services/efs-guardian/tests/unit/test_efs_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ async def test_remove_write_access_rights(
is False
)

with pytest.raises(FileNotFoundError):
await efs_manager.list_project_node_state_names(
project_id=project_id, node_id=node_id
)

with patch(
"simcore_service_efs_guardian.services.efs_manager.os.chown"
) as mocked_chown:
Expand All @@ -108,6 +113,11 @@ async def test_remove_write_access_rights(
is True
)

project_node_state_names = await efs_manager.list_project_node_state_names(
project_id=project_id, node_id=node_id
)
assert project_node_state_names == [_storage_directory_name]

size_before = await efs_manager.get_project_node_data_size(
project_id=project_id, node_id=node_id
)
Expand Down
111 changes: 111 additions & 0 deletions services/efs-guardian/tests/unit/test_process_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
# pylint: disable=unused-variable


from unittest.mock import AsyncMock, patch

import pytest
from faker import Faker
from fastapi import FastAPI
from models_library.products import ProductName
from models_library.rabbitmq_messages import DynamicServiceRunningMessage
from models_library.users import UserID
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from simcore_service_efs_guardian.services.efs_manager import NodeID, ProjectID
from simcore_service_efs_guardian.services.process_messages import (
process_dynamic_service_running_message,
)

pytest_simcore_core_services_selection = ["rabbit"]
pytest_simcore_ops_services_selection = []


@pytest.fixture
def app_environment(
    monkeypatch: pytest.MonkeyPatch,
    app_environment: EnvVarsDict,
    rabbit_env_vars_dict: EnvVarsDict,
    with_disabled_redis_and_background_tasks: None,
    with_disabled_postgres: None,
) -> EnvVarsDict:
    """Extend the base app environment with RabbitMQ settings and a fixed EFS size limit.

    Later sources win on key clashes: base environment, then the RabbitMQ
    variables, then ``EFS_DEFAULT_USER_SERVICE_SIZE_BYTES`` set to ``"10000"``.
    """
    envs = dict(app_environment)
    envs.update(rabbit_env_vars_dict)
    envs["EFS_DEFAULT_USER_SERVICE_SIZE_BYTES"] = "10000"
    return setenvs_from_dict(monkeypatch, envs)


@patch("simcore_service_efs_guardian.services.process_messages.update_disk_usage")
async def test_process_msg(
    mock_update_disk_usage,
    faker: Faker,
    app: FastAPI,
    efs_cleanup: None,
    project_id: ProjectID,
    node_id: NodeID,
    user_id: UserID,
    product_name: ProductName,
):
    """Processing a running-service message reports disk usage for every state name.

    The EFS manager is replaced by an ``AsyncMock`` reporting 4000 used bytes and
    two state names, and ``update_disk_usage`` is patched so the keyword arguments
    it receives can be inspected. The expected ``total`` of 10000 relies on
    ``EFS_DEFAULT_USER_SERVICE_SIZE_BYTES`` being "10000" in the app environment.
    """
    # Create mock data for the message
    model_instance = DynamicServiceRunningMessage(
        project_id=project_id,
        node_id=node_id,
        user_id=user_id,
        product_name=product_name,
    )
    json_str = model_instance.json()
    model_bytes = json_str.encode("utf-8")

    _expected_project_node_states = [".data_assets", "home_user_workspace"]
    # Mock efs_manager and its methods
    mock_efs_manager = AsyncMock()
    app.state.efs_manager = mock_efs_manager
    mock_efs_manager.check_project_node_data_directory_exits.return_value = True
    mock_efs_manager.get_project_node_data_size.return_value = 4000
    mock_efs_manager.list_project_node_state_names.return_value = (
        _expected_project_node_states
    )

    result = await process_dynamic_service_running_message(app, data=model_bytes)

    # Fail with a clear message (instead of a TypeError while unpacking
    # ``call_args``) if the disk usage update was never triggered
    mock_update_disk_usage.assert_called_once()

    # Check the actual arguments passed to update_disk_usage
    _, kwargs = mock_update_disk_usage.call_args
    assert kwargs["node_id"] == node_id

    usage = kwargs["usage"]
    # Exactly one entry per expected state name: no extras, none missing
    assert set(usage) == set(_expected_project_node_states)
    for value in usage.values():
        assert value.used == 4000
        assert value.free == 6000
        assert value.total == 10000
        assert value.used_percent == 40.0

    assert result is True


async def test_process_msg__dir_not_exists(
    app: FastAPI,
    efs_cleanup: None,
    project_id: ProjectID,
    node_id: NodeID,
    user_id: UserID,
    product_name: ProductName,
):
    """Processing still returns ``True`` when nothing was prepared on EFS.

    No EFS manager mock is installed and no directory is created by this test;
    presumably (per the test name) the project node directory does not exist.
    """
    # Serialize a running-service message the same way it arrives from the broker
    message = DynamicServiceRunningMessage(
        project_id=project_id,
        node_id=node_id,
        user_id=user_id,
        product_name=product_name,
    )
    payload = message.json().encode("utf-8")

    outcome = await process_dynamic_service_running_message(app, data=payload)
    assert outcome is True
Loading