Skip to content

Commit

Permalink
Fix race condition in disk queue (#626)
Browse files Browse the repository at this point in the history
  • Loading branch information
aniezurawski authored Jul 16, 2021
1 parent 5b45de6 commit 36d221f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
### Features
- Added NEPTUNE_MONITORING_NAMEPSACE environment variable ([#623](https://github.com/neptune-ai/neptune-client/pull/623))

### Fixes
- Use absolute path for operations queue([#624](https://github.com/neptune-ai/neptune-client/pull/624))
- Fix race condition in operations queue([#626](https://github.com/neptune-ai/neptune-client/pull/626))

## neptune-client 0.10.1

### Features
Expand Down
11 changes: 7 additions & 4 deletions neptune/new/internal/containers/disk_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,20 @@ def put(self, obj: T) -> int:

def get(self) -> Tuple[Optional[T], int]:
if self._should_skip_to_ack:
self._should_skip_to_ack = False
return self._skip_and_get()
else:
return self._get()

def _skip_and_get(self) -> Tuple[Optional[T], int]:
ack_version = self._last_ack_file.read_local()
ver = -1
while True:
obj, ver = self._get()
obj, next_ver = self._get()
if obj is None:
return None, ver
ver = next_ver
if ver > ack_version:
self._should_skip_to_ack = False
if ver > ack_version + 1:
_logger.warning("Possible data loss. Last acknowledged operation version: %d, next: %d",
ack_version, ver)
Expand All @@ -104,7 +106,7 @@ def _get(self) -> Tuple[Optional[T], int]:
_json = self._reader.get()
if not _json:
if self._read_file_version >= self._write_file_version:
return None, self._last_put_file.read_local()
return None, -1
self._reader.close()
self._read_file_version = self._next_log_file_version(self._read_file_version)
self._reader = JsonFileSplitter(self._get_log_file(self._read_file_version))
Expand All @@ -121,9 +123,10 @@ def get_batch(self, size: int) -> Tuple[List[T], int]:
return [], ver
ret = [first]
for _ in range(0, size - 1):
obj, ver = self._get()
obj, next_ver = self._get()
if not obj:
break
ver = next_ver
ret.append(obj)
return ret, ver

Expand Down

0 comments on commit 36d221f

Please sign in to comment.