Skip to content

Commit

Permalink
Fix waiting for queue to be empty
Browse files Browse the repository at this point in the history
  • Loading branch information
aniezurawski authored May 4, 2021
1 parent 812a0e7 commit ff037b3
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions neptune/new/internal/containers/disk_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ def __init__(
self._from_dict = from_dict
self._max_file_size = max_file_size

self._event_empty = Event()
self._event_empty.set()

try:
os.makedirs(self._dir_path)
except FileExistsError:
Expand All @@ -63,6 +60,12 @@ def __init__(
self._file_size = 0
self._should_skip_to_ack = True

self._event_empty = Event()
if self.is_empty():
self._event_empty.set()
else:
self._event_empty.clear()

def put(self, obj: T) -> int:
version = self._last_put_file.read_local() + 1
self._event_empty.clear()
Expand Down Expand Up @@ -101,7 +104,6 @@ def _get(self) -> Tuple[Optional[T], int]:
_json = self._reader.get()
if not _json:
if self._read_file_version >= self._write_file_version:
self._event_empty.set()
return None, self._last_put_file.read_local()
self._reader.close()
self._read_file_version = self._next_log_file_version(self._read_file_version)
Expand Down Expand Up @@ -141,6 +143,9 @@ def wait_for_empty(self, seconds: Optional[float] = None) -> None:

def ack(self, version: int) -> None:
self._last_ack_file.write(version)
if self.is_empty():
self._event_empty.set()

log_versions = self._get_all_log_file_versions()
for i in range(0, len(log_versions) - 1):
if log_versions[i + 1] <= version:
Expand Down

0 comments on commit ff037b3

Please sign in to comment.