Skip to content

Commit

Permalink
Flag added for cleaning internal data (#1589)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky authored Dec 7, 2023
1 parent 7bc211d commit 4486214
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

### Changes
- Use literals instead of str for Mode typing ([#1586](https://github.com/neptune-ai/neptune-client/pull/1586))
- Flag added for cleaning internal data ([#1589](https://github.com/neptune-ai/neptune-client/pull/1589))


## 1.8.6
Expand Down
3 changes: 3 additions & 0 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK",
"NEPTUNE_DISABLE_PARENT_DIR_DELETION",
"NEPTUNE_CLEAN_INTERNAL_DATA",
"NEPTUNE_ASYNC_BATCH_SIZE",
]

Expand Down Expand Up @@ -75,6 +76,8 @@

NEPTUNE_DISABLE_PARENT_DIR_DELETION = "NEPTUNE_DISABLE_PARENT_DIR_DELETION"

NEPTUNE_CLEAN_INTERNAL_DATA = "NEPTUNE_CLEAN_INTERNAL_DATA"

NEPTUNE_ASYNC_BATCH_SIZE = "NEPTUNE_ASYNC_BATCH_SIZE"

S3_ENDPOINT_URL = "S3_ENDPOINT_URL"
35 changes: 20 additions & 15 deletions src/neptune/internal/disk_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
)

from neptune.exceptions import MalformedOperation
from neptune.internal.utils.files import remove_parent_folder_if_allowed
from neptune.internal.utils.files import (
remove_parent_folder_if_allowed,
should_clean_internal_data,
)
from neptune.internal.utils.json_file_splitter import JsonFileSplitter
from neptune.internal.utils.sync_offset_file import SyncOffsetFile

Expand Down Expand Up @@ -196,19 +199,20 @@ def wait_for_empty(self, seconds: Optional[float] = None) -> bool:
def ack(self, version: int) -> None:
self._last_ack_file.write(version)

log_versions = self._get_all_log_file_versions()
for i in range(0, len(log_versions) - 1):
if log_versions[i + 1] <= version:
filename = self._get_log_file(log_versions[i])
try:
os.remove(filename)
except FileNotFoundError:
# not really a problem
pass
except Exception:
_logger.exception("Cannot remove queue file %s", filename)
else:
break
if should_clean_internal_data():
log_versions = self._get_all_log_file_versions()
for i in range(0, len(log_versions) - 1):
if log_versions[i + 1] <= version:
filename = self._get_log_file(log_versions[i])
try:
os.remove(filename)
except FileNotFoundError:
# not really a problem
pass
except Exception:
_logger.exception("Cannot remove queue file %s", filename)
else:
break

with self._empty_cond:
if self.is_empty():
Expand Down Expand Up @@ -252,4 +256,5 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
    """Flush and close the queue when leaving the ``with`` block.

    The queue's files are cleaned up (only if the queue is empty) when
    ``should_clean_internal_data()`` allows it.

    Args:
        exc_type: Type of the exception raised inside the block, if any.
        exc_val: The exception instance raised inside the block, if any.
        exc_tb: Traceback of that exception, if any.

    Returns:
        None, so an exception raised inside the block is not suppressed.
    """
    self.flush()
    self.close()
    # Cleanup has to stay behind the flag: calling cleanup_if_empty()
    # unconditionally as well would make NEPTUNE_CLEAN_INTERNAL_DATA
    # ineffective for this code path.
    if should_clean_internal_data():
        self.cleanup_if_empty()
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from neptune.internal.threading.daemon import Daemon
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize
from neptune.internal.utils.files import should_clean_internal_data
from neptune.internal.utils.logger import logger

if TYPE_CHECKING:
Expand Down Expand Up @@ -252,7 +253,7 @@ def stop(self, seconds: Optional[float] = None) -> None:
self.close()

# Remove local files
if self._queue.is_empty():
if should_clean_internal_data() and self._queue.is_empty():
# TODO: Will be refactored
self._metadata_file.cleanup()
self._queue.cleanup_if_empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from neptune.internal.operation_processors.utils import common_metadata
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize
from neptune.internal.utils.files import should_clean_internal_data

if TYPE_CHECKING:
from pathlib import Path
Expand Down Expand Up @@ -72,7 +73,10 @@ def enqueue_operation(self, op: "Operation", *, wait: bool) -> None:
raise errors[0]

def stop(self, seconds: Optional[float] = None) -> None:
    """Stop the processor, removing its local files when cleanup is enabled.

    Args:
        seconds: Not used by this method.
    """
    if not should_clean_internal_data():
        # Cleanup of internal data is switched off: leave files on disk.
        return
    self._remove_local_files()

def _remove_local_files(self) -> None:
    """Clean up the metadata file and then the operation storage."""
    # Call order is preserved: metadata file first, operation storage second.
    self._metadata_file.cleanup()
    self._operation_storage.cleanup()

Expand Down
6 changes: 5 additions & 1 deletion src/neptune/internal/utils/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["remove_parent_folder_if_allowed"]
__all__ = ["remove_parent_folder_if_allowed", "should_clean_internal_data"]

import os
from pathlib import Path
Expand All @@ -38,3 +38,7 @@ def remove_parent_folder_if_allowed(path: Path) -> None:
os.rmdir(parent)
except OSError:
logger.debug(f"Cannot remove directory: {parent}")


def should_clean_internal_data() -> bool:
    """Tell whether internal data stored on disk should be cleaned up.

    The ``NEPTUNE_CLEAN_INTERNAL_DATA`` environment variable is read on every
    call, so changes made at runtime are picked up.

    Returns:
        True when the variable is unset, or when its value is one of
        ``"true"``, ``"t"`` or ``"1"`` (case-insensitive, surrounding
        whitespace ignored). Any other value returns False.
    """
    raw_value = os.getenv("NEPTUNE_CLEAN_INTERNAL_DATA", "True")
    # Strip first: values exported with stray spaces or a trailing newline
    # (e.g. from shell scripts or .env files) should not silently disable
    # the cleanup.
    return raw_value.strip().lower() in ("true", "t", "1")
19 changes: 17 additions & 2 deletions tests/unit/neptune/new/attributes/atoms/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
FileSetVal,
)
from neptune.common.utils import IS_WINDOWS
from neptune.envs import NEPTUNE_DISABLE_PARENT_DIR_DELETION
from neptune.envs import (
NEPTUNE_CLEAN_INTERNAL_DATA,
NEPTUNE_DISABLE_PARENT_DIR_DELETION,
)
from neptune.internal.operation import (
UploadFile,
UploadFileSet,
Expand Down Expand Up @@ -192,7 +195,7 @@ def test_clean_files_on_close(self):
assert not os.path.exists(data_path.parent) # run folder

@patch.dict(os.environ, {NEPTUNE_DISABLE_PARENT_DIR_DELETION: "True"})
def test_clean_files_on_close_when_cleanup_disabled(self):
def test_clean_files_on_close_when_parent_dir_cleanup_disabled(self):
with self._exp() as run:
data_path = run._op_processor._operation_storage.data_path

Expand All @@ -202,3 +205,15 @@ def test_clean_files_on_close_when_cleanup_disabled(self):

assert not os.path.exists(data_path) # exec folder
assert os.path.exists(data_path.parent) # run folder

@patch.dict(os.environ, {NEPTUNE_CLEAN_INTERNAL_DATA: "False"})
def test_clean_files_on_close_when_internal_cleanup_disabled(self):
    """With internal-data cleanup disabled, stopping a run keeps both folders."""
    with self._exp() as run:
        exec_dir = run._op_processor._operation_storage.data_path
        run_dir = exec_dir.parent

        assert os.path.exists(exec_dir)

        run.stop()

        # Neither the exec folder nor its parent run folder may be removed.
        for kept_dir in (exec_dir, run_dir):
            assert os.path.exists(kept_dir)

0 comments on commit 4486214

Please sign in to comment.