From 4eae0ab26fccd758c4b54b336204e51fd8ebda85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Jankowski?= Date: Tue, 21 Nov 2023 11:14:03 +0100 Subject: [PATCH] Added timestamp of operation put to disk queue (#1569) --- CHANGELOG.md | 6 ++ src/neptune/internal/disk_queue.py | 17 ++--- .../neptune/new/internal/test_disk_queue.py | 62 +++++++++++++------ 3 files changed, 59 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc0373823..b6f1fd4b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## [UNRELEASED] 1.8.5 + +### Changes +- Added timestamp of operation put to disk queue ([#1569](https://github.com/neptune-ai/neptune-client/pull/1569)) + + ## neptune 1.8.4 ### Changes diff --git a/src/neptune/internal/disk_queue.py b/src/neptune/internal/disk_queue.py index 1517117f9..3857ed663 100644 --- a/src/neptune/internal/disk_queue.py +++ b/src/neptune/internal/disk_queue.py @@ -23,6 +23,7 @@ from dataclasses import dataclass from glob import glob from pathlib import Path +from time import time from typing import ( Callable, Generic, @@ -38,6 +39,7 @@ from neptune.internal.utils.sync_offset_file import SyncOffsetFile T = TypeVar("T") +Timestamp = float _logger = logging.getLogger(__name__) @@ -47,6 +49,7 @@ class QueueElement(Generic[T]): obj: T ver: int size: int + at: Optional[Timestamp] = None class DiskQueue(Generic[T]): @@ -91,7 +94,7 @@ def __init__( def put(self, obj: T) -> int: version = self._last_put_file.read_local() + 1 - _json = json.dumps(self._serialize(obj, version)) + _json = json.dumps(self._serialize(obj=obj, version=version, at=time())) if self._file_size + len(_json) > self._max_file_size: old_writer = self._writer self._writer = open(self._get_log_file(version), "a") @@ -137,8 +140,8 @@ def _get(self) -> Optional[QueueElement[T]]: # It is safe. Max recursion level is 2. return self._get() try: - obj, ver = self._deserialize(_json) - return QueueElement[T](obj, ver, size) + obj, ver, at = self._deserialize(_json) + return QueueElement[T](obj, ver, size, at) except Exception as e: raise MalformedOperation from e @@ -237,11 +240,11 @@ def _next_log_file_version(self, version: int) -> int: return log_versions[i + 1] raise ValueError("Missing log file with version > {}".format(version)) - def _serialize(self, obj: T, version: int) -> dict: - return {"obj": self._to_dict(obj), "version": version} + def _serialize(self, obj: T, version: int, at: Optional[Timestamp] = None) -> dict: + return {"obj": self._to_dict(obj), "version": version, "at": at} - def _deserialize(self, data: dict) -> Tuple[T, int]: - return self._from_dict(data["obj"]), data["version"] + def _deserialize(self, data: dict) -> Tuple[T, int, Optional[Timestamp]]: + return self._from_dict(data["obj"]), data["version"], data.get("at") def __enter__(self): return self diff --git a/tests/unit/neptune/new/internal/test_disk_queue.py b/tests/unit/neptune/new/internal/test_disk_queue.py index 74bcb73ec..6b8a81aa0 100644 --- a/tests/unit/neptune/new/internal/test_disk_queue.py +++ b/tests/unit/neptune/new/internal/test_disk_queue.py @@ -20,6 +20,8 @@ from glob import glob from pathlib import Path from tempfile import TemporaryDirectory +from typing import Optional +from unittest.mock import patch from neptune.internal.disk_queue import ( DiskQueue, @@ -37,15 +39,21 @@ def __eq__(self, other): return isinstance(other, TestDiskQueue.Obj) and self.num == other.num and self.txt == other.txt @staticmethod - def get_obj_size_bytes(obj, version) -> int: - return len(json.dumps({"obj": obj.__dict__, "version": version})) + def get_obj_size_bytes(obj, version, at: Optional[int] = None) -> int: + return len(json.dumps({"obj": obj.__dict__, "version": version, "at": at})) @staticmethod - def get_queue_element(obj, version) -> QueueElement[Obj]: - obj_size = len(json.dumps({"obj": obj.__dict__, "version": version})) - return QueueElement(obj, version, obj_size) + def get_queue_element(obj, version, at: Optional[int] = None) -> QueueElement[Obj]: + obj_size = len(json.dumps({"obj": obj.__dict__, "version": version, "at": at})) + return QueueElement(obj, version, obj_size, at) + + @patch("neptune.internal.disk_queue.time") + def test_put(self, time_mock): + # given + time_mock.side_effect = [ + 1234, + ] - def test_put(self): with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -56,10 +64,14 @@ def test_put(self): obj = TestDiskQueue.Obj(5, "test") queue.put(obj) queue.flush() - self.assertEqual(queue.get(), self.get_queue_element(obj, 1)) + self.assertEqual(queue.get(), self.get_queue_element(obj, 1, 1234)) queue.close() - def test_multiple_files(self): + @patch("neptune.internal.disk_queue.time") + def test_multiple_files(self, time_mock): + # given + time_mock.side_effect = list(range(1234, 1234 + 100)) + with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -74,13 +86,17 @@ def test_multiple_files(self): queue.flush() for i in range(1, 101): obj = TestDiskQueue.Obj(i, str(i)) - self.assertEqual(queue.get(), self.get_queue_element(obj, i)) + self.assertEqual(queue.get(), self.get_queue_element(obj, i, 1234 + i - 1)) queue.close() self.assertTrue(queue._read_file_version > 90) self.assertTrue(queue._write_file_version > 90) self.assertTrue(len(glob(dirpath + "/data-*.log")) > 10) - def test_get_batch(self): + @patch("neptune.internal.disk_queue.time") + def test_get_batch(self, time_mock): + # given + time_mock.side_effect = list(range(1234, 1234 + 90)) + with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -95,23 +111,27 @@ def test_get_batch(self): queue.flush() self.assertEqual( queue.get_batch(25), - [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(1, 26)], + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(1, 26)], ) self.assertEqual( queue.get_batch(25), - [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(26, 51)], + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(26, 51)], ) self.assertEqual( queue.get_batch(25), - [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(51, 76)], + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(51, 76)], ) self.assertEqual( queue.get_batch(25), - [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(76, 91)], + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i, 1234 + i - 1) for i in range(76, 91)], ) queue.close() - def test_batch_limit(self): + @patch("neptune.internal.disk_queue.time") + def test_batch_limit(self, time_mock): + # given + time_mock.side_effect = [1234, 1235, 1236, 1237, 1238] + with TemporaryDirectory() as dirpath: obj_size = self.get_obj_size_bytes(TestDiskQueue.Obj(1, "1"), 1) queue = DiskQueue[TestDiskQueue.Obj]( @@ -129,16 +149,20 @@ def test_batch_limit(self): self.assertEqual( queue.get_batch(5), - [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1) for i in range(3)], + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1, 1234 + i) for i in range(3)], ) self.assertEqual( queue.get_batch(2), - [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1) for i in range(3, 5)], + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1, 1234 + i) for i in range(3, 5)], ) queue.close() - def test_resuming_queue(self): + @patch("neptune.internal.disk_queue.time") + def test_resuming_queue(self, time_mock): + # given + time_mock.side_effect = list(range(1234, 1234 + 500)) + with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -172,7 +196,7 @@ def test_resuming_queue(self): ) for i in range(version_to_ack + 1, 501): obj = TestDiskQueue.Obj(i, str(i)) - self.assertEqual(queue.get(), self.get_queue_element(obj, i)) + self.assertEqual(queue.get(), self.get_queue_element(obj, i, 1234 + i - 1)) queue.close()