From 66288486cc69a2cd52208a39fd1ba80608de9ced Mon Sep 17 00:00:00 2001 From: mike76-dev Date: Tue, 5 Dec 2023 15:44:15 +0100 Subject: [PATCH 1/5] Pass contentLength to the worker --- s3/backend.go | 2 +- s3/s3.go | 2 +- worker/client/client.go | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/s3/backend.go b/s3/backend.go index 9f6c8beef..e78ab9cfd 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -401,7 +401,7 @@ func (s *s3) CreateMultipartUpload(ctx context.Context, bucket, key string, meta } func (s *s3) UploadPart(ctx context.Context, bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (*gofakes3.UploadPartResult, error) { - res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{ + res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, contentLength, api.UploadMultipartUploadPartOptions{ DisablePreshardingEncryption: true, }) if err != nil { diff --git a/s3/s3.go b/s3/s3.go index dc7ac664b..23ecc06c5 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -49,7 +49,7 @@ type bus interface { type worker interface { GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) - UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) + UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) } func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { diff --git a/worker/client/client.go b/worker/client/client.go index 048ce225c..fcb8f261f 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -174,7 +174,7 @@ func (c *Client) State() (state api.WorkerStateResponse, err error) { } // UploadMultipartUploadPart uploads part of the data for a multipart upload. -func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { +func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { path = api.ObjectPathEscape(path) c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil) @@ -194,6 +194,7 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) + req.Header.Add("X-Content-Length", fmt.Sprintf("%d", contentLength)) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err From a1c9518c2be7079705d02174ac8f1e34b210986f Mon Sep 17 00:00:00 2001 From: mike76-dev Date: Tue, 5 Dec 2023 15:58:22 +0100 Subject: [PATCH 2/5] Fix cluster_test.go --- internal/testing/cluster_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index ea5aee8b2..738eff79a 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -1982,7 +1982,7 @@ func TestMultipartUploads(t *testing.T) { // correctly. putPart := func(partNum int, offset int, data []byte) string { t.Helper() - res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) + res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, int64(len(data)), api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) tt.OK(err) if res.ETag == "" { t.Fatal("expected non-empty ETag") @@ -2294,21 +2294,21 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { // upload a part that is a partial slab part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4) - resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{ + resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, int64(len(part3Data)), api.UploadMultipartUploadPartOptions{ EncryptionOffset: int(slabSize + slabSize/4), }) tt.OK(err) // upload a part that is exactly a full slab part2Data := bytes.Repeat([]byte{2}, int(slabSize)) - resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{ + resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, int64(len(part2Data)), api.UploadMultipartUploadPartOptions{ EncryptionOffset: int(slabSize / 4), }) tt.OK(err) // upload another part the same size as the first one part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4) - resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{ + resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, int64(len(part1Data)), api.UploadMultipartUploadPartOptions{ EncryptionOffset: 0, }) tt.OK(err) From f0ccab80661b5cddc6d0095e79c1c8cf87eee3d9 Mon Sep 17 00:00:00 2001 From: mike76-dev Date: Wed, 6 Dec 2023 10:51:39 +0100 Subject: [PATCH 3/5] Put contentLength in upload options --- api/object.go | 2 ++ internal/testing/cluster_test.go | 8 ++++---- s3/backend.go | 7 +++++-- s3/s3.go | 2 +- worker/client/client.go | 25 +++++++++++++++++++++++-- 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/api/object.go b/api/object.go index 7141f54f5..f84f5b826 100644 --- a/api/object.go +++ b/api/object.go @@ -196,11 +196,13 @@ type ( ContractSet string MimeType string DisablePreshardingEncryption bool + Size int64 } UploadMultipartUploadPartOptions struct { DisablePreshardingEncryption bool EncryptionOffset int + ContentLength int64 } ) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 738eff79a..ea5aee8b2 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -1982,7 +1982,7 @@ func TestMultipartUploads(t *testing.T) { // correctly. putPart := func(partNum int, offset int, data []byte) string { t.Helper() - res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, int64(len(data)), api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) + res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) tt.OK(err) if res.ETag == "" { t.Fatal("expected non-empty ETag") @@ -2294,21 +2294,21 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { // upload a part that is a partial slab part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4) - resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, int64(len(part3Data)), api.UploadMultipartUploadPartOptions{ + resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{ EncryptionOffset: int(slabSize + slabSize/4), }) tt.OK(err) // upload a part that is exactly a full slab part2Data := bytes.Repeat([]byte{2}, int(slabSize)) - resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, int64(len(part2Data)), api.UploadMultipartUploadPartOptions{ + resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{ EncryptionOffset: int(slabSize / 4), }) tt.OK(err) // upload another part the same size as the first one part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4) - resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, int64(len(part1Data)), api.UploadMultipartUploadPartOptions{ + resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{ EncryptionOffset: 0, }) tt.OK(err) diff --git a/s3/backend.go b/s3/backend.go index e78ab9cfd..00bfadccf 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -337,7 +337,9 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectName string) (g // TODO: Metadata is currently ignored. The backend requires an update to // support it. func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[string]string, input io.Reader, size int64) (gofakes3.PutObjectResult, error) { - opts := api.UploadObjectOptions{} + opts := api.UploadObjectOptions{ + Size: size, + } if ct, ok := meta["Content-Type"]; ok { opts.MimeType = ct } @@ -401,8 +403,9 @@ func (s *s3) CreateMultipartUpload(ctx context.Context, bucket, key string, meta } func (s *s3) UploadPart(ctx context.Context, bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (*gofakes3.UploadPartResult, error) { - res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, contentLength, api.UploadMultipartUploadPartOptions{ + res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{ DisablePreshardingEncryption: true, + ContentLength: contentLength, }) if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) diff --git a/s3/s3.go b/s3/s3.go index 23ecc06c5..dc7ac664b 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -49,7 +49,7 @@ type bus interface { type worker interface { GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) - UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) + UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) } func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { diff --git a/worker/client/client.go b/worker/client/client.go index fcb8f261f..711725021 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -174,7 +174,7 @@ func (c *Client) State() (state api.WorkerStateResponse, err error) { } // UploadMultipartUploadPart uploads part of the data for a multipart upload. -func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { +func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { path = api.ObjectPathEscape(path) c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil) @@ -194,7 +194,17 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) - req.Header.Add("X-Content-Length", fmt.Sprintf("%d", contentLength)) + if opts.ContentLength != 0 { + req.ContentLength = opts.ContentLength + } else { + if s, ok := r.(io.Seeker); ok { + length, err := s.Seek(0, io.SeekEnd) + if err == nil { + req.ContentLength = length + } + _, _ = s.Seek(0, io.SeekStart) + } + } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err @@ -226,6 +236,17 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) + if opts.Size != 0 { + req.ContentLength = opts.Size + } else { + if s, ok := r.(io.Seeker); ok { + length, err := s.Seek(0, io.SeekEnd) + if err == nil { + req.ContentLength = length + } + _, _ = s.Seek(0, io.SeekStart) + } + } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err From ca9c4a1c2ccdecb81b6a673bbb687a94ffa84ec8 Mon Sep 17 00:00:00 2001 From: mike76-dev Date: Wed, 6 Dec 2023 11:17:30 +0100 Subject: [PATCH 4/5] Remove redundant code --- worker/client/client.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/worker/client/client.go b/worker/client/client.go index 711725021..2c2bf6104 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -198,10 +198,7 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc req.ContentLength = opts.ContentLength } else { if s, ok := r.(io.Seeker); ok { - length, err := s.Seek(0, io.SeekEnd) - if err == nil { - req.ContentLength = length - } + _, _ = s.Seek(0, io.SeekEnd) _, _ = s.Seek(0, io.SeekStart) } } @@ -240,10 +237,7 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str req.ContentLength = opts.Size } else { if s, ok := r.(io.Seeker); ok { - length, err := s.Seek(0, io.SeekEnd) - if err == nil { - req.ContentLength = length - } + _, _ = s.Seek(0, io.SeekEnd) _, _ = s.Seek(0, io.SeekStart) } } From 6da810dc665b0d3cae43c27359d029c5def60944 Mon Sep 17 00:00:00 2001 From: mike76-dev Date: Wed, 6 Dec 2023 11:26:39 +0100 Subject: [PATCH 5/5] Address comments --- api/object.go | 2 +- s3/backend.go | 2 +- worker/client/client.go | 26 ++++++++++++++++++++------ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/api/object.go b/api/object.go index f84f5b826..80b24d341 100644 --- a/api/object.go +++ b/api/object.go @@ -196,7 +196,7 @@ type ( ContractSet string MimeType string DisablePreshardingEncryption bool - Size int64 + ContentLength int64 } UploadMultipartUploadPartOptions struct { diff --git a/s3/backend.go b/s3/backend.go index 00bfadccf..83f59c637 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -338,7 +338,7 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectName string) (g // support it. func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[string]string, input io.Reader, size int64) (gofakes3.PutObjectResult, error) { opts := api.UploadObjectOptions{ - Size: size, + ContentLength: size, } if ct, ok := meta["Content-Type"]; ok { opts.MimeType = ct diff --git a/worker/client/client.go b/worker/client/client.go index 2c2bf6104..f275bcdaf 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -198,8 +198,15 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc req.ContentLength = opts.ContentLength } else { if s, ok := r.(io.Seeker); ok { - _, _ = s.Seek(0, io.SeekEnd) - _, _ = s.Seek(0, io.SeekStart) + length, err := s.Seek(0, io.SeekEnd) + if err != nil { + return nil, err + } + _, err = s.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + req.ContentLength = length } } resp, err := http.DefaultClient.Do(req) @@ -233,12 +240,19 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) - if opts.Size != 0 { - req.ContentLength = opts.Size + if opts.ContentLength != 0 { + req.ContentLength = opts.ContentLength } else { if s, ok := r.(io.Seeker); ok { - _, _ = s.Seek(0, io.SeekEnd) - _, _ = s.Seek(0, io.SeekStart) + length, err := s.Seek(0, io.SeekEnd) + if err != nil { + return nil, err + } + _, err = s.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + req.ContentLength = length } } resp, err := http.DefaultClient.Do(req)