Skip to content

Commit

Permalink
Fixed cleaning operation storage when using async mode and forking (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky authored Aug 10, 2023
1 parent 1977426 commit a1fb010
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 27 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

### Fixes
- Load CLI plug-ins in try..except block to avoid a failure in loading a plug-in to crash entire CLI ([#1392](https://github.com/neptune-ai/neptune-client/pull/1392))
- Fixed cleaning operation storage when using sync mode and forking ([#1413](https://github.com/neptune-ai/neptune-client/pull/1413))
- Fixed cleaning operation storage when using `sync` mode and forking ([#1413](https://github.com/neptune-ai/neptune-client/pull/1413))
- Fix FileDependenciesStrategy when the dependency file is in a folder ([#1411](https://github.com/neptune-ai/neptune-client/pull/1411))

- Fixed cleaning operation storage when using `async` mode and forking ([#1418](https://github.com/neptune-ai/neptune-client/pull/1418))


## neptune 1.4.1
Expand Down
8 changes: 5 additions & 3 deletions src/neptune/internal/disk_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ def flush(self):
self._last_put_file.flush()

def close(self):
"""
Close and remove underlying files if queue is empty
"""
self._reader.close()
self._writer.close()
self._last_ack_file.close()
self._last_put_file.close()

def cleanup_if_empty(self) -> None:
"""
Remove underlying files if queue is empty
"""
if self.is_empty():
self._remove_data()

Expand Down Expand Up @@ -258,3 +259,4 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.flush()
self.close()
self.cleanup_if_empty()
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def _wait_for_queue_empty(self, initial_queue_size: int, seconds: Optional[float
already_synced_proc,
)

def stop(self, seconds: Optional[float] = None):
def stop(self, seconds: Optional[float] = None) -> None:
ts = time()
self._queue.flush()
if self._consumer.is_running():
Expand All @@ -204,7 +204,12 @@ def stop(self, seconds: Optional[float] = None):
self._consumer.interrupt()
sec_left = None if seconds is None else seconds - (time() - ts)
self._consumer.join(sec_left)
self._queue.close()

# Close resources
self.close()

# Remove local files
self._queue.cleanup_if_empty()

def close(self):
self._queue.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,10 @@ def flush(self):
def start(self):
pass

def stop(self, seconds: Optional[float] = None):
def stop(self, seconds: Optional[float] = None) -> None:
self.close()
# Remove local files
self._queue.cleanup_if_empty()

def close(self) -> None:
self._queue.close()
29 changes: 15 additions & 14 deletions src/neptune/internal/operation_processors/operation_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,32 @@


class OperationProcessor(abc.ABC):
def pause(self):
pass

def resume(self):
pass

@abc.abstractmethod
def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
pass
...

@abc.abstractmethod
def wait(self) -> None:
pass
...

@abc.abstractmethod
def flush(self):
pass
...

@abc.abstractmethod
def start(self):
pass

def pause(self):
pass

def resume(self):
pass
...

@abc.abstractmethod
def stop(self, seconds: Optional[float] = None):
pass
def stop(self, seconds: Optional[float] = None) -> None:
...

def close(self):
self.stop()
@abc.abstractmethod
def close(self) -> None:
...
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def data_path(self) -> Path:
def upload_path(self) -> Path:
return self.data_path / "upload_path"

def close(self):
def cleanup(self) -> None:
shutil.rmtree(self.data_path, ignore_errors=True)

parent = self.data_path.parent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,8 @@ def flush(self):
def start(self):
pass

def stop(self, seconds: Optional[float] = None):
def stop(self, seconds: Optional[float] = None) -> None:
pass

def close(self) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ def flush(self):
def start(self):
pass

def stop(self, seconds: Optional[float] = None):
self._operation_storage.close()
def stop(self, seconds: Optional[float] = None) -> None:
# Remove local files
self._operation_storage.cleanup()

def close(self):
def close(self) -> None:
pass

0 comments on commit a1fb010

Please sign in to comment.