diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cd296e0d..d259ee83c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## neptune-client 0.13.1 [UNRELEASED] + +### Fixes +- Fix issue with file upload retry buffer causing 400 bad requests ([#743](https://github.com/neptune-ai/neptune-client/pull/743)) + ## neptune-client 0.13.0 ### Features diff --git a/neptune/internal/storage/datastream.py b/neptune/internal/storage/datastream.py index 45c430cf8..e3047a303 100644 --- a/neptune/internal/storage/datastream.py +++ b/neptune/internal/storage/datastream.py @@ -18,6 +18,7 @@ import os import stat import tarfile +from typing import AnyStr from future.builtins import object @@ -25,13 +26,13 @@ class FileChunk(object): - def __init__(self, data, start, end): + def __init__(self, data: AnyStr, start, end): self.data = data self.start = start self.end = end - def get_data(self): - return io.BytesIO(self.data) + def get_data(self) -> AnyStr: + return self.data def __eq__(self, other): return self.__dict__ == other.__dict__ @@ -84,7 +85,7 @@ def close(self): self.fobj.close() -def compress_to_tar_gz_in_memory(upload_entries): +def compress_to_tar_gz_in_memory(upload_entries) -> bytes: f = io.BytesIO(b'') with tarfile.TarFile.open(fileobj=f, mode='w|gz', dereference=True) as archive: diff --git a/neptune/new/internal/backends/hosted_file_operations.py b/neptune/new/internal/backends/hosted_file_operations.py index 33e0881c6..c8e4341bf 100644 --- a/neptune/new/internal/backends/hosted_file_operations.py +++ b/neptune/new/internal/backends/hosted_file_operations.py @@ -17,7 +17,7 @@ import os import time from io import BytesIO -from typing import List, Optional, Dict, Iterable, Callable, Set, Union +from typing import List, Optional, Dict, Iterable, Callable, Set, Union, AnyStr from urllib.parse import urlencode from bravado.client import SwaggerClient @@ -91,7 +91,7 @@ def upload_file_set_attribute(swagger_client: SwaggerClient, ) result = upload_raw_data(http_client=swagger_client.swagger_spec.http_client, url=url, - data=BytesIO(data), + data=data, headers={"Content-Type": "application/octet-stream"}, query_params={ "experimentId": run_id, @@ -152,15 +152,20 @@ def _attribute_upload_response_handler(result: bytes) -> None: raise InternalClientError("Unexpected response from server: {}".format(result)) -def _upload_loop(file_chunk_stream: FileChunkStream, response_handler: Callable[[bytes], None], **kwargs): - for chunk in file_chunk_stream.generate(): - result = _upload_loop_chunk(chunk, file_chunk_stream, **kwargs) +def _upload_loop(file_chunk_stream: FileChunkStream, + response_handler: Callable[[bytes], None], + query_params: dict, + **kwargs): + for iteration, chunk in enumerate(file_chunk_stream.generate()): + if 'reset' in query_params and iteration != 0: + query_params['reset'] = str(False) + result = _upload_loop_chunk(chunk, file_chunk_stream, query_params=query_params.copy(), **kwargs) response_handler(result) file_chunk_stream.close() -def _upload_loop_chunk(chunk: FileChunk, file_chunk_stream: FileChunkStream, **kwargs): +def _upload_loop_chunk(chunk: FileChunk, file_chunk_stream: FileChunkStream, query_params: dict, **kwargs): if file_chunk_stream.length is not None: binary_range = "bytes=%d-%d/%d" % (chunk.start, chunk.end - 1, file_chunk_stream.length) else: @@ -172,13 +177,13 @@ def _upload_loop_chunk(chunk: FileChunk, file_chunk_stream: FileChunkStream, **k } if file_chunk_stream.permissions is not None: headers["X-File-Permissions"] = file_chunk_stream.permissions - return upload_raw_data(data=chunk.get_data(), headers=headers, **kwargs) + return upload_raw_data(data=chunk.get_data(), headers=headers, query_params=query_params, **kwargs) @with_api_exceptions_handler def upload_raw_data(http_client: RequestsClient, url: str, - data, + data: AnyStr, path_params: Optional[Dict[str, str]] = None, query_params: Optional[Dict[str, str]] = None, headers: Optional[Dict[str, str]] = None): diff --git a/tests/neptune/new/internal/backends/test_hosted_file_operations.py b/tests/neptune/new/internal/backends/test_hosted_file_operations.py index 90bb9e318..5b3ff7927 100644 --- a/tests/neptune/new/internal/backends/test_hosted_file_operations.py +++ b/tests/neptune/new/internal/backends/test_hosted_file_operations.py @@ -19,7 +19,7 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory import mock -from mock import MagicMock, patch +from mock import MagicMock, patch, call from neptune.new.internal.backends.hosted_file_operations import upload_file_attribute, upload_file_set_attribute, \ download_file_attribute, _get_content_disposition_filename, _attribute_upload_response_handler, \ @@ -88,35 +88,55 @@ def test_upload_file_attribute_from_stream(self, upload_loop_mock): }) @unittest.skipIf(IS_WINDOWS, "Windows behaves strangely") - @patch('neptune.new.internal.backends.hosted_file_operations._upload_loop') + @patch('neptune.new.internal.backends.hosted_file_operations._upload_loop_chunk') @patch('neptune.new.internal.utils.glob', new=lambda path, recursive=False: [path.replace('*', 'file.txt')]) - def test_upload_single_file_in_file_set_attribute(self, upload_loop_mock): + def test_upload_single_file_in_file_set_attribute(self, upload_loop_chunk_mock): # given exp_uuid = uuid.uuid4() swagger_mock = self._get_swagger_mock() - upload_loop_mock.return_value = b'null' + upload_loop_chunk_mock.return_value = b'null' + chunk_size = 1024 * 1024 # when with NamedTemporaryFile("w") as temp_file: + with open(temp_file.name, 'wb') as handler: + handler.write(os.urandom(2 * chunk_size)) + upload_file_set_attribute( swagger_client=swagger_mock, - run_id=exp_uuid, + run_id=str(exp_uuid), attribute="some/attribute", file_globs=[temp_file.name], reset=True) # then - upload_loop_mock.assert_called_once_with( - file_chunk_stream=mock.ANY, - response_handler=_attribute_upload_response_handler, - http_client=swagger_mock.swagger_spec.http_client, - url="https://ui.neptune.ai/uploadFileSetChunk", - query_params={ - "experimentId": str(exp_uuid), - "attribute": "some/attribute", - "reset": "True", - "path": os.path.basename(temp_file.name) - }) + upload_loop_chunk_mock.assert_has_calls([ + call( + mock.ANY, + mock.ANY, + http_client=swagger_mock.swagger_spec.http_client, + query_params={ + "experimentId": str(exp_uuid), + "attribute": "some/attribute", + "reset": "True", + "path": os.path.basename(temp_file.name) + }, + url='https://ui.neptune.ai/uploadFileSetChunk' + ), + call( + mock.ANY, + mock.ANY, + http_client=swagger_mock.swagger_spec.http_client, + query_params={ + "experimentId": str(exp_uuid), + "attribute": "some/attribute", + "reset": "False", + "path": os.path.basename(temp_file.name) + }, + url='https://ui.neptune.ai/uploadFileSetChunk' + ) + ]) + @unittest.skipIf(IS_WINDOWS, "Windows behaves strangely") @patch('neptune.new.internal.backends.hosted_file_operations.upload_raw_data')