From f0ccab80661b5cddc6d0095e79c1c8cf87eee3d9 Mon Sep 17 00:00:00 2001 From: mike76-dev Date: Wed, 6 Dec 2023 10:51:39 +0100 Subject: [PATCH] Put contentLength in upload options --- api/object.go | 2 ++ internal/testing/cluster_test.go | 8 ++++---- s3/backend.go | 7 +++++-- s3/s3.go | 2 +- worker/client/client.go | 25 +++++++++++++++++++++++-- 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/api/object.go b/api/object.go index 7141f54f5..f84f5b826 100644 --- a/api/object.go +++ b/api/object.go @@ -196,11 +196,13 @@ type ( ContractSet string MimeType string DisablePreshardingEncryption bool + Size int64 } UploadMultipartUploadPartOptions struct { DisablePreshardingEncryption bool EncryptionOffset int + ContentLength int64 } ) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 738eff79a..ea5aee8b2 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -1982,7 +1982,7 @@ func TestMultipartUploads(t *testing.T) { // correctly. putPart := func(partNum int, offset int, data []byte) string { t.Helper() - res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, int64(len(data)), api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) + res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, api.UploadMultipartUploadPartOptions{EncryptionOffset: offset}) tt.OK(err) if res.ETag == "" { t.Fatal("expected non-empty ETag") @@ -2294,21 +2294,21 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { // upload a part that is a partial slab part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4) - resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, int64(len(part3Data)), api.UploadMultipartUploadPartOptions{ + resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{ EncryptionOffset: int(slabSize + slabSize/4), }) tt.OK(err) // upload a part that is exactly a full slab part2Data := bytes.Repeat([]byte{2}, int(slabSize)) - resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, int64(len(part2Data)), api.UploadMultipartUploadPartOptions{ + resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{ EncryptionOffset: int(slabSize / 4), }) tt.OK(err) // upload another part the same size as the first one part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4) - resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, int64(len(part1Data)), api.UploadMultipartUploadPartOptions{ + resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{ EncryptionOffset: 0, }) tt.OK(err) diff --git a/s3/backend.go b/s3/backend.go index e78ab9cfd..00bfadccf 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -337,7 +337,9 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectName string) (g // TODO: Metadata is currently ignored. The backend requires an update to // support it. func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[string]string, input io.Reader, size int64) (gofakes3.PutObjectResult, error) { - opts := api.UploadObjectOptions{} + opts := api.UploadObjectOptions{ + Size: size, + } if ct, ok := meta["Content-Type"]; ok { opts.MimeType = ct } @@ -401,8 +403,9 @@ func (s *s3) CreateMultipartUpload(ctx context.Context, bucket, key string, meta } func (s *s3) UploadPart(ctx context.Context, bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (*gofakes3.UploadPartResult, error) { - res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, contentLength, api.UploadMultipartUploadPartOptions{ + res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{ DisablePreshardingEncryption: true, + ContentLength: contentLength, }) if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) diff --git a/s3/s3.go b/s3/s3.go index 23ecc06c5..dc7ac664b 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -49,7 +49,7 @@ type bus interface { type worker interface { GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error) - UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) + UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) } func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) { diff --git a/worker/client/client.go b/worker/client/client.go index fcb8f261f..711725021 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -174,7 +174,7 @@ func (c *Client) State() (state api.WorkerStateResponse, err error) { } // UploadMultipartUploadPart uploads part of the data for a multipart upload. -func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { +func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) { path = api.ObjectPathEscape(path) c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil) @@ -194,7 +194,17 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) - req.Header.Add("X-Content-Length", fmt.Sprintf("%d", contentLength)) + if opts.ContentLength != 0 { + req.ContentLength = opts.ContentLength + } else { + if s, ok := r.(io.Seeker); ok { + length, err := s.Seek(0, io.SeekEnd) + if err == nil { + req.ContentLength = length + } + _, _ = s.Seek(0, io.SeekStart) + } + } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err @@ -226,6 +236,17 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str panic(err) } req.SetBasicAuth("", c.c.WithContext(ctx).Password) + if opts.Size != 0 { + req.ContentLength = opts.Size + } else { + if s, ok := r.(io.Seeker); ok { + length, err := s.Seek(0, io.SeekEnd) + if err == nil { + req.ContentLength = length + } + _, _ = s.Seek(0, io.SeekStart) + } + } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err