Skip to content

Commit

Permalink
Limit number of values per log operation (#780)
Browse files Browse the repository at this point in the history
* Limit number of values per log operation

* Review fixes

* Fix

* Fix
  • Loading branch information
aniezurawski authored Dec 14, 2021
1 parent 098f3a5 commit 005b860
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 43 deletions.
9 changes: 5 additions & 4 deletions neptune/new/attributes/series/file_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import os
import pathlib

from typing import Optional, Iterable
from typing import List, Optional, Iterable

from neptune.new.internal.utils import base64_encode

Expand All @@ -32,15 +32,16 @@
Operation,
)
from neptune.new.attributes.series.series import Series
from neptune.utils import split_to_chunks

Val = FileSeriesVal
Data = File


class FileSeries(Series[Val, Data]):
def _get_log_operation_from_value(
def _get_log_operations_from_value(
self, value: Val, step: Optional[float], timestamp: float
) -> Operation:
) -> List[Operation]:
values = [
LogImages.ValueType(
ImageValue(
Expand All @@ -53,7 +54,7 @@ def _get_log_operation_from_value(
)
for val in value.values
]
return LogImages(self._path, values)
return [LogImages(self._path, chunk) for chunk in split_to_chunks(values, 1)]

def _get_clear_operation(self) -> Operation:
return ClearImageLog(self._path)
Expand Down
9 changes: 5 additions & 4 deletions neptune/new/attributes/series/float_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Union, Optional, Iterable
from typing import List, Union, Optional, Iterable

import click

Expand All @@ -30,6 +30,7 @@
ConfigFloatSeries,
)
from neptune.new.attributes.series.series import Series
from neptune.utils import split_to_chunks

Val = FloatSeriesVal
Data = Union[float, int]
Expand All @@ -51,13 +52,13 @@ def configure(
with self._container.lock():
self._enqueue_operation(ConfigFloatSeries(self._path, min, max, unit), wait)

def _get_log_operation_from_value(
def _get_log_operations_from_value(
self, value: Val, step: Optional[float], timestamp: float
) -> Operation:
) -> List[Operation]:
values = [
LogFloats.ValueType(val, step=step, ts=timestamp) for val in value.values
]
return LogFloats(self._path, values)
return [LogFloats(self._path, chunk) for chunk in split_to_chunks(values, 100)]

def _get_clear_operation(self) -> Operation:
return ClearFloatLog(self._path)
Expand Down
17 changes: 9 additions & 8 deletions neptune/new/attributes/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#
import abc
import time
from typing import Optional, TypeVar, Generic, Union, Iterable
from typing import List, Optional, TypeVar, Generic, Union, Iterable

from neptune.new.internal.operation import Operation

Expand All @@ -34,9 +34,9 @@ def clear(self, wait: bool = False) -> None:
self._clear_impl(wait)

@abc.abstractmethod
def _get_log_operation_from_value(
def _get_log_operations_from_value(
self, value: Val, step: Optional[float], timestamp: float
) -> Operation:
) -> List[Operation]:
pass

# pylint: disable=unused-argument
Expand Down Expand Up @@ -69,9 +69,9 @@ def assign(self, value, wait: bool = False) -> None:
else:
self._enqueue_operation(clear_op, wait=False)
ts = time.time()
self._enqueue_operation(
self._get_log_operation_from_value(value, None, ts), wait=wait
)
ops = self._get_log_operations_from_value(value, None, ts)
for op in ops:
self._enqueue_operation(op, wait=wait)

def log(
self,
Expand All @@ -98,10 +98,11 @@ def log(
if not timestamp:
timestamp = time.time()

op = self._get_log_operation_from_value(value, step, timestamp)
ops = self._get_log_operations_from_value(value, step, timestamp)

with self._container.lock():
self._enqueue_operation(op, wait)
for op in ops:
self._enqueue_operation(op, wait)

def _clear_impl(self, wait: bool = False) -> None:
op = self._get_clear_operation()
Expand Down
7 changes: 4 additions & 3 deletions neptune/new/attributes/series/string_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from neptune.new.internal.operation import LogStrings, ClearStringLog, Operation
from neptune.new.attributes.series.series import Series
from neptune.utils import split_to_chunks

if TYPE_CHECKING:
from neptune.new.run import Run
Expand All @@ -39,9 +40,9 @@ def __init__(self, container: "Run", path: List[str]):
super().__init__(container, path)
self._value_truncation_occurred = False

def _get_log_operation_from_value(
def _get_log_operations_from_value(
self, value: Val, step: Optional[float], timestamp: float
) -> Operation:
) -> List[Operation]:
values = [v[:MAX_STRING_SERIES_VALUE_LENGTH] for v in value.values]
if not self._value_truncation_occurred and any(
[len(v) > MAX_STRING_SERIES_VALUE_LENGTH for v in value.values]
Expand All @@ -56,7 +57,7 @@ def _get_log_operation_from_value(
)

values = [LogStrings.ValueType(val, step=step, ts=timestamp) for val in values]
return LogStrings(self._path, values)
return [LogStrings(self._path, chunk) for chunk in split_to_chunks(values, 10)]

def _get_clear_operation(self) -> Operation:
return ClearStringLog(self._path)
Expand Down
8 changes: 8 additions & 0 deletions neptune/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import sys
import time
from json.decoder import JSONDecodeError
from typing import Collection

import click
import numpy as np
Expand Down Expand Up @@ -427,3 +428,10 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
pass


def split_to_chunks(collection: Collection, chunk_size: int) -> list:
    """Split a sliceable collection into consecutive chunks.

    Every chunk is produced by slicing ``collection``, so it has the same
    type as the input (a list yields lists, a string yields strings, ...).
    All chunks hold ``chunk_size`` items except possibly the last one,
    which holds the remainder.

    Args:
        collection: Sized, sliceable collection (e.g. list, tuple, str).
        chunk_size: Maximum number of items per chunk; must be positive.

    Returns:
        A list of chunks in their original order. An empty collection
        yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    # A negative step would make ``range`` empty and silently drop every
    # value; a zero step fails with an unrelated ``range()`` message.
    # Reject both up front with an explicit error.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    return [
        collection[start : start + chunk_size]
        for start in range(0, len(collection), chunk_size)
    ]
9 changes: 9 additions & 0 deletions tests/neptune/new/attributes/series/test_float_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ def test_get(self):
var.log(5)
var.log(34)
self.assertEqual(34, var.fetch_last())

def test_log(self):
    """Log 5000 floats in one call and verify that all of them are stored.

    The batch is meant to be large enough to be split across several log
    operations, so this checks that no value is lost or altered on the way.
    """
    exp, path = self._create_run(), self._random_path()
    var = FloatSeries(exp, path)
    expected = list(range(0, 5000))
    var.log(list(expected))
    self.assertEqual(4999, var.fetch_last())
    values = list(var.fetch_values()["value"].array)
    # Compare the count and the contents: comparing only the number of
    # distinct values would also pass for 5000 wrong values.
    self.assertEqual(len(expected), len(values))
    self.assertEqual(set(expected), set(values))
24 changes: 0 additions & 24 deletions tests/neptune/new/attributes/series/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@
#

# pylint: disable=protected-access
from typing import Optional, Iterable

from mock import MagicMock, call, patch

from neptune.new.internal.operation import (
ClearFloatLog,
LogFloats,
Operation,
ConfigFloatSeries,
ClearStringLog,
)
from neptune.new.attributes.series.float_series import FloatSeries, FloatSeriesVal
from neptune.new.attributes.series.string_series import StringSeries, StringSeriesVal
from neptune.new.attributes.series.series import Series

from tests.neptune.new.attributes.test_attribute_base import TestAttributeBase

Expand Down Expand Up @@ -74,7 +71,6 @@ def test_log(self):
value_and_expected = [
(13, [LogFloats.ValueType(13, None, self._now())]),
(15.3, [LogFloats.ValueType(15.3, None, self._now())]),
([], []),
(
[1, 9, 7],
[
Expand Down Expand Up @@ -179,23 +175,3 @@ def test_clear(self):
var = FloatSeries(exp, path)
var.clear(wait=wait)
processor.enqueue_operation.assert_called_once_with(ClearFloatLog(path), wait)

class SeriesTestClass(Series[FloatSeriesVal, int]):
def _get_log_operation_from_value(
self, value: FloatSeriesVal, step: Optional[float], timestamp: float
) -> Operation:
values = [
LogFloats.ValueType(val, step=step, ts=timestamp)
for val in value.values
]
return LogFloats(self._path, values)

def _get_clear_operation(self) -> Operation:
return ClearFloatLog(self._path)

# pylint: disable=unused-argument
def _data_to_value(self, values: Iterable, **kwargs) -> FloatSeriesVal:
return FloatSeriesVal(values)

def _is_value_type(self, value) -> bool:
return isinstance(value, FloatSeriesVal)
9 changes: 9 additions & 0 deletions tests/neptune/new/attributes/series/test_string_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,12 @@ def test_get(self):
var.log("asdfhadh")
var.log("hej!")
self.assertEqual("hej!", var.fetch_last())

def test_log(self):
    """Log 5000 strings in one call and verify that all of them are stored.

    The batch is meant to be large enough to be split across several log
    operations, so this checks that no value is lost or altered on the way.
    """
    exp, path = self._create_run(), self._random_path()
    var = StringSeries(exp, path)
    # Expected values are strings, matching what is actually logged.
    expected = [str(val) for val in range(0, 5000)]
    var.log(list(expected))
    self.assertEqual("4999", var.fetch_last())
    values = list(var.fetch_values()["value"].array)
    # Compare the count and the contents: comparing only the number of
    # distinct values would also pass for 5000 wrong values.
    self.assertEqual(len(expected), len(values))
    self.assertEqual(set(expected), set(values))

0 comments on commit 005b860

Please sign in to comment.