Skip to content

Commit

Permalink
Warn about large source uploading (#231)
Browse files Browse the repository at this point in the history
* Warn about large source uploading
  • Loading branch information
szymon-kuklewicz authored Apr 30, 2020
1 parent 895a830 commit 2e8543c
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 14 deletions.
4 changes: 2 additions & 2 deletions neptune/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def update_tags(self, experiment, tags_to_add, tags_to_delete):
pass

@abstractmethod
def upload_experiment_source(self, experiment, data):
def upload_experiment_source(self, experiment, data, progress_indicator):
pass

@abstractmethod
Expand Down Expand Up @@ -138,7 +138,7 @@ def send_hardware_metric_reports(self, experiment, metrics, metric_reports):
pass

@abstractmethod
def upload_experiment_output(self, experiment, data):
def upload_experiment_output(self, experiment, data, progress_indicator):
pass

@abstractmethod
Expand Down
1 change: 1 addition & 0 deletions neptune/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,7 @@ def _start(self,
upload_to_storage(upload_entries=upload_source_entries,
upload_api_fun=self._backend.upload_experiment_source,
upload_tar_api_fun=self._backend.extract_experiment_source,
warn_limit=100 * 1024 * 1024,
experiment=self)

self._execution_context.start(
Expand Down
9 changes: 6 additions & 3 deletions neptune/internal/backends/hosted_neptune_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,13 @@ def update_tags(self, experiment, tags_to_add, tags_to_delete):
else:
raise

def upload_experiment_source(self, experiment, data):
def upload_experiment_source(self, experiment, data, progress_indicator):
try:
# Api exception handling is done in _upload_loop
self._upload_loop(partial(self._upload_raw_data,
api_method=self.backend_swagger_client.api.uploadExperimentSource),
data=data,
progress_indicator=progress_indicator,
path_params={'experimentId': experiment.internal_id},
query_params={})
except HTTPError as e:
Expand Down Expand Up @@ -659,12 +660,13 @@ def send_hardware_metric_reports(self, experiment, metrics, metric_reports):
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)

def upload_experiment_output(self, experiment, data):
def upload_experiment_output(self, experiment, data, progress_indicator):
try:
# Api exception handling is done in _upload_loop
self._upload_loop(partial(self._upload_raw_data,
api_method=self.backend_swagger_client.api.uploadExperimentOutput),
data=data,
progress_indicator=progress_indicator,
path_params={'experimentId': experiment.internal_id},
query_params={})
except HTTPError as e:
Expand Down Expand Up @@ -806,11 +808,12 @@ def _convert_channel_to_channel_with_last_value(self, channel):
)
)

def _upload_loop(self, fun, data, **kwargs):
def _upload_loop(self, fun, data, progress_indicator, **kwargs):
ret = None
for part in data.generate():
part_to_send = part.get_data()
ret = with_api_exceptions_handler(self._upload_loop_chunk)(fun, part, part_to_send, data, **kwargs)
progress_indicator.progress(part.end - part.start)

data.close()
return ret
Expand Down
4 changes: 2 additions & 2 deletions neptune/internal/backends/offline_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def update_experiment(self, experiment, properties):
def update_tags(self, experiment, tags_to_add, tags_to_delete):
pass

def upload_experiment_source(self, experiment, data):
def upload_experiment_source(self, experiment, data, progress_indicator):
pass

def extract_experiment_source(self, experiment, data):
Expand Down Expand Up @@ -116,7 +116,7 @@ def create_hardware_metric(self, experiment, metric):
def send_hardware_metric_reports(self, experiment, metrics, metric_reports):
pass

def upload_experiment_output(self, experiment, data):
def upload_experiment_output(self, experiment, data, progress_indicator):
pass

def extract_experiment_output(self, experiment, data):
Expand Down
78 changes: 73 additions & 5 deletions neptune/internal/storage/storage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import time
from pprint import pformat
from abc import ABCMeta, abstractmethod
import six

from neptune.internal.storage.datastream import compress_to_tar_gz_in_memory, FileChunkStream

_logger = logging.getLogger(__name__)


class UploadEntry(object):
def __init__(self, source_path, target_path):
Expand Down Expand Up @@ -102,6 +108,53 @@ def __repr__(self):
return self.to_str()


@six.add_metaclass(ABCMeta)
class ProgressIndicator(object):

@abstractmethod
def progress(self, steps):
pass

@abstractmethod
def complete(self):
pass


class LoggingProgressIndicator(ProgressIndicator):
def __init__(self, total, frequency=10):
self.current = 0
self.total = total
self.last_warning = time.time()
self.frequency = frequency
_logger.warning('You are sending %dMB of source code to Neptune. '
'It is pretty uncommon - please make sure it\'s what you wanted.',
self.total / (1024 * 1024))

def progress(self, steps):
self.current += steps
if time.time() - self.last_warning > self.frequency:
_logger.warning('%d MB / %d MB (%d%%) of source code was sent to Neptune.',
self.current / (1024 * 1024),
self.total / (1024 * 1024),
100 * self.current / self.total)
self.last_warning = time.time()

def complete(self):
_logger.warning('%d MB (100%%) of source code was sent to Neptune.',
self.total / (1024 * 1024))


class SilentProgressIndicator(ProgressIndicator):
def __init__(self):
pass

def progress(self, steps):
pass

def complete(self):
pass


def scan_unique_upload_entries(upload_entries):
"""
Returns upload entries for all files that could be found for given upload entries.
Expand All @@ -127,8 +180,9 @@ def split_upload_files(upload_entries, max_package_size=1 * 1024 * 1024, max_fil

for entry in upload_entries:
if entry.is_stream():
yield current_package
current_package.reset()
if current_package.len > 0:
yield current_package
current_package.reset()
current_package.update(entry, 0)
yield current_package
current_package.reset()
Expand All @@ -147,8 +201,19 @@ def normalize_file_name(name):
return name.replace(os.sep, '/')


def upload_to_storage(upload_entries, upload_api_fun, upload_tar_api_fun, **kwargs):
for package in split_upload_files(scan_unique_upload_entries(upload_entries)):
def upload_to_storage(upload_entries, upload_api_fun, upload_tar_api_fun, warn_limit=None, **kwargs):
unique_upload_entries = scan_unique_upload_entries(upload_entries)
progress_indicator = SilentProgressIndicator()

if warn_limit is not None:
total_size = 0
for entry in unique_upload_entries:
if not entry.is_stream():
total_size += os.path.getsize(entry.source_path)
if total_size >= warn_limit:
progress_indicator = LoggingProgressIndicator(total_size)

for package in split_upload_files(unique_upload_entries):
if package.is_empty():
continue

Expand All @@ -159,6 +224,9 @@ def upload_to_storage(upload_entries, upload_api_fun, upload_tar_api_fun, **kwar
if uploading_multiple_entries or creating_a_single_empty_dir:
data = compress_to_tar_gz_in_memory(upload_entries=package.items)
upload_tar_api_fun(**dict(kwargs, data=data))
progress_indicator.progress(package.size)
else:
file_chunk_stream = FileChunkStream(package.items[0])
upload_api_fun(**dict(kwargs, data=file_chunk_stream))
upload_api_fun(**dict(kwargs, data=file_chunk_stream, progress_indicator=progress_indicator))

progress_indicator.complete()
39 changes: 37 additions & 2 deletions tests/neptune/internal/storage/test_upload_storage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

import unittest

from mock import patch
from mock import patch, MagicMock

from neptune.internal.storage.storage_utils import UploadEntry, UploadPackage, split_upload_files
from neptune.internal.storage.storage_utils \
import UploadEntry, UploadPackage, split_upload_files, upload_to_storage


class TestUploadStorageUtils(unittest.TestCase):
Expand Down Expand Up @@ -52,3 +53,37 @@ def test_split_upload_files_should_not_generate_empty_packages(self, getsize):
expected.update(entry, size)
for package in split_upload_files([upload_entry], max_package_size=self.MAX_PACKAGE_SIZE):
self.assertFalse(package.is_empty())

@patch('io.open', new=MagicMock)
@patch('os.path.getsize', new=lambda path: 101 * 1024 * 1024)
@patch('neptune.internal.storage.storage_utils._logger.warning')
def test_upload_large_sources_should_generate_warning(self, warning):
# GIVEN
entry = UploadEntry("/tmp/mocked/file", "some_file")

# WHEN
upload_to_storage(upload_entries=[entry],
upload_api_fun=MagicMock(),
upload_tar_api_fun=MagicMock(),
warn_limit=100 * 1024 * 1024)

# THEN
warning.assert_any_call('You are sending %dMB of source code to Neptune. '
'It is pretty uncommon - please make sure it\'s what you wanted.', 101)
warning.assert_any_call('%d MB (100%%) of source code was sent to Neptune.', 101)

@patch('io.open', new=MagicMock)
@patch('os.path.getsize', new=lambda path: 99 * 1024 * 1024)
@patch('neptune.internal.storage.storage_utils._logger.warning')
def test_upload_small_sources_should_not_generate_warning(self, warning):
# GIVEN
entry = UploadEntry("/tmp/mocked/file", "some_file")

# WHEN
upload_to_storage(upload_entries=[entry],
upload_api_fun=MagicMock(),
upload_tar_api_fun=MagicMock(),
warn_limit=100 * 1024 * 1024)

# THEN
warning.assert_not_called()

0 comments on commit 2e8543c

Please sign in to comment.