Skip to content

Commit

Permalink
Added timestamp of operation put to disk queue (#1569)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky authored Nov 21, 2023
1 parent 4c1aeec commit 4eae0ab
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [UNRELEASED] 1.8.5

### Changes
- Added timestamp of operation put to disk queue ([#1569](https://github.com/neptune-ai/neptune-client/pull/1569))


## neptune 1.8.4

### Changes
Expand Down
17 changes: 10 additions & 7 deletions src/neptune/internal/disk_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dataclasses import dataclass
from glob import glob
from pathlib import Path
from time import time
from typing import (
Callable,
Generic,
Expand All @@ -38,6 +39,7 @@
from neptune.internal.utils.sync_offset_file import SyncOffsetFile

T = TypeVar("T")
Timestamp = float

_logger = logging.getLogger(__name__)

Expand All @@ -47,6 +49,7 @@ class QueueElement(Generic[T]):
obj: T
ver: int
size: int
at: Optional[Timestamp] = None


class DiskQueue(Generic[T]):
Expand Down Expand Up @@ -91,7 +94,7 @@ def __init__(

def put(self, obj: T) -> int:
version = self._last_put_file.read_local() + 1
_json = json.dumps(self._serialize(obj, version))
_json = json.dumps(self._serialize(obj=obj, version=version, at=time()))
if self._file_size + len(_json) > self._max_file_size:
old_writer = self._writer
self._writer = open(self._get_log_file(version), "a")
Expand Down Expand Up @@ -137,8 +140,8 @@ def _get(self) -> Optional[QueueElement[T]]:
# It is safe. Max recursion level is 2.
return self._get()
try:
obj, ver = self._deserialize(_json)
return QueueElement[T](obj, ver, size)
obj, ver, at = self._deserialize(_json)
return QueueElement[T](obj, ver, size, at)
except Exception as e:
raise MalformedOperation from e

Expand Down Expand Up @@ -237,11 +240,11 @@ def _next_log_file_version(self, version: int) -> int:
return log_versions[i + 1]
raise ValueError("Missing log file with version > {}".format(version))

def _serialize(self, obj: T, version: int) -> dict:
return {"obj": self._to_dict(obj), "version": version}
def _serialize(self, obj: T, version: int, at: Optional[Timestamp] = None) -> dict:
return {"obj": self._to_dict(obj), "version": version, "at": at}

def _deserialize(self, data: dict) -> Tuple[T, int]:
return self._from_dict(data["obj"]), data["version"]
def _deserialize(self, data: dict) -> Tuple[T, int, Optional[Timestamp]]:
return self._from_dict(data["obj"]), data["version"], data.get("at")

def __enter__(self):
return self
Expand Down
62 changes: 43 additions & 19 deletions tests/unit/neptune/new/internal/test_disk_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from glob import glob
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional
from unittest.mock import patch

from neptune.internal.disk_queue import (
DiskQueue,
Expand All @@ -37,15 +39,21 @@ def __eq__(self, other):
return isinstance(other, TestDiskQueue.Obj) and self.num == other.num and self.txt == other.txt

@staticmethod
def get_obj_size_bytes(obj, version) -> int:
return len(json.dumps({"obj": obj.__dict__, "version": version}))
def get_obj_size_bytes(obj, version, at: Optional[int] = None) -> int:
return len(json.dumps({"obj": obj.__dict__, "version": version, "at": at}))

@staticmethod
def get_queue_element(obj, version) -> QueueElement[Obj]:
obj_size = len(json.dumps({"obj": obj.__dict__, "version": version}))
return QueueElement(obj, version, obj_size)
def get_queue_element(obj, version, at: Optional[int] = None) -> QueueElement[Obj]:
obj_size = len(json.dumps({"obj": obj.__dict__, "version": version, "at": at}))
return QueueElement(obj, version, obj_size, at)

@patch("neptune.internal.disk_queue.time")
def test_put(self, time_mock):
# given
time_mock.side_effect = [
1234,
]

def test_put(self):
with TemporaryDirectory() as dirpath:
queue = DiskQueue[TestDiskQueue.Obj](
Path(dirpath),
Expand All @@ -56,10 +64,14 @@ def test_put(self):
obj = TestDiskQueue.Obj(5, "test")
queue.put(obj)
queue.flush()
self.assertEqual(queue.get(), self.get_queue_element(obj, 1))
self.assertEqual(queue.get(), self.get_queue_element(obj, 1, 1234))
queue.close()

def test_multiple_files(self):
@patch("neptune.internal.disk_queue.time")
def test_multiple_files(self, time_mock):
# given
time_mock.side_effect = list(range(1234, 1234 + 100))

with TemporaryDirectory() as dirpath:
queue = DiskQueue[TestDiskQueue.Obj](
Path(dirpath),
Expand All @@ -74,13 +86,17 @@ def test_multiple_files(self):
queue.flush()
for i in range(1, 101):
obj = TestDiskQueue.Obj(i, str(i))
self.assertEqual(queue.get(), self.get_queue_element(obj, i))
self.assertEqual(queue.get(), self.get_queue_element(obj, i, 1234 + i - 1))
queue.close()
self.assertTrue(queue._read_file_version > 90)
self.assertTrue(queue._write_file_version > 90)
self.assertTrue(len(glob(dirpath + "/data-*.log")) > 10)

def test_get_batch(self):
@patch("neptune.internal.disk_queue.time")
def test_get_batch(self, time_mock):
# given
time_mock.side_effect = list(range(1234, 1234 + 90))

with TemporaryDirectory() as dirpath:
queue = DiskQueue[TestDiskQueue.Obj](
Path(dirpath),
Expand All @@ -95,23 +111,27 @@ def test_get_batch(self):
queue.flush()
self.assertEqual(
queue.get_batch(25),
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(1, 26)],
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(1, 26)],
)
self.assertEqual(
queue.get_batch(25),
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(26, 51)],
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(26, 51)],
)
self.assertEqual(
queue.get_batch(25),
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(51, 76)],
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(51, 76)],
)
self.assertEqual(
queue.get_batch(25),
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(76, 91)],
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(76, 91)],
)
queue.close()

def test_batch_limit(self):
@patch("neptune.internal.disk_queue.time")
def test_batch_limit(self, time_mock):
# given
time_mock.side_effect = [1234, 1235, 1236, 1237, 1238]

with TemporaryDirectory() as dirpath:
obj_size = self.get_obj_size_bytes(TestDiskQueue.Obj(1, "1"), 1)
queue = DiskQueue[TestDiskQueue.Obj](
Expand All @@ -129,16 +149,20 @@ def test_batch_limit(self):

self.assertEqual(
queue.get_batch(5),
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1) for i in range(3)],
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1, 1234 + i) for i in range(3)],
)
self.assertEqual(
queue.get_batch(2),
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1) for i in range(3, 5)],
[self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1, 1234 + i) for i in range(3, 5)],
)

queue.close()

def test_resuming_queue(self):
@patch("neptune.internal.disk_queue.time")
def test_resuming_queue(self, time_mock):
# given
time_mock.side_effect = list(range(1234, 1234 + 500))

with TemporaryDirectory() as dirpath:
queue = DiskQueue[TestDiskQueue.Obj](
Path(dirpath),
Expand Down Expand Up @@ -172,7 +196,7 @@ def test_resuming_queue(self):
)
for i in range(version_to_ack + 1, 501):
obj = TestDiskQueue.Obj(i, str(i))
self.assertEqual(queue.get(), self.get_queue_element(obj, i))
self.assertEqual(queue.get(), self.get_queue_element(obj, i, 1234 + i - 1))

queue.close()

Expand Down

0 comments on commit 4eae0ab

Please sign in to comment.