Use update instead of CreateInBatches in CompleteMultipartUpload #776

Merged · 11 commits · Nov 30, 2023
15 changes: 7 additions & 8 deletions api/multipart.go
@@ -60,14 +60,13 @@ type (
}

MultipartAddPartRequest struct {
Bucket string `json:"bucket"`
ETag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartialSlabs []object.PartialSlab `json:"partialSlabs"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
Bucket string `json:"bucket"`
ETag string `json:"eTag"`
Path string `json:"path"`
ContractSet string `json:"contractSet"`
UploadID string `json:"uploadID"`
PartNumber int `json:"partNumber"`
Slices []object.SlabSlice `json:"slices"`
}

MultipartCompleteResponse struct {
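
For context on this change: partial-slab data no longer has its own field and instead travels in Slices like any other slab slice. A minimal sketch of building a request under the new model (bucket, path, contract set and IDs are made-up placeholders, not values from this PR):

package main

import (
	"fmt"

	"go.sia.tech/renterd/api"
	"go.sia.tech/renterd/object"
)

func main() {
	// A slice backed by buffered ("partial") slab data: it has a key and a
	// length but no shards yet. See the NewPartialSlab helper added further
	// down in this diff.
	partial := object.SlabSlice{
		Slab:   object.NewPartialSlab(object.GenerateEncryptionKey(), 1),
		Offset: 0,
		Length: 1 << 10,
	}

	req := api.MultipartAddPartRequest{
		Bucket:      "default",   // placeholder bucket
		ETag:        "d34db33f",  // placeholder ETag
		Path:        "/foo",
		ContractSet: "autopilot", // placeholder contract set
		UploadID:    "upload-id", // as returned by CreateMultipartUpload
		PartNumber:  1,
		// Regular and partial slices now share this one field.
		Slices: []object.SlabSlice{partial},
	}
	fmt.Println(len(req.Slices))
}
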
6 changes: 6 additions & 0 deletions api/setting.go
@@ -5,6 +5,7 @@ import (
"fmt"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
)

@@ -120,6 +121,11 @@ func (rs RedundancySettings) Redundancy() float64 {
return float64(rs.TotalShards) / float64(rs.MinShards)
}

// SlabSizeNoRedundancy returns the size of a slab without added redundancy.
func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 {
return uint64(rs.MinShards) * rhpv2.SectorSize
}

// Validate returns an error if the redundancy settings are not considered
// valid.
func (rs RedundancySettings) Validate() error {
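
The new helper is simple arithmetic: the redundancy-free slab size is MinShards times the fixed sector size. An illustrative calculation, assuming made-up 10-of-30 settings and rhpv2.SectorSize of 4 MiB (its value at the time of writing):

package main

import (
	"fmt"

	"go.sia.tech/renterd/api"
)

func main() {
	rs := api.RedundancySettings{MinShards: 10, TotalShards: 30} // illustrative settings
	fmt.Println(rs.SlabSizeNoRedundancy())                       // 10 * 4 MiB = 41943040 bytes
	fmt.Println(rs.Redundancy())                                 // 3
}
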
4 changes: 2 additions & 2 deletions api/slab.go
@@ -34,8 +34,8 @@ type (

type (
AddPartialSlabResponse struct {
SlabBufferMaxSizeSoftReached bool `json:"slabBufferMaxSizeSoftReached"`
Slabs []object.PartialSlab `json:"slabs"`
SlabBufferMaxSizeSoftReached bool `json:"slabBufferMaxSizeSoftReached"`
Slabs []object.SlabSlice `json:"slabs"`
}

// MigrationSlabsRequest is the request type for the /slabs/migration endpoint.
6 changes: 3 additions & 3 deletions bus/bus.go
@@ -150,7 +150,7 @@ type (
UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error

AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error)
CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error)
MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error)
@@ -161,7 +161,7 @@ type (
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error)

AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, bufferSize int64, err error)
AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, bufferSize int64, err error)
FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error)
Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)
RefreshHealth(ctx context.Context) error
@@ -2138,7 +2138,7 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) {
jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest)
return
}
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs)
err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices)
if jc.Check("failed to upload part", err) != nil {
return
}
17 changes: 8 additions & 9 deletions bus/client/multipart-upload.go
@@ -19,16 +19,15 @@ func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string,
}

// AddMultipartPart adds a part to a multipart upload.
func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) {
func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) {
err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{
Bucket: bucket,
ETag: eTag,
Path: path,
ContractSet: contractSet,
UploadID: uploadID,
PartNumber: partNumber,
Slices: slices,
PartialSlabs: partialSlab,
Bucket: bucket,
ETag: eTag,
Path: path,
ContractSet: contractSet,
UploadID: uploadID,
PartNumber: partNumber,
Slices: slices,
})
return
}
2 changes: 1 addition & 1 deletion bus/client/slabs.go
@@ -16,7 +16,7 @@ import (
)

// AddPartialSlab adds a partial slab to the bus.
func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, slabBufferMaxSizeSoftReached bool, err error) {
func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) {
c.c.Custom("POST", "/slabs/partial", nil, &api.AddPartialSlabResponse{})
values := url.Values{}
values.Set("minShards", fmt.Sprint(minShards))
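
With AddPartialSlab now returning plain SlabSlice values, a caller can feed its result straight into AddMultipartPart alongside the slices of fully uploaded slabs. A sketch of that flow, not renterd code; it assumes a *client.Client and uses placeholder bucket, path, contract-set and shard-count values:

package example

import (
	"context"

	"go.sia.tech/renterd/bus/client"
	"go.sia.tech/renterd/object"
)

// addPartWithRemainder is a sketch: uploaded holds the slices of fully
// uploaded slabs, leftover is the data that did not fill a whole slab.
func addPartWithRemainder(ctx context.Context, bus *client.Client, uploaded []object.SlabSlice, leftover []byte, etag, uploadID string, partNumber int) error {
	partial, _, err := bus.AddPartialSlab(ctx, leftover, 10, 30, "autopilot") // shard counts and set name are placeholders
	if err != nil {
		return err
	}
	// Buffered slices are appended to the regular ones; there is no separate
	// partial-slab argument anymore.
	return bus.AddMultipartPart(ctx, "default", "/foo", "autopilot", etag, uploadID, partNumber, append(uploaded, partial...))
}
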
100 changes: 73 additions & 27 deletions internal/testing/cluster_test.go
@@ -31,18 +31,12 @@ import (
"lukechampine.com/frand"
)

const (
testEtag = "d34db33f"
testMimeType = "application/octet-stream"
)

// TestNewTestCluster is a test for creating a cluster of Nodes for testing,
// making sure that it forms contracts, renews contracts and shuts down.
func TestNewTestCluster(t *testing.T) {
cluster := newTestCluster(t, clusterOptsDefault)
defer cluster.Shutdown()
b := cluster.Bus
w := cluster.Worker
tt := cluster.tt

// Upload packing should be disabled by default.
@@ -52,27 +46,6 @@ func TestNewTestCluster(t *testing.T) {
t.Fatalf("expected upload packing to be disabled by default, got %v", ups.Enabled)
}

// Try talking to the bus API by adding an object.
err = b.AddObject(context.Background(), api.DefaultBucketName, "foo", testAutopilotConfig.Contracts.Set, object.Object{
Key: object.GenerateEncryptionKey(),
Slabs: []object.SlabSlice{
{
Slab: object.Slab{
Key: object.GenerateEncryptionKey(),
MinShards: 1,
Shards: []object.Sector{}, // slab without sectors
},
Offset: 0,
Length: 0,
},
},
}, api.AddObjectOptions{MimeType: testMimeType, ETag: testEtag})
tt.OK(err)

// Try talking to the worker and request the object.
err = w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{})
tt.OK(err)

// See if autopilot is running by triggering the loop.
_, err = cluster.Autopilot.Trigger(false)
tt.OK(err)
@@ -2294,3 +2267,76 @@ func TestBusRecordedMetrics(t *testing.T) {
t.Fatal("expected zero ListSpending")
}
}

func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

cluster := newTestCluster(t, testClusterOptions{
hosts: testRedundancySettings.TotalShards,
uploadPacking: true,
})
defer cluster.Shutdown()
b := cluster.Bus
w := cluster.Worker
slabSize := testRedundancySettings.SlabSizeNoRedundancy()
tt := cluster.tt

// start a new multipart upload. The parts are uploaded in reverse order;
// each part's EncryptionOffset equals the combined length of the parts
// before it, so upload order does not affect how the object decrypts.
objPath := "/foo"
mpr, err := b.CreateMultipartUpload(context.Background(), api.DefaultBucketName, objPath, api.CreateMultipartOptions{Key: object.GenerateEncryptionKey()})
tt.OK(err)
if mpr.UploadID == "" {
t.Fatal("expected non-empty upload ID")
}

// upload a part that is a partial slab
part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4)
resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{
EncryptionOffset: int(slabSize + slabSize/4),
})
tt.OK(err)

// upload a part that is exactly a full slab
part2Data := bytes.Repeat([]byte{2}, int(slabSize))
resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{
EncryptionOffset: int(slabSize / 4),
})
tt.OK(err)

// upload part 1, the same size as the partial-slab part uploaded first
part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4)
resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{
EncryptionOffset: 0,
})
tt.OK(err)

// finish the upload
tt.OKAll(b.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, objPath, mpr.UploadID, []api.MultipartCompletedPart{
{
PartNumber: 1,
ETag: resp1.ETag,
},
{
PartNumber: 2,
ETag: resp2.ETag,
},
{
PartNumber: 3,
ETag: resp3.ETag,
},
}))

// download the object and verify its integrity
dst := new(bytes.Buffer)
tt.OK(w.DownloadObject(context.Background(), dst, api.DefaultBucketName, objPath, api.DownloadObjectOptions{}))
expectedData := append(part1Data, append(part2Data, part3Data...)...)
receivedData := dst.Bytes()
if len(receivedData) != len(expectedData) {
t.Fatalf("expected %v bytes, got %v", len(expectedData), len(receivedData))
} else if !bytes.Equal(receivedData, expectedData) {
t.Fatal("unexpected data")
}
}
8 changes: 2 additions & 6 deletions object/object.go
@@ -111,9 +111,8 @@ func GenerateEncryptionKey() EncryptionKey {

// An Object is a unit of data that has been stored on a host.
type Object struct {
Key EncryptionKey `json:"key"`
Slabs []SlabSlice `json:"slabs"`
PartialSlabs []PartialSlab `json:"partialSlab,omitempty"`
Key EncryptionKey `json:"key"`
Slabs []SlabSlice `json:"slabs"`
}

// NewObject returns a new Object with a random key.
@@ -146,9 +145,6 @@ func (o Object) TotalSize() int64 {
for _, ss := range o.Slabs {
n += int64(ss.Length)
}
for _, partialSlab := range o.PartialSlabs {
n += int64(partialSlab.Length)
}
return n
}

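
Since buffered data is now just another SlabSlice, TotalSize only needs to sum slice lengths. A small sketch of that invariant (the lengths are arbitrary and the object is constructed directly for illustration):

package main

import (
	"fmt"

	"go.sia.tech/renterd/object"
)

func main() {
	o := object.Object{
		Key: object.GenerateEncryptionKey(),
		Slabs: []object.SlabSlice{
			// a slice of an uploaded slab
			{Slab: object.NewSlab(1), Offset: 0, Length: 1 << 22},
			// the buffered remainder, represented exactly the same way
			{Slab: object.NewPartialSlab(object.GenerateEncryptionKey(), 1), Offset: 0, Length: 1 << 10},
		},
	}
	fmt.Println(o.TotalSize()) // 4195328, the sum of both lengths
}
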
18 changes: 13 additions & 5 deletions object/slab.go
@@ -25,13 +25,11 @@ type Slab struct {
Health float64 `json:"health"`
Key EncryptionKey `json:"key"`
MinShards uint8 `json:"minShards"`
Shards []Sector `json:"shards"`
Shards []Sector `json:"shards,omitempty"`
}

type PartialSlab struct {
Key EncryptionKey `json:"key"`
Offset uint32 `json:"offset"`
Length uint32 `json:"length"`
func (s Slab) IsPartial() bool {
return len(s.Shards) == 0
}

// NewSlab returns a new slab for the shards.
@@ -42,6 +40,16 @@ func NewSlab(minShards uint8) Slab {
}
}

// NewPartialSlab returns a new partial slab.
func NewPartialSlab(ec EncryptionKey, minShards uint8) Slab {
return Slab{
Health: 1,
Key: ec,
MinShards: minShards,
Shards: nil,
}
}

// ContractsFromShards is a helper to extract all contracts used by a set of
// shards.
func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileContractID]struct{} {
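
The separate PartialSlab type is gone: a partial slab is now a regular Slab whose Shards are empty, created with NewPartialSlab and detected with IsPartial. A minimal sketch using only the APIs shown in this diff:

package main

import (
	"fmt"

	"go.sia.tech/renterd/object"
)

func main() {
	// A buffered slab: it has a key and minShards but no shards yet.
	partial := object.NewPartialSlab(object.GenerateEncryptionKey(), 1)
	fmt.Println(partial.IsPartial()) // true

	// Once the buffer is flushed and sectors are uploaded, Shards is
	// populated and the same value stops reporting as partial.
	partial.Shards = []object.Sector{{}}
	fmt.Println(partial.IsPartial()) // false
}
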