Skip to content

Commit

Permalink
stores: add support for completing multipart upload; testing: extend …
Browse files Browse the repository at this point in the history
…test with listing parts
  • Loading branch information
ChrisSchinnerl committed Sep 13, 2023
1 parent f1b6477 commit 0e4e03e
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 63 deletions.
31 changes: 25 additions & 6 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ type (
MultipartAbortResponse struct {
}
MultipartCompleteRequest struct {
Parts []MultipartCompletedPart
}
MultipartCompletedPart struct {
PartNumber int `json:"partNumber"`
ETag string `json:"etag"`
}
MultipartCompleteResponse struct {
}
Expand All @@ -513,23 +518,37 @@ type (
UsedContracts map[types.PublicKey]types.FileContractID `json:"usedContracts"`
}
MultipartListUploadsRequest struct {
Bucket string
Prefix string
KeyMarker string
UploadIDMarker string
Limit int
Bucket string `json:"bucket"`
Prefix string `json:"prefix"`
KeyMarker string `json:"keyMarker"`
UploadIDMarker string `json:"uploadIDMarker"`
Limit int `json:"limit"`
}
MultipartListUploadsResponse struct {
Uploads []MultipartListUploadItem `json:"uploads"`
}
MultipartListUploadItem struct {
Path string `json:"objectName"`
Path string `json:"path"`
UploadID string `json:"uploadID"`
CreatedAt time.Time `json:"createdAt"`
}
MultipartListPartsRequest struct {
Bucket string `json:"bucket"`
Path string `json:"path"`
UploadID string `json:"uploadID"`
PartNumberMarker int `json:"partNumberMarker"`
Limit int64 `json:"limit"`
}
MultipartListPartsResponse struct {
IsTruncated bool `json:"isTruncated"`
NextMarker int `json:"nextMarker"`
Parts []MultipartListPartItem `json:"parts"`
}
MultipartListPartItem struct {
PartNumber int `json:"partNumber"`
LastModified time.Time `json:"lastModified"`
ETag string `json:"etag"`
Size int64 `json:"size"`
}
)

Expand Down
23 changes: 22 additions & 1 deletion bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"time"

"github.com/SiaFoundation/gofakes3"
"go.sia.tech/core/consensus"
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
Expand Down Expand Up @@ -123,6 +124,7 @@ type (
AddMultipartPart(ctx context.Context, bucket, path, contractSet, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab, etag string, usedContracts map[types.PublicKey]types.FileContractID) (err error)
CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error)
ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)

MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
Expand Down Expand Up @@ -1826,6 +1828,21 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
if req.Bucket == "" {
req.Bucket = api.DefaultBucketName
} else if req.ContractSet == "" {
jc.Error(errors.New("contract_set must be non-empty"), http.StatusBadRequest)
return
} else if req.Etag == "" {
jc.Error(errors.New("etag must be non-empty"), http.StatusBadRequest)
return
} else if req.PartNumber <= 0 || req.PartNumber > gofakes3.MaxUploadPartNumber {
jc.Error(fmt.Errorf("part_number must be between 1 and %d", gofakes3.MaxUploadPartNumber), http.StatusBadRequest)
return
} else if req.UploadID == "" {
jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest)
return
}
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs, req.Etag, req.UsedContracts)
if jc.Check("failed to upload part", err) != nil {
return
Expand All @@ -1849,7 +1866,11 @@ func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
panic("not implemented")
resp, err := b.ms.ListMultipartUploadParts(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.PartNumberMarker, int64(req.Limit))
if jc.Check("failed to list multipart upload parts", err) != nil {
return
}
jc.Encode(resp)
}

// Handler returns an HTTP handler that serves the bus API.
Expand Down
11 changes: 11 additions & 0 deletions bus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,17 @@ func (c *Client) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMa
return
}

func (c *Client) ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/listparts", api.MultipartListPartsRequest{
Bucket: bucket,
Path: object,
UploadID: uploadID,
PartNumberMarker: marker,
Limit: limit,
}, &resp)
return
}

// NewClient returns a client that communicates with a renterd store server
// listening on the specified address.
func NewClient(addr, password string) *Client {
Expand Down
39 changes: 33 additions & 6 deletions internal/testing/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ func TestS3MultipartUploads(t *testing.T) {
t.Fatal(err)
}

// Enable upload packing to speed up test.
err = cluster.Bus.UpdateSetting(context.Background(), api.SettingUploadPacking, api.UploadPackingSettings{
Enabled: true,
})
if err != nil {
t.Fatal(err)
}

// add hosts
if _, err := cluster.AddHostsBlocking(testRedundancySettings.TotalShards); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -362,16 +370,35 @@ func TestS3MultipartUploads(t *testing.T) {
t.Fatal("unexpected upload:", upload.UploadID, upload.Key)
}

// Add a part.
part, err := core.PutObjectPart(context.Background(), "multipart", "foo", uploadID, 1, bytes.NewReader([]byte("hello")), 5, minio.PutObjectPartOptions{})
// Add 3 parts out of order to make sure the object is reconstructed
// correctly.
putPart := func(partNum int, data []byte) {
t.Helper()
part, err := core.PutObjectPart(context.Background(), "multipart", "foo", uploadID, partNum, bytes.NewReader(data), int64(len(data)), minio.PutObjectPartOptions{})
if err != nil {
t.Fatal(err)
} else if part.ETag == "" {
t.Fatal("expected non-empty ETag")
}
}
putPart(2, []byte("world"))
putPart(1, []byte("hello"))
putPart(3, []byte("!"))

// List parts
lop, err := core.ListObjectParts(context.Background(), "multipart", "foo", uploadID, 0, 0)
if err != nil {
t.Fatal(err)
} else if part.ETag == "" {
t.Fatal("expected non-empty ETag")
} else if lop.Bucket != "multipart" || lop.Key != "foo" || lop.UploadID != uploadID || len(lop.ObjectParts) != 3 {
t.Fatal("unexpected response:", lop)
} else if part1 := lop.ObjectParts[0]; part1.PartNumber != 1 || part1.Size != 5 || part1.ETag == "" {
t.Fatal("unexpected part:", part1)

Check failure on line 395 in internal/testing/s3_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, 1.20)

Test go.sia.tech/renterd/internal/testing/TestS3MultipartUploads failed in 11.74s

s3_test.go:395: unexpected part: {1 2023-09-13 11:50:58.87 +0000 UTC a457becea951fb5c53cd312277fc31c18e93c99433478691d79959d1237264a4 177 }
} else if part2 := lop.ObjectParts[1]; part2.PartNumber != 2 || part2.Size != 5 || part2.ETag == "" {
t.Fatal("unexpected part:", part2)
} else if part3 := lop.ObjectParts[2]; part3.PartNumber != 3 || part3.Size != 1 || part3.ETag == "" {
t.Fatal("unexpected part:", part3)
}

// TODO: list parts

// TODO: complete upload

// TODO: download object
Expand Down
27 changes: 25 additions & 2 deletions s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (s *s3) CreateMultipartUpload(bucket, object string, meta map[string]string
}

func (s *s3) UploadPart(bucket, object string, id gofakes3.UploadID, partNumber int, contentLength int64, input io.Reader) (etag string, err error) {
etag, err = s.w.UploadPart(context.Background(), input, object, string(id), partNumber, api.UploadWithDisabledPreshardingEncryption())
etag, err = s.w.UploadMultipartUploadPart(context.Background(), input, object, string(id), partNumber, api.UploadWithDisabledPreshardingEncryption())
if err != nil {
return "", gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
Expand Down Expand Up @@ -441,7 +441,30 @@ func (s *s3) ListMultipartUploads(bucket string, marker *gofakes3.UploadListMark
}

func (s *s3) ListParts(bucket, object string, uploadID gofakes3.UploadID, marker int, limit int64) (*gofakes3.ListMultipartUploadPartsResult, error) {
panic("not implemented")
resp, err := s.b.ListMultipartUploadParts(context.Background(), bucket, object, string(uploadID), marker, limit)
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
var parts []gofakes3.ListMultipartUploadPartItem
for _, part := range resp.Parts {
parts = append(parts, gofakes3.ListMultipartUploadPartItem{
PartNumber: part.PartNumber,
LastModified: gofakes3.NewContentTime(part.LastModified),
ETag: part.ETag,
Size: part.Size,
})
}

return &gofakes3.ListMultipartUploadPartsResult{
Bucket: bucket,
Key: object,
UploadID: uploadID,
PartNumberMarker: marker,
NextPartNumberMarker: resp.NextMarker,
MaxParts: limit,
IsTruncated: resp.IsTruncated,
Parts: parts,
}, nil
}

func (s *s3) AbortMultipartUpload(bucket, object string, id gofakes3.UploadID) error {
Expand Down
3 changes: 2 additions & 1 deletion s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ type bus interface {

CreateMultipartUpload(ctx context.Context, bucket, path string) (api.MultipartCreateResponse, error)
ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
ListMultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)

UploadParams(ctx context.Context) (api.UploadParams, error)
}

type worker interface {
UploadObject(ctx context.Context, r io.Reader, path string, opts ...api.UploadOption) (err error)
GetObject(ctx context.Context, path, bucket string, opts ...api.DownloadObjectOption) (api.GetObjectResponse, error)
UploadPart(ctx context.Context, r io.Reader, path, uploadID string, partNumber int, opts ...api.UploadOption) (etag string, err error)
UploadMultipartUploadPart(ctx context.Context, r io.Reader, path, uploadID string, partNumber int, opts ...api.UploadOption) (etag string, err error)
}

func (l *gofakes3Logger) Print(level gofakes3.LogLevel, v ...interface{}) {
Expand Down
Loading

0 comments on commit 0e4e03e

Please sign in to comment.