From 005b860bd0b2d1d75e074f983e14602962206641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Nie=C5=BCurawski?= Date: Tue, 14 Dec 2021 16:16:36 +0100 Subject: [PATCH] Limit number of values per log operation (#780) * Limit number of values per log operation * Review fixes * Fix * Fix --- neptune/new/attributes/series/file_series.py | 9 +++---- neptune/new/attributes/series/float_series.py | 9 +++---- neptune/new/attributes/series/series.py | 17 ++++++------- .../new/attributes/series/string_series.py | 7 +++--- neptune/utils.py | 8 +++++++ .../attributes/series/test_float_series.py | 9 +++++++ .../new/attributes/series/test_series.py | 24 ------------------- .../attributes/series/test_string_series.py | 9 +++++++ 8 files changed, 49 insertions(+), 43 deletions(-) diff --git a/neptune/new/attributes/series/file_series.py b/neptune/new/attributes/series/file_series.py index 7db737d2d..ae30e7763 100644 --- a/neptune/new/attributes/series/file_series.py +++ b/neptune/new/attributes/series/file_series.py @@ -17,7 +17,7 @@ import os import pathlib -from typing import Optional, Iterable +from typing import List, Optional, Iterable from neptune.new.internal.utils import base64_encode @@ -32,15 +32,16 @@ Operation, ) from neptune.new.attributes.series.series import Series +from neptune.utils import split_to_chunks Val = FileSeriesVal Data = File class FileSeries(Series[Val, Data]): - def _get_log_operation_from_value( + def _get_log_operations_from_value( self, value: Val, step: Optional[float], timestamp: float - ) -> Operation: + ) -> List[Operation]: values = [ LogImages.ValueType( ImageValue( @@ -53,7 +54,7 @@ def _get_log_operation_from_value( ) for val in value.values ] - return LogImages(self._path, values) + return [LogImages(self._path, chunk) for chunk in split_to_chunks(values, 1)] def _get_clear_operation(self) -> Operation: return ClearImageLog(self._path) diff --git a/neptune/new/attributes/series/float_series.py b/neptune/new/attributes/series/float_series.py index 218837a98..80ee793ff 100644 --- a/neptune/new/attributes/series/float_series.py +++ b/neptune/new/attributes/series/float_series.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from typing import Union, Optional, Iterable +from typing import List, Union, Optional, Iterable import click @@ -30,6 +30,7 @@ ConfigFloatSeries, ) from neptune.new.attributes.series.series import Series +from neptune.utils import split_to_chunks Val = FloatSeriesVal Data = Union[float, int] @@ -51,13 +52,13 @@ def configure( with self._container.lock(): self._enqueue_operation(ConfigFloatSeries(self._path, min, max, unit), wait) - def _get_log_operation_from_value( + def _get_log_operations_from_value( self, value: Val, step: Optional[float], timestamp: float - ) -> Operation: + ) -> List[Operation]: values = [ LogFloats.ValueType(val, step=step, ts=timestamp) for val in value.values ] - return LogFloats(self._path, values) + return [LogFloats(self._path, chunk) for chunk in split_to_chunks(values, 100)] def _get_clear_operation(self) -> Operation: return ClearFloatLog(self._path) diff --git a/neptune/new/attributes/series/series.py b/neptune/new/attributes/series/series.py index 0ca3bd218..5db29db12 100644 --- a/neptune/new/attributes/series/series.py +++ b/neptune/new/attributes/series/series.py @@ -15,7 +15,7 @@ # import abc import time -from typing import Optional, TypeVar, Generic, Union, Iterable +from typing import List, Optional, TypeVar, Generic, Union, Iterable from neptune.new.internal.operation import Operation @@ -34,9 +34,9 @@ def clear(self, wait: bool = False) -> None: self._clear_impl(wait) @abc.abstractmethod - def _get_log_operation_from_value( + def _get_log_operations_from_value( self, value: Val, step: Optional[float], timestamp: float - ) -> Operation: + ) -> List[Operation]: pass # pylint: disable=unused-argument @@ -69,9 +69,9 @@ def assign(self, value, wait: bool = False) -> None: else: self._enqueue_operation(clear_op, wait=False) ts = time.time() - self._enqueue_operation( - self._get_log_operation_from_value(value, None, ts), wait=wait - ) + ops = self._get_log_operations_from_value(value, None, ts) + for op in ops: + self._enqueue_operation(op, wait=wait) def log( self, @@ -98,10 +98,11 @@ def log( if not timestamp: timestamp = time.time() - op = self._get_log_operation_from_value(value, step, timestamp) + ops = self._get_log_operations_from_value(value, step, timestamp) with self._container.lock(): - self._enqueue_operation(op, wait) + for op in ops: + self._enqueue_operation(op, wait) def _clear_impl(self, wait: bool = False) -> None: op = self._get_clear_operation() diff --git a/neptune/new/attributes/series/string_series.py b/neptune/new/attributes/series/string_series.py index 0f787106c..546ba145b 100644 --- a/neptune/new/attributes/series/string_series.py +++ b/neptune/new/attributes/series/string_series.py @@ -24,6 +24,7 @@ from neptune.new.internal.operation import LogStrings, ClearStringLog, Operation from neptune.new.attributes.series.series import Series +from neptune.utils import split_to_chunks if TYPE_CHECKING: from neptune.new.run import Run @@ -39,9 +40,9 @@ def __init__(self, container: "Run", path: List[str]): super().__init__(container, path) self._value_truncation_occurred = False - def _get_log_operation_from_value( + def _get_log_operations_from_value( self, value: Val, step: Optional[float], timestamp: float - ) -> Operation: + ) -> List[Operation]: values = [v[:MAX_STRING_SERIES_VALUE_LENGTH] for v in value.values] if not self._value_truncation_occurred and any( [len(v) > MAX_STRING_SERIES_VALUE_LENGTH for v in value.values] @@ -56,7 +57,7 @@ def _get_log_operation_from_value( ) values = [LogStrings.ValueType(val, step=step, ts=timestamp) for val in values] - return LogStrings(self._path, values) + return [LogStrings(self._path, chunk) for chunk in split_to_chunks(values, 10)] def _get_clear_operation(self) -> Operation: return ClearStringLog(self._path) diff --git a/neptune/utils.py b/neptune/utils.py index e2def0bea..0707b70d8 100644 --- a/neptune/utils.py +++ b/neptune/utils.py @@ -24,6 +24,7 @@ import sys import time from json.decoder import JSONDecodeError +from typing import Collection import click import numpy as np @@ -427,3 +428,10 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): pass + + +def split_to_chunks(collection: Collection, chunk_size: int): + chunked_list = list() + for i in range(0, len(collection), chunk_size): + chunked_list.append(collection[i : i + chunk_size]) + return chunked_list diff --git a/tests/neptune/new/attributes/series/test_float_series.py b/tests/neptune/new/attributes/series/test_float_series.py index 79875bff7..bbb6a9ef0 100644 --- a/tests/neptune/new/attributes/series/test_float_series.py +++ b/tests/neptune/new/attributes/series/test_float_series.py @@ -43,3 +43,12 @@ def test_get(self): var.log(5) var.log(34) self.assertEqual(34, var.fetch_last()) + + def test_log(self): + exp, path = self._create_run(), self._random_path() + var = FloatSeries(exp, path) + var.log([val for val in range(0, 5000)]) + self.assertEqual(4999, var.fetch_last()) + values = list(var.fetch_values()["value"].array) + expected = list(range(0, 5000)) + self.assertEqual(len(set(expected)), len(set(values))) diff --git a/tests/neptune/new/attributes/series/test_series.py b/tests/neptune/new/attributes/series/test_series.py index 886f6383b..1e6e0a659 100644 --- a/tests/neptune/new/attributes/series/test_series.py +++ b/tests/neptune/new/attributes/series/test_series.py @@ -15,20 +15,17 @@ # # pylint: disable=protected-access -from typing import Optional, Iterable from mock import MagicMock, call, patch from neptune.new.internal.operation import ( ClearFloatLog, LogFloats, - Operation, ConfigFloatSeries, ClearStringLog, ) from neptune.new.attributes.series.float_series import FloatSeries, FloatSeriesVal from neptune.new.attributes.series.string_series import StringSeries, StringSeriesVal -from neptune.new.attributes.series.series import Series from tests.neptune.new.attributes.test_attribute_base import TestAttributeBase @@ -74,7 +71,6 @@ def test_log(self): value_and_expected = [ (13, [LogFloats.ValueType(13, None, self._now())]), (15.3, [LogFloats.ValueType(15.3, None, self._now())]), - ([], []), ( [1, 9, 7], [ @@ -179,23 +175,3 @@ def test_clear(self): var = FloatSeries(exp, path) var.clear(wait=wait) processor.enqueue_operation.assert_called_once_with(ClearFloatLog(path), wait) - - class SeriesTestClass(Series[FloatSeriesVal, int]): - def _get_log_operation_from_value( - self, value: FloatSeriesVal, step: Optional[float], timestamp: float - ) -> Operation: - values = [ - LogFloats.ValueType(val, step=step, ts=timestamp) - for val in value.values - ] - return LogFloats(self._path, values) - - def _get_clear_operation(self) -> Operation: - return ClearFloatLog(self._path) - - # pylint: disable=unused-argument - def _data_to_value(self, values: Iterable, **kwargs) -> FloatSeriesVal: - return FloatSeriesVal(values) - - def _is_value_type(self, value) -> bool: - return isinstance(value, FloatSeriesVal) diff --git a/tests/neptune/new/attributes/series/test_string_series.py b/tests/neptune/new/attributes/series/test_string_series.py index 8ecc52e85..5767c3342 100644 --- a/tests/neptune/new/attributes/series/test_string_series.py +++ b/tests/neptune/new/attributes/series/test_string_series.py @@ -37,3 +37,12 @@ def test_get(self): var.log("asdfhadh") var.log("hej!") self.assertEqual("hej!", var.fetch_last()) + + def test_log(self): + exp, path = self._create_run(), self._random_path() + var = StringSeries(exp, path) + var.log([str(val) for val in range(0, 5000)]) + self.assertEqual("4999", var.fetch_last()) + values = list(var.fetch_values()["value"].array) + expected = list(range(0, 5000)) + self.assertEqual(len(set(expected)), len(set(values)))