diff --git a/neptune/experiments.py b/neptune/experiments.py index bcdd1f719..cbc0e099e 100644 --- a/neptune/experiments.py +++ b/neptune/experiments.py @@ -595,12 +595,21 @@ def log_artifact(self, artifact, destination=None): # save file under different name experiment.log_artifact('images/wrong_prediction_1.png', 'images/my_image_1.png') """ - if not os.path.exists(artifact): - raise FileNotFound(artifact) - - target_name = os.path.basename(artifact) if destination is None else destination + if isinstance(artifact, str): + if os.path.exists(artifact): + target_name = os.path.basename(artifact) if destination is None else destination + upload_entry = UploadEntry(os.path.abspath(artifact), normalize_file_name(target_name)) + else: + raise FileNotFound(artifact) + elif hasattr(artifact, 'read'): + if destination is not None: + upload_entry = UploadEntry(artifact, normalize_file_name(destination)) + else: + raise ValueError("destination is required for file streams") + else: + raise ValueError("artifact must be a local path or an IO object") - upload_to_storage(upload_entries=[UploadEntry(os.path.abspath(artifact), normalize_file_name(target_name))], + upload_to_storage(upload_entries=[upload_entry], upload_api_fun=self._backend.upload_experiment_output, upload_tar_api_fun=self._backend.extract_experiment_output, experiment=self) diff --git a/neptune/internal/backends/hosted_neptune_backend.py b/neptune/internal/backends/hosted_neptune_backend.py index d6861bcc3..15f8bfd08 100644 --- a/neptune/internal/backends/hosted_neptune_backend.py +++ b/neptune/internal/backends/hosted_neptune_backend.py @@ -801,34 +801,28 @@ def _convert_channel_to_channel_with_last_value(self, channel): ) ) - def _upload_loop(self, fun, data, checksums=None, **kwargs): + def _upload_loop(self, fun, data, **kwargs): ret = None for part in data.generate(): - skip = False - if checksums and part.start in checksums: - skip = checksums[part.start].checksum == part.md5() + part_to_send = part.get_data() 
+ ret = with_api_exceptions_handler(self._upload_loop_chunk)(fun, part, part_to_send, data, **kwargs) - if not skip: - part_to_send = part.get_data() - ret = with_api_exceptions_handler(self._upload_loop_chunk)(fun, part, part_to_send, data, **kwargs) - else: - part.skip() data.close() return ret def _upload_loop_chunk(self, fun, part, part_to_send, data, **kwargs): - if part.end: + if data.length is not None: binary_range = "bytes=%d-%d/%d" % (part.start, part.end - 1, data.length) else: - binary_range = "bytes=%d-/%d" % (part.start, data.length) - response = fun(data=part_to_send, - headers={ - "Content-Type": "application/octet-stream", - "Content-Filename": data.filename, - "X-Range": binary_range, - "X-File-Permissions": data.permissions - }, - **kwargs) + binary_range = "bytes=%d-%d" % (part.start, part.end - 1) + headers = { + "Content-Type": "application/octet-stream", + "Content-Filename": data.filename, + "X-Range": binary_range, + } + if data.permissions is not None: + headers["X-File-Permissions"] = data.permissions + response = fun(data=part_to_send, headers=headers, **kwargs) response.raise_for_status() return response diff --git a/neptune/internal/storage/datastream.py b/neptune/internal/storage/datastream.py index 1626123c1..c0def6c0f 100644 --- a/neptune/internal/storage/datastream.py +++ b/neptune/internal/storage/datastream.py @@ -14,7 +14,6 @@ # limitations under the License. 
# -import hashlib import io import os import stat @@ -26,33 +25,33 @@ class FileChunk(object): - def __init__(self, fobj, start, end): - self.fobj = fobj + def __init__(self, data, start, end): + self.data = data self.start = start self.end = end def get_data(self): - self.fobj.seek(self.start) - return io.BytesIO(self.fobj.read(self.end - self.start)) + if isinstance(self.data, str): + return io.StringIO(self.data) + else: + return io.BytesIO(self.data) - def skip(self): - pass - - def md5(self): - hash_md5 = hashlib.md5() - self.fobj.seek(self.start) - item = self.fobj.read(self.end - self.start) - hash_md5.update(item) - return hash_md5.hexdigest() + def __eq__(self, other): + return self.__dict__ == other.__dict__ class FileChunkStream(object): def __init__(self, upload_entry): self.filename = upload_entry.target_path - self.fobj = io.open(upload_entry.source_path, 'rb') - self.length = os.path.getsize(upload_entry.source_path) - self.permissions = self.permissions_to_unix_string(upload_entry.source_path) + if upload_entry.is_stream(): + self.fobj = upload_entry.source_path + self.length = None + self.permissions = '----------' + else: + self.fobj = io.open(upload_entry.source_path, 'rb') + self.length = os.path.getsize(upload_entry.source_path) + self.permissions = self.permissions_to_unix_string(upload_entry.source_path) @classmethod def permissions_to_unix_string(cls, path): @@ -70,14 +69,17 @@ def __eq__(self, fs): return False def generate(self, chunk_size=BYTES_IN_ONE_MB): - num_chunks = (self.length + chunk_size - 1) // chunk_size - if num_chunks == 0: - yield FileChunk(self.fobj, 0, 0) - - for i in range(num_chunks): - start = i * chunk_size - end = min(self.length, (i + 1) * chunk_size) - yield FileChunk(self.fobj, start, end) + last_offset = 0 + while True: + chunk = self.fobj.read(chunk_size) + if chunk: + new_offset = last_offset + len(chunk) + yield FileChunk(chunk, last_offset, new_offset) + last_offset = new_offset + else: + if last_offset == 0: 
+ yield FileChunk(chunk, 0, 0) + break def close(self): self.fobj.close() diff --git a/neptune/internal/storage/storage_utils.py b/neptune/internal/storage/storage_utils.py index 7befc79dd..888be9e72 100644 --- a/neptune/internal/storage/storage_utils.py +++ b/neptune/internal/storage/storage_utils.py @@ -54,6 +54,9 @@ def __repr__(self): """ return self.to_str() + def is_stream(self): + return hasattr(self.source_path, 'read') + class UploadPackage(object): def __init__(self): @@ -107,14 +110,15 @@ def scan_unique_upload_entries(upload_entries): """ walked_entries = set() for entry in upload_entries: - if os.path.isdir(entry.source_path): + if entry.is_stream() or not os.path.isdir(entry.source_path): + walked_entries.add(entry) + else: for root, _, files in os.walk(entry.source_path): path_relative_to_entry_source = os.path.relpath(root, entry.source_path) target_root = os.path.normpath(os.path.join(entry.target_path, path_relative_to_entry_source)) for filename in files: walked_entries.add(UploadEntry(os.path.join(root, filename), os.path.join(target_root, filename))) - else: - walked_entries.add(entry) + return walked_entries @@ -122,13 +126,19 @@ def split_upload_files(upload_entries, max_package_size=1 * 1024 * 1024, max_fil current_package = UploadPackage() for entry in upload_entries: - size = os.path.getsize(entry.source_path) - - if (size + current_package.size > max_package_size or current_package.len > max_files) \ - and not current_package.is_empty(): + if entry.is_stream(): yield current_package current_package.reset() - current_package.update(entry, size) + current_package.update(entry, 0) + yield current_package + current_package.reset() + else: + size = os.path.getsize(entry.source_path) + if (size + current_package.size > max_package_size or current_package.len > max_files) \ + and not current_package.is_empty(): + yield current_package + current_package.reset() + current_package.update(entry, size) yield current_package @@ -140,10 +150,11 @@ def 
normalize_file_name(name): def upload_to_storage(upload_entries, upload_api_fun, upload_tar_api_fun, **kwargs): for package in split_upload_files(scan_unique_upload_entries(upload_entries)): if package.is_empty(): - break + continue uploading_multiple_entries = package.len > 1 - creating_a_single_empty_dir = package.len == 1 and os.path.isdir(package.items[0].source_path) + creating_a_single_empty_dir = package.len == 1 and not package.items[0].is_stream() \ + and os.path.isdir(package.items[0].source_path) if uploading_multiple_entries or creating_a_single_empty_dir: data = compress_to_tar_gz_in_memory(upload_entries=package.items) diff --git a/tests/neptune/internal/storage/test_datastream.py b/tests/neptune/internal/storage/test_datastream.py index 1a0c320f5..04f33bbb9 100644 --- a/tests/neptune/internal/storage/test_datastream.py +++ b/tests/neptune/internal/storage/test_datastream.py @@ -16,9 +16,11 @@ import unittest +from io import StringIO from mock import patch -from neptune.internal.storage.datastream import FileChunkStream +from neptune.internal.storage.datastream import FileChunkStream, FileChunk +from neptune.internal.storage.storage_utils import UploadEntry class TestFileChunkStream(unittest.TestCase): @@ -57,6 +59,24 @@ def test_permissions_to_unix_string_for_nonexistent_file(self): # then self.assertEqual('-' * 10, permissions_string) + def test_generate_chunks_from_stream(self): + # given + text = u"ABCDEFGHIJKLMNOPRSTUWXYZ" + + # when + stream = FileChunkStream(UploadEntry(StringIO(text), "some/path")) + chunks = list() + for chunk in stream.generate(chunk_size=10): + chunks.append(chunk) + + # then + self.assertEqual(stream.length, None) + self.assertEqual(chunks, [ + FileChunk(u"ABCDEFGHIJ", 0, 10), + FileChunk(u"KLMNOPRSTU", 10, 20), + FileChunk(u"WXYZ", 20, 24) + ]) + if __name__ == '__main__': unittest.main()