diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a20b1216..6780e5c64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,9 @@ ### Fixes - Load CLI plug-ins in try..except block to avoid a failure in loading a plug-in to crash entire CLI ([#1392](https://github.com/neptune-ai/neptune-client/pull/1392)) -- Fixed cleaning operation storage when using sync mode and forking ([#1413](https://github.com/neptune-ai/neptune-client/pull/1413)) +- Fixed cleaning operation storage when using `sync` mode and forking ([#1413](https://github.com/neptune-ai/neptune-client/pull/1413)) - Fix FileDependenciesStrategy when the dependency file is in a folder ([#1411](https://github.com/neptune-ai/neptune-client/pull/1411)) - +- Fixed cleaning operation storage when using `async` mode and forking ([#1418](https://github.com/neptune-ai/neptune-client/pull/1418)) ## neptune 1.4.1 diff --git a/src/neptune/internal/disk_queue.py b/src/neptune/internal/disk_queue.py index 56d6351ec..d733bcd4d 100644 --- a/src/neptune/internal/disk_queue.py +++ b/src/neptune/internal/disk_queue.py @@ -167,14 +167,15 @@ def flush(self): self._last_put_file.flush() def close(self): - """ - Close and remove underlying files if queue is empty - """ self._reader.close() self._writer.close() self._last_ack_file.close() self._last_put_file.close() + def cleanup_if_empty(self) -> None: + """ + Remove underlying files if queue is empty + """ if self.is_empty(): self._remove_data() @@ -258,3 +259,4 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.flush() self.close() + self.cleanup_if_empty() diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 08a681e3e..2ab814cea 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -194,7 +194,7 @@ def _wait_for_queue_empty(self, initial_queue_size: int, seconds: Optional[float already_synced_proc, ) - def stop(self, seconds: Optional[float] = None): + def stop(self, seconds: Optional[float] = None) -> None: ts = time() self._queue.flush() if self._consumer.is_running(): @@ -204,7 +204,12 @@ def stop(self, seconds: Optional[float] = None): self._consumer.interrupt() sec_left = None if seconds is None else seconds - (time() - ts) self._consumer.join(sec_left) - self._queue.close() + + # Close resources + self.close() + + # Remove local files + self._queue.cleanup_if_empty() def close(self): self._queue.close() diff --git a/src/neptune/internal/operation_processors/offline_operation_processor.py b/src/neptune/internal/operation_processors/offline_operation_processor.py index bb0130f11..de286242f 100644 --- a/src/neptune/internal/operation_processors/offline_operation_processor.py +++ b/src/neptune/internal/operation_processors/offline_operation_processor.py @@ -58,5 +58,10 @@ def flush(self): def start(self): pass - def stop(self, seconds: Optional[float] = None): + def stop(self, seconds: Optional[float] = None) -> None: + self.close() + # Remove local files + self._queue.cleanup_if_empty() + + def close(self) -> None: self._queue.close() diff --git a/src/neptune/internal/operation_processors/operation_processor.py b/src/neptune/internal/operation_processors/operation_processor.py index 22681e49c..73ebcc785 100644 --- a/src/neptune/internal/operation_processors/operation_processor.py +++ b/src/neptune/internal/operation_processors/operation_processor.py @@ -22,31 +22,32 @@ class OperationProcessor(abc.ABC): + def pause(self): + pass + + def resume(self): + pass + @abc.abstractmethod def enqueue_operation(self, op: Operation, *, wait: bool) -> None: - pass + ... @abc.abstractmethod def wait(self) -> None: - pass + ... @abc.abstractmethod def flush(self): - pass + ... @abc.abstractmethod def start(self): - pass - - def pause(self): - pass - - def resume(self): - pass + ... @abc.abstractmethod - def stop(self, seconds: Optional[float] = None): - pass + def stop(self, seconds: Optional[float] = None) -> None: + ... - def close(self): - self.stop() + @abc.abstractmethod + def close(self) -> None: + ... diff --git a/src/neptune/internal/operation_processors/operation_storage.py b/src/neptune/internal/operation_processors/operation_storage.py index c8ebf8d2d..3fc31b465 100644 --- a/src/neptune/internal/operation_processors/operation_storage.py +++ b/src/neptune/internal/operation_processors/operation_storage.py @@ -54,7 +54,7 @@ def data_path(self) -> Path: def upload_path(self) -> Path: return self.data_path / "upload_path" - def close(self): + def cleanup(self) -> None: shutil.rmtree(self.data_path, ignore_errors=True) parent = self.data_path.parent diff --git a/src/neptune/internal/operation_processors/read_only_operation_processor.py b/src/neptune/internal/operation_processors/read_only_operation_processor.py index 388a65503..365a11189 100644 --- a/src/neptune/internal/operation_processors/read_only_operation_processor.py +++ b/src/neptune/internal/operation_processors/read_only_operation_processor.py @@ -45,5 +45,8 @@ def flush(self): def start(self): pass - def stop(self, seconds: Optional[float] = None): + def stop(self, seconds: Optional[float] = None) -> None: + pass + + def close(self) -> None: pass diff --git a/src/neptune/internal/operation_processors/sync_operation_processor.py b/src/neptune/internal/operation_processors/sync_operation_processor.py index bfa49f9e4..0833b34ec 100644 --- a/src/neptune/internal/operation_processors/sync_operation_processor.py +++ b/src/neptune/internal/operation_processors/sync_operation_processor.py @@ -64,8 +64,9 @@ def flush(self): def start(self): pass - def stop(self, seconds: Optional[float] = None): - self._operation_storage.close() + def stop(self, seconds: Optional[float] = None) -> None: + # Remove local files + self._operation_storage.cleanup() - def close(self): + def close(self) -> None: pass