Commit 5a77190
Merge pull request #776 from SiaFoundation/chris/multipart-upload
Use update instead of CreateInBatches in CompleteMultipartUpload
ChrisSchinnerl authored Nov 30, 2023
2 parents e1efe33 + 89e6b72 commit 5a77190
Showing 17 changed files with 195 additions and 196 deletions.
15 changes: 7 additions & 8 deletions api/multipart.go
@@ -60,14 +60,13 @@ type (
 	}

 	MultipartAddPartRequest struct {
-		Bucket       string               `json:"bucket"`
-		ETag         string               `json:"eTag"`
-		Path         string               `json:"path"`
-		ContractSet  string               `json:"contractSet"`
-		UploadID     string               `json:"uploadID"`
-		PartialSlabs []object.PartialSlab `json:"partialSlabs"`
-		PartNumber   int                  `json:"partNumber"`
-		Slices       []object.SlabSlice   `json:"slices"`
+		Bucket      string             `json:"bucket"`
+		ETag        string             `json:"eTag"`
+		Path        string             `json:"path"`
+		ContractSet string             `json:"contractSet"`
+		UploadID    string             `json:"uploadID"`
+		PartNumber  int                `json:"partNumber"`
+		Slices      []object.SlabSlice `json:"slices"`
 	}

 	MultipartCompleteResponse struct {
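For context: the dedicated PartialSlabs field is dropped because a partial slab is now modeled as a plain object.SlabSlice whose slab has no shards (see object/slab.go below), so both kinds of data travel in Slices. A minimal sketch of building the request after this change, not part of the commit; the bucket, path, contract set, and upload ID values are made up:

	package main

	import (
		"fmt"

		"go.sia.tech/renterd/api"
		"go.sia.tech/renterd/object"
	)

	func main() {
		// One slice backed by an uploaded slab, one backed by a partial slab;
		// both kinds now share the Slices field.
		uploaded := object.SlabSlice{Slab: object.NewSlab(2), Offset: 0, Length: 100}
		uploaded.Slab.Shards = make([]object.Sector, 3)
		partial := object.SlabSlice{Slab: object.NewPartialSlab(object.GenerateEncryptionKey(), 2), Offset: 0, Length: 42}

		req := api.MultipartAddPartRequest{
			Bucket:      "default",
			ETag:        "d34db33f",
			Path:        "/foo",
			ContractSet: "autopilot",
			UploadID:    "uid",
			PartNumber:  1,
			Slices:      []object.SlabSlice{uploaded, partial},
		}
		fmt.Println(len(req.Slices)) // 2
	}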
6 changes: 6 additions & 0 deletions api/setting.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"time"

+	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 )
@@ -120,6 +121,11 @@ func (rs RedundancySettings) Redundancy() float64 {
 	return float64(rs.TotalShards) / float64(rs.MinShards)
 }

+// SlabSizeNoRedundancy returns the size of a slab without added redundancy.
+func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 {
+	return uint64(rs.MinShards) * rhpv2.SectorSize
+}
+
 // Validate returns an error if the redundancy settings are not considered
 // valid.
 func (rs RedundancySettings) Validate() error {
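A short worked example of the new helper, not part of the commit; the 10-of-30 redundancy settings are illustrative, and rhpv2.SectorSize is 4 MiB in go.sia.tech/core:

	package main

	import (
		"fmt"

		"go.sia.tech/renterd/api"
	)

	func main() {
		rs := api.RedundancySettings{MinShards: 10, TotalShards: 30}
		// A slab holds MinShards sectors of user data; erasure coding expands
		// it to TotalShards sectors on the network.
		fmt.Println(rs.SlabSizeNoRedundancy()) // 10 * rhpv2.SectorSize = 41943040 (40 MiB)
		fmt.Println(rs.Redundancy())           // 30 / 10 = 3
	}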
4 changes: 2 additions & 2 deletions api/slab.go
@@ -34,8 +34,8 @@ type (

 type (
 	AddPartialSlabResponse struct {
-		SlabBufferMaxSizeSoftReached bool                 `json:"slabBufferMaxSizeSoftReached"`
-		Slabs                        []object.PartialSlab `json:"slabs"`
+		SlabBufferMaxSizeSoftReached bool               `json:"slabBufferMaxSizeSoftReached"`
+		Slabs                        []object.SlabSlice `json:"slabs"`
 	}

 	// MigrationSlabsRequest is the request type for the /slabs/migration endpoint.
6 changes: 3 additions & 3 deletions bus/bus.go
@@ -150,7 +150,7 @@ type (
 		UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error

 		AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error)
-		AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error)
+		AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error)
 		CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
 		CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error)
 		MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error)
@@ -161,7 +161,7 @@ type (
 		PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
 		SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error)

-		AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, bufferSize int64, err error)
+		AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, bufferSize int64, err error)
 		FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error)
 		Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)
 		RefreshHealth(ctx context.Context) error
@@ -2150,7 +2150,7 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) {
 		jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest)
 		return
 	}
-	err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs)
+	err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices)
 	if jc.Check("failed to upload part", err) != nil {
 		return
 	}
17 changes: 8 additions & 9 deletions bus/client/multipart-upload.go
@@ -19,16 +19,15 @@ func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string,
 }

 // AddMultipartPart adds a part to a multipart upload.
-func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) {
+func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) {
 	err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{
-		Bucket:       bucket,
-		ETag:         eTag,
-		Path:         path,
-		ContractSet:  contractSet,
-		UploadID:     uploadID,
-		PartNumber:   partNumber,
-		Slices:       slices,
-		PartialSlabs: partialSlab,
+		Bucket:      bucket,
+		ETag:        eTag,
+		Path:        path,
+		ContractSet: contractSet,
+		UploadID:    uploadID,
+		PartNumber:  partNumber,
+		Slices:      slices,
 	})
 	return
 }
2 changes: 1 addition & 1 deletion bus/client/slabs.go
@@ -16,7 +16,7 @@ import (
 )

 // AddPartialSlab adds a partial slab to the bus.
-func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, slabBufferMaxSizeSoftReached bool, err error) {
+func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) {
 	c.c.Custom("POST", "/slabs/partial", nil, &api.AddPartialSlabResponse{})
 	values := url.Values{}
 	values.Set("minShards", fmt.Sprint(minShards))
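A minimal sketch of calling the updated client method, not part of the commit. It assumes the bus client constructor client.New(addr, password); the address, password, contract set name, and 10-of-30 redundancy are placeholders:

	package main

	import (
		"bytes"
		"context"
		"fmt"

		"go.sia.tech/renterd/bus/client"
	)

	func main() {
		c := client.New("http://localhost:9980/api/bus", "test")
		data := bytes.Repeat([]byte{1}, 1<<20) // 1 MiB, far less than a full slab

		slabs, softReached, err := c.AddPartialSlab(context.Background(), data, 10, 30, "autopilot")
		if err != nil {
			panic(err)
		}
		// Each returned slice points into a slab buffer; its slab has no
		// shards until the buffer is packed and uploaded.
		for _, s := range slabs {
			fmt.Println(s.Offset, s.Length, s.IsPartial()) // ..., true
		}
		fmt.Println("buffer soft limit reached:", softReached)
	}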
100 changes: 73 additions & 27 deletions internal/testing/cluster_test.go
@@ -31,18 +31,12 @@ import (
 	"lukechampine.com/frand"
 )

-const (
-	testEtag     = "d34db33f"
-	testMimeType = "application/octet-stream"
-)
-
 // TestNewTestCluster is a test for creating a cluster of Nodes for testing,
 // making sure that it forms contracts, renews contracts and shuts down.
 func TestNewTestCluster(t *testing.T) {
 	cluster := newTestCluster(t, clusterOptsDefault)
 	defer cluster.Shutdown()
 	b := cluster.Bus
-	w := cluster.Worker
 	tt := cluster.tt

 	// Upload packing should be disabled by default.
@@ -52,27 +46,6 @@ func TestNewTestCluster(t *testing.T) {
 		t.Fatalf("expected upload packing to be disabled by default, got %v", ups.Enabled)
 	}

-	// Try talking to the bus API by adding an object.
-	err = b.AddObject(context.Background(), api.DefaultBucketName, "foo", testAutopilotConfig.Contracts.Set, object.Object{
-		Key: object.GenerateEncryptionKey(),
-		Slabs: []object.SlabSlice{
-			{
-				Slab: object.Slab{
-					Key:       object.GenerateEncryptionKey(),
-					MinShards: 1,
-					Shards:    []object.Sector{}, // slab without sectors
-				},
-				Offset: 0,
-				Length: 0,
-			},
-		},
-	}, api.AddObjectOptions{MimeType: testMimeType, ETag: testEtag})
-	tt.OK(err)
-
-	// Try talking to the worker and request the object.
-	err = w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{})
-	tt.OK(err)
-
 	// See if autopilot is running by triggering the loop.
 	_, err = cluster.Autopilot.Trigger(false)
 	tt.OK(err)
@@ -2294,3 +2267,76 @@ func TestBusRecordedMetrics(t *testing.T) {
 		t.Fatal("expected zero ListSpending")
 	}
 }
+
+func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+
+	cluster := newTestCluster(t, testClusterOptions{
+		hosts:         testRedundancySettings.TotalShards,
+		uploadPacking: true,
+	})
+	defer cluster.Shutdown()
+	defer cluster.Shutdown()
+	b := cluster.Bus
+	w := cluster.Worker
+	slabSize := testRedundancySettings.SlabSizeNoRedundancy()
+	tt := cluster.tt
+
+	// start a new multipart upload. We upload the parts in reverse order
+	objPath := "/foo"
+	mpr, err := b.CreateMultipartUpload(context.Background(), api.DefaultBucketName, objPath, api.CreateMultipartOptions{Key: object.GenerateEncryptionKey()})
+	tt.OK(err)
+	if mpr.UploadID == "" {
+		t.Fatal("expected non-empty upload ID")
+	}
+
+	// upload a part that is a partial slab
+	part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4)
+	resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{
+		EncryptionOffset: int(slabSize + slabSize/4),
+	})
+	tt.OK(err)
+
+	// upload a part that is exactly a full slab
+	part2Data := bytes.Repeat([]byte{2}, int(slabSize))
+	resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{
+		EncryptionOffset: int(slabSize / 4),
+	})
+	tt.OK(err)
+
+	// upload another part the same size as the first one
+	part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4)
+	resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{
+		EncryptionOffset: 0,
+	})
+	tt.OK(err)
+
+	// finish the upload
+	tt.OKAll(b.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, objPath, mpr.UploadID, []api.MultipartCompletedPart{
+		{
+			PartNumber: 1,
+			ETag:       resp1.ETag,
+		},
+		{
+			PartNumber: 2,
+			ETag:       resp2.ETag,
+		},
+		{
+			PartNumber: 3,
+			ETag:       resp3.ETag,
+		},
+	}))
+
+	// download the object and verify its integrity
+	dst := new(bytes.Buffer)
+	tt.OK(w.DownloadObject(context.Background(), dst, api.DefaultBucketName, objPath, api.DownloadObjectOptions{}))
+	expectedData := append(part1Data, append(part2Data, part3Data...)...)
+	receivedData := dst.Bytes()
+	if len(receivedData) != len(expectedData) {
+		t.Fatalf("expected %v bytes, got %v", len(expectedData), len(receivedData))
+	} else if !bytes.Equal(receivedData, expectedData) {
+		t.Fatal("unexpected data")
+	}
+}
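For reference, the EncryptionOffset passed with each part pins where that part's plaintext sits in the object's encryption stream, which is what lets the test upload the parts in reverse order. With slabSize = MinShards * SectorSize, the three parts tile the object as follows:

	part 1: [0,            slabSize/4)    quarter slab
	part 2: [slabSize/4,   slabSize*5/4)  exactly one slab, straddling a slab boundary
	part 3: [slabSize*5/4, slabSize*3/2)  quarter slab

The object thus starts and ends with partial-slab data wrapped around one full slab, the case the test name describes.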
8 changes: 2 additions & 6 deletions object/object.go
@@ -111,9 +111,8 @@ func GenerateEncryptionKey() EncryptionKey {

 // An Object is a unit of data that has been stored on a host.
 type Object struct {
-	Key          EncryptionKey `json:"key"`
-	Slabs        []SlabSlice   `json:"slabs"`
-	PartialSlabs []PartialSlab `json:"partialSlab,omitempty"`
+	Key   EncryptionKey `json:"key"`
+	Slabs []SlabSlice   `json:"slabs"`
 }

 // NewObject returns a new Object with a random key.
@@ -146,9 +145,6 @@ func (o Object) TotalSize() int64 {
 	for _, ss := range o.Slabs {
 		n += int64(ss.Length)
 	}
-	for _, partialSlab := range o.PartialSlabs {
-		n += int64(partialSlab.Length)
-	}
 	return n
 }
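With the PartialSlabs field folded into Slabs, a single loop now accounts for all of an object's data. A minimal sketch, not part of the commit:

	package main

	import (
		"fmt"

		"go.sia.tech/renterd/object"
	)

	func main() {
		o := object.Object{
			Key: object.GenerateEncryptionKey(),
			Slabs: []object.SlabSlice{
				// a slice of a fully uploaded slab
				{Slab: object.Slab{MinShards: 2, Shards: make([]object.Sector, 3)}, Offset: 0, Length: 100},
				// a slice of a partial slab: same type, just no shards
				{Slab: object.Slab{MinShards: 2}, Offset: 0, Length: 50},
			},
		}
		fmt.Println(o.TotalSize()) // 150: one loop covers both kinds
	}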
18 changes: 13 additions & 5 deletions object/slab.go
@@ -25,13 +25,11 @@ type Slab struct {
 	Health    float64       `json:"health"`
 	Key       EncryptionKey `json:"key"`
 	MinShards uint8         `json:"minShards"`
-	Shards    []Sector      `json:"shards"`
+	Shards    []Sector      `json:"shards,omitempty"`
 }

-type PartialSlab struct {
-	Key    EncryptionKey `json:"key"`
-	Offset uint32        `json:"offset"`
-	Length uint32        `json:"length"`
+func (s Slab) IsPartial() bool {
+	return len(s.Shards) == 0
 }

 // NewSlab returns a new slab for the shards.
@@ -42,6 +40,16 @@ func NewSlab(minShards uint8) Slab {
 	}
 }

+// NewPartialSlab returns a new partial slab.
+func NewPartialSlab(ec EncryptionKey, minShards uint8) Slab {
+	return Slab{
+		Health:    1,
+		Key:       ec,
+		MinShards: minShards,
+		Shards:    nil,
+	}
+}
+
 // ContractsFromShards is a helper to extract all contracts used by a set of
 // shards.
 func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileContractID]struct{} {
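A minimal sketch of the two constructors and the new predicate side by side, not part of the commit:

	package main

	import (
		"fmt"

		"go.sia.tech/renterd/object"
	)

	func main() {
		// A partial slab carries an encryption key and MinShards but no
		// shards; the data it describes lives in a slab buffer, not on hosts.
		ps := object.NewPartialSlab(object.GenerateEncryptionKey(), 2)
		fmt.Println(ps.IsPartial()) // true

		// A regular slab stops being partial once its shards are set.
		s := object.NewSlab(2)
		s.Shards = make([]object.Sector, 3)
		fmt.Println(s.IsPartial()) // false
	}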