From 4486214bb17538aaa3e049973ff6a571b14d02bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Jankowski?= Date: Thu, 7 Dec 2023 09:56:43 +0100 Subject: [PATCH] Flag added for cleaning internal data (#1589) --- CHANGELOG.md | 1 + src/neptune/envs.py | 3 ++ src/neptune/internal/disk_queue.py | 35 +++++++++++-------- .../async_operation_processor.py | 3 +- .../sync_operation_processor.py | 6 +++- src/neptune/internal/utils/files.py | 6 +++- .../neptune/new/attributes/atoms/test_file.py | 19 ++++++++-- 7 files changed, 53 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c0e7c52f..aabb2f147 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Changes - Use literals instead of str for Mode typing ([#1586](https://github.com/neptune-ai/neptune-client/pull/1586)) +- Flag added for cleaning internal data ([#1589](https://github.com/neptune-ai/neptune-client/pull/1589)) ## 1.8.6 diff --git a/src/neptune/envs.py b/src/neptune/envs.py index b63ac488d..4a4c68d64 100644 --- a/src/neptune/envs.py +++ b/src/neptune/envs.py @@ -33,6 +33,7 @@ "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK", "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK", "NEPTUNE_DISABLE_PARENT_DIR_DELETION", + "NEPTUNE_CLEAN_INTERNAL_DATA", "NEPTUNE_ASYNC_BATCH_SIZE", ] @@ -75,6 +76,8 @@ NEPTUNE_DISABLE_PARENT_DIR_DELETION = "NEPTUNE_DISABLE_PARENT_DIR_DELETION" +NEPTUNE_CLEAN_INTERNAL_DATA = "NEPTUNE_CLEAN_INTERNAL_DATA" + NEPTUNE_ASYNC_BATCH_SIZE = "NEPTUNE_ASYNC_BATCH_SIZE" S3_ENDPOINT_URL = "S3_ENDPOINT_URL" diff --git a/src/neptune/internal/disk_queue.py b/src/neptune/internal/disk_queue.py index 3857ed663..2bd2e5e35 100644 --- a/src/neptune/internal/disk_queue.py +++ b/src/neptune/internal/disk_queue.py @@ -34,7 +34,10 @@ ) from neptune.exceptions import MalformedOperation -from neptune.internal.utils.files import remove_parent_folder_if_allowed +from neptune.internal.utils.files import ( + remove_parent_folder_if_allowed, + should_clean_internal_data, +) from neptune.internal.utils.json_file_splitter import JsonFileSplitter from neptune.internal.utils.sync_offset_file import SyncOffsetFile @@ -196,19 +199,20 @@ def wait_for_empty(self, seconds: Optional[float] = None) -> bool: def ack(self, version: int) -> None: self._last_ack_file.write(version) - log_versions = self._get_all_log_file_versions() - for i in range(0, len(log_versions) - 1): - if log_versions[i + 1] <= version: - filename = self._get_log_file(log_versions[i]) - try: - os.remove(filename) - except FileNotFoundError: - # not really a problem - pass - except Exception: - _logger.exception("Cannot remove queue file %s", filename) - else: - break + if should_clean_internal_data(): + log_versions = self._get_all_log_file_versions() + for i in range(0, len(log_versions) - 1): + if log_versions[i + 1] <= version: + filename = self._get_log_file(log_versions[i]) + try: + os.remove(filename) + except FileNotFoundError: + # not really a problem + pass + except Exception: + _logger.exception("Cannot remove queue file %s", filename) + else: + break with self._empty_cond: if self.is_empty(): @@ -252,4 +256,5 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.flush() self.close() - self.cleanup_if_empty() + if should_clean_internal_data(): + self.cleanup_if_empty() diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index b99f5d3e2..111c66041 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -58,6 +58,7 @@ ) from neptune.internal.threading.daemon import Daemon from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize +from neptune.internal.utils.files import should_clean_internal_data from neptune.internal.utils.logger import logger if TYPE_CHECKING: @@ -252,7 +253,7 @@ def stop(self, seconds: Optional[float] = None) -> None: self.close() # Remove local files - if self._queue.is_empty(): + if should_clean_internal_data() and self._queue.is_empty(): # TODO: Will be refactored self._metadata_file.cleanup() self._queue.cleanup_if_empty() diff --git a/src/neptune/internal/operation_processors/sync_operation_processor.py b/src/neptune/internal/operation_processors/sync_operation_processor.py index 885fa1251..09c6acbe9 100644 --- a/src/neptune/internal/operation_processors/sync_operation_processor.py +++ b/src/neptune/internal/operation_processors/sync_operation_processor.py @@ -31,6 +31,7 @@ ) from neptune.internal.operation_processors.utils import common_metadata from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize +from neptune.internal.utils.files import should_clean_internal_data if TYPE_CHECKING: from pathlib import Path @@ -72,7 +73,10 @@ def enqueue_operation(self, op: "Operation", *, wait: bool) -> None: raise errors[0] def stop(self, seconds: Optional[float] = None) -> None: - # Remove local files + if should_clean_internal_data(): + self._remove_local_files() + + def _remove_local_files(self) -> None: self._metadata_file.cleanup() self._operation_storage.cleanup() diff --git a/src/neptune/internal/utils/files.py b/src/neptune/internal/utils/files.py index aa0579985..6db544fda 100644 --- a/src/neptune/internal/utils/files.py +++ b/src/neptune/internal/utils/files.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["remove_parent_folder_if_allowed"] +__all__ = ["remove_parent_folder_if_allowed", "should_clean_internal_data"] import os from pathlib import Path @@ -38,3 +38,7 @@ def remove_parent_folder_if_allowed(path: Path) -> None: os.rmdir(parent) except OSError: logger.debug(f"Cannot remove directory: {parent}") + + +def should_clean_internal_data() -> bool: + return os.getenv("NEPTUNE_CLEAN_INTERNAL_DATA", "True").lower() in ("true", "t", "1") diff --git a/tests/unit/neptune/new/attributes/atoms/test_file.py b/tests/unit/neptune/new/attributes/atoms/test_file.py index cd3a44a13..05dc03c41 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_file.py +++ b/tests/unit/neptune/new/attributes/atoms/test_file.py @@ -36,7 +36,10 @@ FileSetVal, ) from neptune.common.utils import IS_WINDOWS -from neptune.envs import NEPTUNE_DISABLE_PARENT_DIR_DELETION +from neptune.envs import ( + NEPTUNE_CLEAN_INTERNAL_DATA, + NEPTUNE_DISABLE_PARENT_DIR_DELETION, +) from neptune.internal.operation import ( UploadFile, UploadFileSet, @@ -192,7 +195,7 @@ def test_clean_files_on_close(self): assert not os.path.exists(data_path.parent) # run folder @patch.dict(os.environ, {NEPTUNE_DISABLE_PARENT_DIR_DELETION: "True"}) - def test_clean_files_on_close_when_cleanup_disabled(self): + def test_clean_files_on_close_when_parent_dir_cleanup_disabled(self): with self._exp() as run: data_path = run._op_processor._operation_storage.data_path @@ -202,3 +205,15 @@ def test_clean_files_on_close_when_cleanup_disabled(self): assert not os.path.exists(data_path) # exec folder assert os.path.exists(data_path.parent) # run folder + + @patch.dict(os.environ, {NEPTUNE_CLEAN_INTERNAL_DATA: "False"}) + def test_clean_files_on_close_when_internal_cleanup_disabled(self): + with self._exp() as run: + data_path = run._op_processor._operation_storage.data_path + + assert os.path.exists(data_path) + + run.stop() + + assert os.path.exists(data_path) # exec folder + assert os.path.exists(data_path.parent) # run folder