Skip to content

Commit

Permalink
Put contentLength in upload options
Browse files Browse the repository at this point in the history
  • Loading branch information
mike76-dev committed Dec 6, 2023
1 parent a1c9518 commit f0ccab8
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 9 deletions.
2 changes: 2 additions & 0 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,13 @@ type (
ContractSet string
MimeType string
DisablePreshardingEncryption bool
Size int64
}

UploadMultipartUploadPartOptions struct {
DisablePreshardingEncryption bool
EncryptionOffset int
ContentLength int64
}
)

Expand Down
8 changes: 4 additions & 4 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,7 @@ func TestMultipartUploads(t *testing.T) {
// correctly.
putPart := func(partNum int, offset int, data []byte) string {
t.Helper()
res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, int64(len(data)), api.UploadMultipartUploadPartOptions{EncryptionOffset: offset})
res, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(data), api.DefaultBucketName, objPath, mpr.UploadID, partNum, api.UploadMultipartUploadPartOptions{EncryptionOffset: offset})
tt.OK(err)
if res.ETag == "" {
t.Fatal("expected non-empty ETag")
Expand Down Expand Up @@ -2294,21 +2294,21 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {

// upload a part that is a partial slab
part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4)
resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, int64(len(part3Data)), api.UploadMultipartUploadPartOptions{
resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{
EncryptionOffset: int(slabSize + slabSize/4),
})
tt.OK(err)

// upload a part that is exactly a full slab
part2Data := bytes.Repeat([]byte{2}, int(slabSize))
resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, int64(len(part2Data)), api.UploadMultipartUploadPartOptions{
resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{
EncryptionOffset: int(slabSize / 4),
})
tt.OK(err)

// upload another part the same size as the first one
part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4)
resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, int64(len(part1Data)), api.UploadMultipartUploadPartOptions{
resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{
EncryptionOffset: 0,
})
tt.OK(err)
Expand Down
7 changes: 5 additions & 2 deletions s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectName string) (g
// TODO: Metadata is currently ignored. The backend requires an update to
// support it.
func (s *s3) PutObject(ctx context.Context, bucketName, key string, meta map[string]string, input io.Reader, size int64) (gofakes3.PutObjectResult, error) {
opts := api.UploadObjectOptions{}
opts := api.UploadObjectOptions{
Size: size,
}
if ct, ok := meta["Content-Type"]; ok {
opts.MimeType = ct
}
Expand Down Expand Up @@ -401,8 +403,9 @@ func (s *s3) CreateMultipartUpload(ctx context.Context, bucket, key string, meta
}

func (s *s3) UploadPart(ctx context.Context, bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (*gofakes3.UploadPartResult, error) {
res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, contentLength, api.UploadMultipartUploadPartOptions{
res, err := s.w.UploadMultipartUploadPart(ctx, input, bucket, object, string(id), partNumber, api.UploadMultipartUploadPartOptions{
DisablePreshardingEncryption: true,
ContentLength: contentLength,
})
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type bus interface {
type worker interface {
GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error)
UploadObject(ctx context.Context, r io.Reader, bucket, path string, opts api.UploadObjectOptions) (*api.UploadObjectResponse, error)
UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error)
UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error)
}

func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) {
Expand Down
25 changes: 23 additions & 2 deletions worker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (c *Client) State() (state api.WorkerStateResponse, err error) {
}

// UploadMultipartUploadPart uploads part of the data for a multipart upload.
func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, contentLength int64, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) {
func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts api.UploadMultipartUploadPartOptions) (*api.UploadMultipartUploadPartResponse, error) {
path = api.ObjectPathEscape(path)
c.c.Custom("PUT", fmt.Sprintf("/multipart/%s", path), []byte{}, nil)

Expand All @@ -194,7 +194,17 @@ func (c *Client) UploadMultipartUploadPart(ctx context.Context, r io.Reader, buc
panic(err)
}
req.SetBasicAuth("", c.c.WithContext(ctx).Password)
req.Header.Add("X-Content-Length", fmt.Sprintf("%d", contentLength))
if opts.ContentLength != 0 {
req.ContentLength = opts.ContentLength
} else {
if s, ok := r.(io.Seeker); ok {
length, err := s.Seek(0, io.SeekEnd)
if err == nil {
req.ContentLength = length
}
_, _ = s.Seek(0, io.SeekStart)
}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -226,6 +236,17 @@ func (c *Client) UploadObject(ctx context.Context, r io.Reader, bucket, path str
panic(err)
}
req.SetBasicAuth("", c.c.WithContext(ctx).Password)
if opts.Size != 0 {
req.ContentLength = opts.Size
} else {
if s, ok := r.(io.Seeker); ok {
length, err := s.Seek(0, io.SeekEnd)
if err == nil {
req.ContentLength = length
}
_, _ = s.Seek(0, io.SeekStart)
}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
Expand Down

0 comments on commit f0ccab8

Please sign in to comment.