Skip to content

Commit

Permalink
Fix sending stdout/stderr on Windows (#222)
Browse files Browse the repository at this point in the history
This PR accounts for a low granularity in datetime.now() on Windows.
  • Loading branch information
pitercl authored Apr 1, 2020
1 parent 6bcfb4f commit 3e6ec0e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
27 changes: 25 additions & 2 deletions neptune/internal/streams/channel_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ class ChannelWriter(object):
__SPLIT_PATTERN = re.compile(r'[\n\r]{1,2}')

def __init__(self, experiment, channel_name, channel_namespace=ChannelNamespace.USER):
self._time_started = experiment.get_system_properties()['created']
self._experiment = experiment
self._channel_name = channel_name
self._channel_namespace = channel_namespace
self._data = None
self._x_offset = TimeOffsetGenerator(self._experiment.get_system_properties()['created'])

def write(self, data):
if self._data is None:
Expand All @@ -40,7 +40,7 @@ def write(self, data):
lines = self.__SPLIT_PATTERN.split(self._data)
for line in lines[:-1]:
value = ChannelValue(
x=(datetime.now(tz=self._time_started.tzinfo) - self._time_started).total_seconds() * 1000,
x=self._x_offset.next(),
y=dict(text_value=str(line)),
ts=None
)
Expand All @@ -53,3 +53,26 @@ def write(self, data):
)

self._data = lines[-1]


class TimeOffsetGenerator(object):
def __init__(self, start):
self._start = start
self._previous_millis_from_start = None

def next(self):
"""
This method returns the number of milliseconds from start.
It returns a float, with microsecond granularity.
Since on Windows, datetime.now() has actually a millisecond granularity,
we remember the last returned value and in case of a collision, we add a microsecond.
"""
millis_from_start = (datetime.now(tz=self._start.tzinfo) - self._start).total_seconds() * 1000
if self._previous_millis_from_start is not None and self._previous_millis_from_start >= millis_from_start:
microsecond = 0.001
self._previous_millis_from_start = self._previous_millis_from_start + microsecond
else:
self._previous_millis_from_start = millis_from_start

return self._previous_millis_from_start
49 changes: 47 additions & 2 deletions tests/neptune/test_channel_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import unittest
from datetime import datetime

from mock import MagicMock
import mock

from neptune.internal.streams.channel_writer import ChannelWriter

Expand All @@ -26,16 +26,61 @@ class TestChannelWriter(unittest.TestCase):

def test_write_data_to_channel_writer(self):
# given
experiment = MagicMock()
experiment = mock.MagicMock()
experiment.get_system_properties.return_value = {"created": datetime.now()}
channel_name = 'a channel name'
writer = ChannelWriter(experiment, channel_name)

# when
writer.write('some\ndata')

# then
# pylint: disable=protected-access
experiment._channels_values_sender.send.assert_called_once()

@mock.patch('neptune.internal.streams.channel_writer.datetime')
def test_write_data_with_low_resolution_datetime_now(self, dt):
# given
experiment = mock.MagicMock()
experiment.get_system_properties.return_value = {"created": datetime(2022, 2, 2, 2, 2, 2, 2)}
channel_name = 'a channel name'
writer = ChannelWriter(experiment, channel_name)

# and
dt.now.return_value = datetime(2022, 2, 2, 2, 2, 2, 3)

# when
writer.write('text1\ntext2\n')

# then
# pylint: disable=protected-access
x_to_text = self._extract_x_to_text_from_calls(experiment._channels_values_sender.send.call_args_list)
self.assertEqual(x_to_text, {0.001: 'text1', 0.002: 'text2'})

@mock.patch('neptune.internal.streams.channel_writer.datetime')
def test_write_data_with_high_resolution_datetime_now(self, dt):
# given
experiment = mock.MagicMock()
experiment.get_system_properties.return_value = {"created": datetime(2022, 2, 2, 2, 2, 2, 2)}
channel_name = 'a channel name'
writer = ChannelWriter(experiment, channel_name)

# when
dt.now.return_value = datetime(2022, 2, 2, 2, 2, 2, 4)
writer.write('text1\n')
dt.now.return_value = datetime(2022, 2, 2, 2, 2, 2, 5)
writer.write('text2\n')

# then
# pylint: disable=protected-access
x_to_text = self._extract_x_to_text_from_calls(experiment._channels_values_sender.send.call_args_list)
self.assertEqual(x_to_text, {0.002: 'text1', 0.003: 'text2'})

@staticmethod
def _extract_x_to_text_from_calls(calls):
channel_values = [kwargs['channel_value'] for (_, kwargs) in calls]
return dict((v.x, v.y['text_value']) for v in channel_values)


if __name__ == '__main__':
unittest.main()

0 comments on commit 3e6ec0e

Please sign in to comment.