Skip to content

Commit

Permalink
Merge pull request #743 from neptune-ai/hj/fix-400-on-retry
Browse files Browse the repository at this point in the history
Fix errors when uploading File and FileSet data
  • Loading branch information
Raalsky authored Oct 27, 2021
2 parents 6884882 + 7b86a5a commit 39784e6
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 28 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## neptune-client 0.13.1 [UNRELEASED]

### Fixes
- Fix issue with file upload retry buffer causing 400 bad requests ([#743](https://github.com/neptune-ai/neptune-client/pull/743))

## neptune-client 0.13.0

### Features
Expand Down
9 changes: 5 additions & 4 deletions neptune/internal/storage/datastream.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@
import os
import stat
import tarfile
from typing import AnyStr

from future.builtins import object

from neptune.internal.hardware.constants import BYTES_IN_ONE_MB


class FileChunk(object):
def __init__(self, data, start, end):
def __init__(self, data: AnyStr, start, end):
self.data = data
self.start = start
self.end = end

def get_data(self):
return io.BytesIO(self.data)
def get_data(self) -> AnyStr:
return self.data

def __eq__(self, other):
return self.__dict__ == other.__dict__
Expand Down Expand Up @@ -84,7 +85,7 @@ def close(self):
self.fobj.close()


def compress_to_tar_gz_in_memory(upload_entries):
def compress_to_tar_gz_in_memory(upload_entries) -> bytes:
f = io.BytesIO(b'')

with tarfile.TarFile.open(fileobj=f, mode='w|gz', dereference=True) as archive:
Expand Down
21 changes: 13 additions & 8 deletions neptune/new/internal/backends/hosted_file_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import os
import time
from io import BytesIO
from typing import List, Optional, Dict, Iterable, Callable, Set, Union
from typing import List, Optional, Dict, Iterable, Callable, Set, Union, AnyStr
from urllib.parse import urlencode

from bravado.client import SwaggerClient
Expand Down Expand Up @@ -91,7 +91,7 @@ def upload_file_set_attribute(swagger_client: SwaggerClient,
)
result = upload_raw_data(http_client=swagger_client.swagger_spec.http_client,
url=url,
data=BytesIO(data),
data=data,
headers={"Content-Type": "application/octet-stream"},
query_params={
"experimentId": run_id,
Expand Down Expand Up @@ -152,15 +152,20 @@ def _attribute_upload_response_handler(result: bytes) -> None:
raise InternalClientError("Unexpected response from server: {}".format(result))


def _upload_loop(file_chunk_stream: FileChunkStream, response_handler: Callable[[bytes], None], **kwargs):
for chunk in file_chunk_stream.generate():
result = _upload_loop_chunk(chunk, file_chunk_stream, **kwargs)
def _upload_loop(file_chunk_stream: FileChunkStream,
response_handler: Callable[[bytes], None],
query_params: dict,
**kwargs):
for iteration, chunk in enumerate(file_chunk_stream.generate()):
if 'reset' in query_params and iteration != 0:
query_params['reset'] = str(False)
result = _upload_loop_chunk(chunk, file_chunk_stream, query_params=query_params.copy(), **kwargs)
response_handler(result)

file_chunk_stream.close()


def _upload_loop_chunk(chunk: FileChunk, file_chunk_stream: FileChunkStream, **kwargs):
def _upload_loop_chunk(chunk: FileChunk, file_chunk_stream: FileChunkStream, query_params: dict, **kwargs):
if file_chunk_stream.length is not None:
binary_range = "bytes=%d-%d/%d" % (chunk.start, chunk.end - 1, file_chunk_stream.length)
else:
Expand All @@ -172,13 +177,13 @@ def _upload_loop_chunk(chunk: FileChunk, file_chunk_stream: FileChunkStream, **k
}
if file_chunk_stream.permissions is not None:
headers["X-File-Permissions"] = file_chunk_stream.permissions
return upload_raw_data(data=chunk.get_data(), headers=headers, **kwargs)
return upload_raw_data(data=chunk.get_data(), headers=headers, query_params=query_params, **kwargs)


@with_api_exceptions_handler
def upload_raw_data(http_client: RequestsClient,
url: str,
data,
data: AnyStr,
path_params: Optional[Dict[str, str]] = None,
query_params: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None):
Expand Down
52 changes: 36 additions & 16 deletions tests/neptune/new/internal/backends/test_hosted_file_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from tempfile import NamedTemporaryFile, TemporaryDirectory

import mock
from mock import MagicMock, patch
from mock import MagicMock, patch, call

from neptune.new.internal.backends.hosted_file_operations import upload_file_attribute, upload_file_set_attribute, \
download_file_attribute, _get_content_disposition_filename, _attribute_upload_response_handler, \
Expand Down Expand Up @@ -88,35 +88,55 @@ def test_upload_file_attribute_from_stream(self, upload_loop_mock):
})

@unittest.skipIf(IS_WINDOWS, "Windows behaves strangely")
@patch('neptune.new.internal.backends.hosted_file_operations._upload_loop')
@patch('neptune.new.internal.backends.hosted_file_operations._upload_loop_chunk')
@patch('neptune.new.internal.utils.glob', new=lambda path, recursive=False: [path.replace('*', 'file.txt')])
def test_upload_single_file_in_file_set_attribute(self, upload_loop_mock):
def test_upload_single_file_in_file_set_attribute(self, upload_loop_chunk_mock):
# given
exp_uuid = uuid.uuid4()
swagger_mock = self._get_swagger_mock()
upload_loop_mock.return_value = b'null'
upload_loop_chunk_mock.return_value = b'null'
chunk_size = 1024 * 1024

# when
with NamedTemporaryFile("w") as temp_file:
with open(temp_file.name, 'wb') as handler:
handler.write(os.urandom(2 * chunk_size))

upload_file_set_attribute(
swagger_client=swagger_mock,
run_id=exp_uuid,
run_id=str(exp_uuid),
attribute="some/attribute",
file_globs=[temp_file.name],
reset=True)

# then
upload_loop_mock.assert_called_once_with(
file_chunk_stream=mock.ANY,
response_handler=_attribute_upload_response_handler,
http_client=swagger_mock.swagger_spec.http_client,
url="https://ui.neptune.ai/uploadFileSetChunk",
query_params={
"experimentId": str(exp_uuid),
"attribute": "some/attribute",
"reset": "True",
"path": os.path.basename(temp_file.name)
})
upload_loop_chunk_mock.assert_has_calls([
call(
mock.ANY,
mock.ANY,
http_client=swagger_mock.swagger_spec.http_client,
query_params={
"experimentId": str(exp_uuid),
"attribute": "some/attribute",
"reset": "True",
"path": os.path.basename(temp_file.name)
},
url='https://ui.neptune.ai/uploadFileSetChunk'
),
call(
mock.ANY,
mock.ANY,
http_client=swagger_mock.swagger_spec.http_client,
query_params={
"experimentId": str(exp_uuid),
"attribute": "some/attribute",
"reset": "False",
"path": os.path.basename(temp_file.name)
},
url='https://ui.neptune.ai/uploadFileSetChunk'
)
])


@unittest.skipIf(IS_WINDOWS, "Windows behaves strangely")
@patch('neptune.new.internal.backends.hosted_file_operations.upload_raw_data')
Expand Down

0 comments on commit 39784e6

Please sign in to comment.