diff --git a/CHANGELOG.md b/CHANGELOG.md index c87dc84de..979366562 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## neptune 1.8.2 + +### Changes +- Support for disabling operation saving based on disk utilization ([#1496](https://github.com/neptune-ai/neptune-client/pull/1496)) + + ## neptune 1.8.1 ### Fixes diff --git a/src/neptune/envs.py b/src/neptune/envs.py index e73e739dc..31bc2e144 100644 --- a/src/neptune/envs.py +++ b/src/neptune/envs.py @@ -29,6 +29,8 @@ "NEPTUNE_FETCH_TABLE_STEP_SIZE", "NEPTUNE_SYNC_AFTER_STOP_TIMEOUT", "NEPTUNE_REQUEST_TIMEOUT", + "NEPTUNE_MAX_DISK_UTILIZATION", + "NEPTUNE_NON_RAISING_ON_DISK_ISSUE", "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK", "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK", ] @@ -68,4 +70,8 @@ NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK" +NEPTUNE_MAX_DISK_UTILIZATION = "NEPTUNE_MAX_DISK_UTILIZATION" + +NEPTUNE_NON_RAISING_ON_DISK_ISSUE = "NEPTUNE_NON_RAISING_ON_DISK_ISSUE" + S3_ENDPOINT_URL = "S3_ENDPOINT_URL" diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 865102e76..3e050eb66 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -49,6 +49,7 @@ get_container_dir, ) from neptune.internal.threading.daemon import Daemon +from neptune.internal.utils.disk_full import ensure_disk_not_full from neptune.internal.utils.logger import logger _logger = logging.getLogger(__name__) @@ -105,6 +106,7 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa process_path = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}" return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, process_path) + @ensure_disk_not_full def enqueue_operation(self, op: Operation, *, wait: 
bool) -> None: self._last_version = self._queue.put(op) diff --git a/src/neptune/internal/operation_processors/offline_operation_processor.py b/src/neptune/internal/operation_processors/offline_operation_processor.py index de286242f..a1968f83b 100644 --- a/src/neptune/internal/operation_processors/offline_operation_processor.py +++ b/src/neptune/internal/operation_processors/offline_operation_processor.py @@ -29,6 +29,7 @@ OperationStorage, get_container_dir, ) +from neptune.internal.utils.disk_full import ensure_disk_not_full class OfflineOperationProcessor(OperationProcessor): @@ -46,6 +47,7 @@ def __init__(self, container_id: UniqueId, container_type: ContainerType, lock: def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Path: return get_container_dir(OFFLINE_DIRECTORY, container_id, container_type) + @ensure_disk_not_full def enqueue_operation(self, op: Operation, *, wait: bool) -> None: self._queue.put(op) diff --git a/src/neptune/internal/operation_processors/sync_operation_processor.py b/src/neptune/internal/operation_processors/sync_operation_processor.py index 0833b34ec..b472d4e15 100644 --- a/src/neptune/internal/operation_processors/sync_operation_processor.py +++ b/src/neptune/internal/operation_processors/sync_operation_processor.py @@ -30,6 +30,7 @@ OperationStorage, get_container_dir, ) +from neptune.internal.utils.disk_full import ensure_disk_not_full class SyncOperationProcessor(OperationProcessor): @@ -45,6 +46,7 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa process_path = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}" return get_container_dir(SYNC_DIRECTORY, container_id, container_type, process_path) + @ensure_disk_not_full def enqueue_operation(self, op: Operation, *, wait: bool) -> None: _, errors = self._backend.execute_operations( container_id=self._container_id, diff --git a/src/neptune/internal/utils/disk_full.py 
#
# Copyright (c) 2023, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["ensure_disk_not_full"]


import os
from functools import wraps
from typing import (
    Any,
    Callable,
    Dict,
    Optional,
    Tuple,
)

import psutil

from neptune.common.warnings import (
    NeptuneWarning,
    warn_once,
)
from neptune.constants import NEPTUNE_DATA_DIRECTORY
from neptune.envs import (
    NEPTUNE_MAX_DISK_UTILIZATION,
    NEPTUNE_NON_RAISING_ON_DISK_ISSUE,
)


def get_neptune_data_directory() -> str:
    """Return the directory Neptune stores its local data in.

    The ``NEPTUNE_DATA_DIRECTORY`` environment variable takes precedence; when it is
    unset, the ``NEPTUNE_DATA_DIRECTORY`` constant from ``neptune.constants`` is used.
    """
    return os.getenv("NEPTUNE_DATA_DIRECTORY", NEPTUNE_DATA_DIRECTORY)


def get_disk_utilization_percent(path: Optional[str] = None) -> float:
    """Return the disk utilization reported by ``psutil.disk_usage`` for ``path``, in percent.

    Args:
        path: Location to inspect. Defaults to the Neptune data directory.

    Returns:
        The ``percent`` field of ``psutil.disk_usage(path)`` as a float, or ``0.0`` when
        the lookup fails with ``ValueError`` or ``UnicodeEncodeError``.

    Any other exception raised by ``psutil.disk_usage`` is not handled here and
    propagates to the caller.
    """
    try:
        if path is None:
            path = get_neptune_data_directory()

        return float(psutil.disk_usage(path).percent)
    except (ValueError, UnicodeEncodeError):
        # Keep the declared return type: a float, not the int ``0``.
        return 0.0


def get_max_percentage_from_env() -> Optional[float]:
    """Read the maximum allowed disk utilization from ``NEPTUNE_MAX_DISK_UTILIZATION``.

    Returns:
        The limit as a float in the range ``(0, 100]``, or ``None`` when the variable is
        unset or does not hold a usable percentage.

    A malformed or out-of-range value never raises: ``ensure_disk_not_full`` calls this
    function while decorating, so an exception here would surface while the decorated
    classes are being defined. Instead a ``NeptuneWarning`` is emitted and the
    utilization check is disabled.
    """
    raw_limit = os.getenv(NEPTUNE_MAX_DISK_UTILIZATION)
    if raw_limit is None:
        return None

    try:
        limit: Optional[float] = float(raw_limit)
    except ValueError:
        limit = None

    # The chained comparison is also False for NaN and infinities, so they are rejected too.
    if limit is None or not 0 < limit <= 100:
        warn_once(
            f"Provided invalid value of '{NEPTUNE_MAX_DISK_UTILIZATION}': '{raw_limit}'."
            " Check of disk utilization will not be applied.",
            exception=NeptuneWarning,
        )
        return None

    return limit


def ensure_disk_not_full(func: Callable[..., None]) -> Callable[..., None]:
    """Decorate ``func`` so that disk problems can skip the call instead of raising.

    Both ``NEPTUNE_NON_RAISING_ON_DISK_ISSUE`` and ``NEPTUNE_MAX_DISK_UTILIZATION`` are
    read once, when the decorator is applied; later changes to the environment do not
    affect an already decorated function.

    Behavior of the returned wrapper:

    * ``NEPTUNE_NON_RAISING_ON_DISK_ISSUE`` not set: ``func`` is called unchanged. The
      utilization limit is not consulted in this mode.
    * ``NEPTUNE_NON_RAISING_ON_DISK_ISSUE`` set: when a utilization limit is configured
      and the current utilization exceeds it, a ``NeptuneWarning`` is emitted and
      ``func`` is not called. Otherwise ``func`` is called, and an ``OSError`` raised by
      it (or by the utilization lookup) is replaced by a ``NeptuneWarning``.

    Args:
        func: Callable returning ``None`` whose invocation should be guarded.

    Returns:
        The wrapped callable.
    """
    non_raising_on_disk_issue = NEPTUNE_NON_RAISING_ON_DISK_ISSUE in os.environ
    max_disk_utilization = get_max_percentage_from_env()

    @wraps(func)
    def wrapper(*args: Tuple, **kwargs: Dict[str, Any]) -> None:
        if not non_raising_on_disk_issue:
            return func(*args, **kwargs)

        try:
            if max_disk_utilization is not None:
                current_utilization = get_disk_utilization_percent()
                if current_utilization > max_disk_utilization:
                    warn_once(
                        f"Max disk utilization {max_disk_utilization}% exceeded with {current_utilization}."
                        f" Neptune will not be saving your data.",
                        exception=NeptuneWarning,
                    )
                    return None

            func(*args, **kwargs)
        except OSError:  # ``IOError`` is an alias of ``OSError`` in Python 3
            warn_once("Encountered disk issue and Neptune will not be saving your data.", exception=NeptuneWarning)

        return None

    return wrapper