diff --git a/api/multipart.go b/api/multipart.go index 5caebaeb4..a9953aec9 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -60,14 +60,13 @@ type ( } MultipartAddPartRequest struct { - Bucket string `json:"bucket"` - ETag string `json:"eTag"` - Path string `json:"path"` - ContractSet string `json:"contractSet"` - UploadID string `json:"uploadID"` - PartialSlabs []object.PartialSlab `json:"partialSlabs"` - PartNumber int `json:"partNumber"` - Slices []object.SlabSlice `json:"slices"` + Bucket string `json:"bucket"` + ETag string `json:"eTag"` + Path string `json:"path"` + ContractSet string `json:"contractSet"` + UploadID string `json:"uploadID"` + PartNumber int `json:"partNumber"` + Slices []object.SlabSlice `json:"slices"` } MultipartCompleteResponse struct { diff --git a/api/setting.go b/api/setting.go index 07bb605a5..d11089010 100644 --- a/api/setting.go +++ b/api/setting.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" ) @@ -120,6 +121,11 @@ func (rs RedundancySettings) Redundancy() float64 { return float64(rs.TotalShards) / float64(rs.MinShards) } +// SlabSizeNoRedundancy returns the size of a slab without added redundancy. +func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 { + return uint64(rs.MinShards) * rhpv2.SectorSize +} + // Validate returns an error if the redundancy settings are not considered // valid. func (rs RedundancySettings) Validate() error { diff --git a/api/slab.go b/api/slab.go index 56a5272eb..1a5d3fc79 100644 --- a/api/slab.go +++ b/api/slab.go @@ -34,8 +34,8 @@ type ( type ( AddPartialSlabResponse struct { - SlabBufferMaxSizeSoftReached bool `json:"slabBufferMaxSizeSoftReached"` - Slabs []object.PartialSlab `json:"slabs"` + SlabBufferMaxSizeSoftReached bool `json:"slabBufferMaxSizeSoftReached"` + Slabs []object.SlabSlice `json:"slabs"` } // MigrationSlabsRequest is the request type for the /slabs/migration endpoint. 
diff --git a/bus/bus.go b/bus/bus.go index d1ffb5e37..fb71e5a39 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -150,7 +150,7 @@ type ( UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error) - AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) + AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error) @@ -161,7 +161,7 @@ type ( PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error) - AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, bufferSize int64, err error) + AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, bufferSize int64, err error) FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) RefreshHealth(ctx context.Context) error @@ -2138,7 +2138,7 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) { jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest) 
return } - err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs) + err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices) if jc.Check("failed to upload part", err) != nil { return } diff --git a/bus/client/multipart-upload.go b/bus/client/multipart-upload.go index b5f688327..e12c1c43d 100644 --- a/bus/client/multipart-upload.go +++ b/bus/client/multipart-upload.go @@ -19,16 +19,15 @@ func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string, } // AddMultipartPart adds a part to a multipart upload. -func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) { +func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) { err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{ - Bucket: bucket, - ETag: eTag, - Path: path, - ContractSet: contractSet, - UploadID: uploadID, - PartNumber: partNumber, - Slices: slices, - PartialSlabs: partialSlab, + Bucket: bucket, + ETag: eTag, + Path: path, + ContractSet: contractSet, + UploadID: uploadID, + PartNumber: partNumber, + Slices: slices, }) return } diff --git a/bus/client/slabs.go b/bus/client/slabs.go index 962895c37..e407e0360 100644 --- a/bus/client/slabs.go +++ b/bus/client/slabs.go @@ -16,7 +16,7 @@ import ( ) // AddPartialSlab adds a partial slab to the bus. 
-func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, slabBufferMaxSizeSoftReached bool, err error) { +func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) { c.c.Custom("POST", "/slabs/partial", nil, &api.AddPartialSlabResponse{}) values := url.Values{} values.Set("minShards", fmt.Sprint(minShards)) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index ddf1da12d..aabe25a9f 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -31,18 +31,12 @@ import ( "lukechampine.com/frand" ) -const ( - testEtag = "d34db33f" - testMimeType = "application/octet-stream" -) - // TestNewTestCluster is a test for creating a cluster of Nodes for testing, // making sure that it forms contracts, renews contracts and shuts down. func TestNewTestCluster(t *testing.T) { cluster := newTestCluster(t, clusterOptsDefault) defer cluster.Shutdown() b := cluster.Bus - w := cluster.Worker tt := cluster.tt // Upload packing should be disabled by default. @@ -52,27 +46,6 @@ func TestNewTestCluster(t *testing.T) { t.Fatalf("expected upload packing to be disabled by default, got %v", ups.Enabled) } - // Try talking to the bus API by adding an object. - err = b.AddObject(context.Background(), api.DefaultBucketName, "foo", testAutopilotConfig.Contracts.Set, object.Object{ - Key: object.GenerateEncryptionKey(), - Slabs: []object.SlabSlice{ - { - Slab: object.Slab{ - Key: object.GenerateEncryptionKey(), - MinShards: 1, - Shards: []object.Sector{}, // slab without sectors - }, - Offset: 0, - Length: 0, - }, - }, - }, api.AddObjectOptions{MimeType: testMimeType, ETag: testEtag}) - tt.OK(err) - - // Try talking to the worker and request the object. 
- err = w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{}) - tt.OK(err) - // See if autopilot is running by triggering the loop. _, err = cluster.Autopilot.Trigger(false) tt.OK(err) @@ -2294,3 +2267,75 @@ func TestBusRecordedMetrics(t *testing.T) { t.Fatal("expected zero ListSpending") } } + +func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + cluster := newTestCluster(t, testClusterOptions{ + hosts: testRedundancySettings.TotalShards, + uploadPacking: true, + }) + defer cluster.Shutdown() + b := cluster.Bus + w := cluster.Worker + slabSize := testRedundancySettings.SlabSizeNoRedundancy() + tt := cluster.tt + + // start a new multipart upload. We upload the parts in reverse order + objPath := "/foo" + mpr, err := b.CreateMultipartUpload(context.Background(), api.DefaultBucketName, objPath, api.CreateMultipartOptions{Key: object.GenerateEncryptionKey()}) + tt.OK(err) + if mpr.UploadID == "" { + t.Fatal("expected non-empty upload ID") + } + + // upload a part that is a partial slab + part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4) + resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{ + EncryptionOffset: int(slabSize + slabSize/4), + }) + tt.OK(err) + + // upload a part that is exactly a full slab + part2Data := bytes.Repeat([]byte{2}, int(slabSize)) + resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{ + EncryptionOffset: int(slabSize / 4), + }) + tt.OK(err) + + // upload another part the same size as the first one + part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4) + resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName,
objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{ + EncryptionOffset: 0, + }) + tt.OK(err) + + // finish the upload + tt.OKAll(b.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, objPath, mpr.UploadID, []api.MultipartCompletedPart{ + { + PartNumber: 1, + ETag: resp1.ETag, + }, + { + PartNumber: 2, + ETag: resp2.ETag, + }, + { + PartNumber: 3, + ETag: resp3.ETag, + }, + })) + + // download the object and verify its integrity + dst := new(bytes.Buffer) + tt.OK(w.DownloadObject(context.Background(), dst, api.DefaultBucketName, objPath, api.DownloadObjectOptions{})) + expectedData := append(part1Data, append(part2Data, part3Data...)...) + receivedData := dst.Bytes() + if len(receivedData) != len(expectedData) { + t.Fatalf("expected %v bytes, got %v", len(expectedData), len(receivedData)) + } else if !bytes.Equal(receivedData, expectedData) { + t.Fatal("unexpected data") + } +} diff --git a/object/object.go b/object/object.go index 78aebe293..7a4ebf000 100644 --- a/object/object.go +++ b/object/object.go @@ -111,9 +111,8 @@ func GenerateEncryptionKey() EncryptionKey { // An Object is a unit of data that has been stored on a host. type Object struct { - Key EncryptionKey `json:"key"` - Slabs []SlabSlice `json:"slabs"` - PartialSlabs []PartialSlab `json:"partialSlab,omitempty"` + Key EncryptionKey `json:"key"` + Slabs []SlabSlice `json:"slabs"` } // NewObject returns a new Object with a random key. 
@@ -146,9 +145,6 @@ func (o Object) TotalSize() int64 { for _, ss := range o.Slabs { n += int64(ss.Length) } - for _, partialSlab := range o.PartialSlabs { - n += int64(partialSlab.Length) - } return n } diff --git a/object/slab.go b/object/slab.go index 36b802b7c..9c3afa608 100644 --- a/object/slab.go +++ b/object/slab.go @@ -25,13 +25,11 @@ type Slab struct { Health float64 `json:"health"` Key EncryptionKey `json:"key"` MinShards uint8 `json:"minShards"` - Shards []Sector `json:"shards"` + Shards []Sector `json:"shards,omitempty"` } -type PartialSlab struct { - Key EncryptionKey `json:"key"` - Offset uint32 `json:"offset"` - Length uint32 `json:"length"` +func (s Slab) IsPartial() bool { + return len(s.Shards) == 0 } // NewSlab returns a new slab for the shards. @@ -42,6 +40,16 @@ func NewSlab(minShards uint8) Slab { } } +// NewPartialSlab returns a new partial slab. +func NewPartialSlab(ec EncryptionKey, minShards uint8) Slab { + return Slab{ + Health: 1, + Key: ec, + MinShards: minShards, + Shards: nil, + } +} + // ContractsFromShards is a helper to extract all contracts used by a set of // shards. func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileContractID]struct{} { diff --git a/stores/metadata.go b/stores/metadata.go index f44ad599b..180bda6ad 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -186,6 +186,7 @@ type ( rawObjectSector struct { // object ObjectID uint + ObjectIndex uint64 ObjectKey []byte ObjectName string ObjectSize int64 @@ -412,16 +413,13 @@ func (raw rawObject) convert() (api.Object, error) { // filter out slabs without slab ID and buffered slabs - this is expected // for an empty object or objects that end with a partial slab. 
var filtered rawObject - var partialSlabSectors []*rawObjectSector minHealth := math.MaxFloat64 - for i, sector := range raw { - if sector.SlabID != 0 && !sector.SlabBuffered { + for _, sector := range raw { + if sector.SlabID != 0 { filtered = append(filtered, sector) if sector.SlabHealth < minHealth { minHealth = sector.SlabHealth } - } else if sector.SlabBuffered { - partialSlabSectors = append(partialSlabSectors, &raw[i]) } } @@ -431,9 +429,6 @@ func (raw rawObject) convert() (api.Object, error) { var start int // create a helper function to add a slab and update the state addSlab := func(end int) error { - if filtered[start].SlabBuffered { - return nil // ignore partial slabs - } if slab, err := filtered[start:end].toSlabSlice(); err != nil { return err } else { @@ -445,12 +440,12 @@ func (raw rawObject) convert() (api.Object, error) { curr := filtered[0] for j, sector := range filtered { - if sector.SectorIndex == 0 { + if sector.ObjectIndex == 0 { + return api.Object{}, api.ErrObjectCorrupted + } else if sector.SectorIndex == 0 && !sector.SlabBuffered { return api.Object{}, api.ErrObjectCorrupted } - if sector.SlabID != curr.SlabID || - sector.SliceOffset != curr.SliceOffset || - sector.SliceLength != curr.SliceLength { + if sector.ObjectIndex != curr.ObjectIndex { if err := addSlab(j); err != nil { return api.Object{}, err } @@ -462,20 +457,6 @@ func (raw rawObject) convert() (api.Object, error) { } } - // fetch a potential partial slab from the buffer. 
- var partialSlabs []object.PartialSlab - for _, pss := range partialSlabSectors { - var key object.EncryptionKey - if err := key.UnmarshalBinary(pss.SlabKey); err != nil { - return api.Object{}, err - } - partialSlabs = append(partialSlabs, object.PartialSlab{ - Key: key, - Offset: pss.SliceOffset, - Length: pss.SliceLength, - }) - } - // return object return api.Object{ ObjectMetadata: api.ObjectMetadata{ @@ -487,9 +468,8 @@ func (raw rawObject) convert() (api.Object, error) { Size: raw[0].ObjectSize, }, Object: object.Object{ - Key: key, - PartialSlabs: partialSlabs, - Slabs: slabs, + Key: key, + Slabs: slabs, }, }, nil } @@ -497,6 +477,8 @@ func (raw rawObject) convert() (api.Object, error) { func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { if len(raw) == 0 { return object.SlabSlice{}, errors.New("no sectors found") + } else if raw[0].SlabBuffered && len(raw) != 1 { + return object.SlabSlice{}, errors.New("buffered slab with multiple sectors") } // unmarshal key @@ -504,6 +486,15 @@ func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { return object.SlabSlice{}, err } + // handle partial slab + if raw[0].SlabBuffered { + slice.Offset = raw[0].SliceOffset + slice.Length = raw[0].SliceLength + slice.Slab.MinShards = raw[0].SlabMinShards + slice.Slab.Health = raw[0].SlabHealth + return + } + // hydrate all sectors slabID := raw[0].SlabID sectors := make([]object.Sector, 0, len(raw)) @@ -1486,7 +1477,7 @@ func (s *SQLStore) FetchPartialSlab(ctx context.Context, ec object.EncryptionKey return s.slabBufferMgr.FetchPartialSlab(ctx, ec, offset, length) } -func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) ([]object.PartialSlab, int64, error) { +func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) ([]object.SlabSlice, int64, error) { var contractSetID uint if err := s.db.Raw("SELECT id FROM 
contract_sets WHERE name = ?", contractSet).Scan(&contractSetID).Error; err != nil { return nil, 0, err @@ -1712,7 +1703,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, } // Create all slices. This also creates any missing slabs or sectors. - if err := s.createSlices(tx, &obj.ID, nil, cs.ID, contracts, o.Slabs, o.PartialSlabs); err != nil { + if err := s.createSlices(tx, &obj.ID, nil, cs.ID, contracts, o.Slabs); err != nil { return fmt.Errorf("failed to create slices: %w", err) } return nil @@ -2004,7 +1995,7 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set return slabs, nil } -func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice, partialSlabs []object.PartialSlab) error { +func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error { if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) { return fmt.Errorf("either objID or multiPartID must be set") } @@ -2016,17 +2007,17 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS return fmt.Errorf("failed to marshal slab key: %w", err) } slab := &dbSlab{ - Key: slabKey, - MinShards: ss.MinShards, - TotalShards: uint8(len(ss.Shards)), + Key: slabKey, + DBContractSetID: contractSetID, + MinShards: ss.MinShards, + TotalShards: uint8(len(ss.Shards)), } err = tx.Where(dbSlab{Key: slabKey}). - Assign(dbSlab{ - DBContractSetID: contractSetID, - }). FirstOrCreate(&slab).Error if err != nil { return fmt.Errorf("failed to create slab %v/%v: %w", i+1, len(slices), err) + } else if slab.DBContractSetID != contractSetID { + return fmt.Errorf("slab already exists in another contract set %v != %v", slab.DBContractSetID, contractSetID) } // Create Slice. 
@@ -2074,37 +2065,6 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS } } } - - // Handle partial slabs. We create a slice for each partial slab. - if len(partialSlabs) == 0 { - return nil - } - - for i, partialSlab := range partialSlabs { - key, err := partialSlab.Key.MarshalBinary() - if err != nil { - return err - } - var buffer dbBufferedSlab - err = tx.Joins("DBSlab"). - Take(&buffer, "DBSlab.key = ?", key). - Error - if err != nil { - return fmt.Errorf("failed to fetch buffered slab: %w", err) - } - - err = tx.Create(&dbSlice{ - DBObjectID: objID, - ObjectIndex: uint(len(slices) + i + 1), - DBMultipartPartID: multiPartID, - DBSlabID: buffer.DBSlab.ID, - Offset: partialSlab.Offset, - Length: partialSlab.Length, - }).Error - if err != nil { - return fmt.Errorf("failed to create slice for partial slab: %w", err) - } - } return nil } @@ -2116,7 +2076,7 @@ func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path // accordingly var rows rawObject tx := s.db. - Select("o.id as ObjectID, o.health as ObjectHealth, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey"). 
+ Select("o.id as ObjectID, o.health as ObjectHealth, sli.object_index as ObjectIndex, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey"). Model(&dbObject{}). Table("objects o"). Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id"). diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 5c8d74220..73b5a648d 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -2561,8 +2561,8 @@ func TestPartialSlab(t *testing.T) { assertBuffer(buffer1Name, 4, false, false) // Use the added partial slab to create an object. - testObject := func(partialSlabs []object.PartialSlab) object.Object { - return object.Object{ + testObject := func(partialSlabs []object.SlabSlice) object.Object { + obj := object.Object{ Key: object.GenerateEncryptionKey(), Slabs: []object.SlabSlice{ { @@ -2579,8 +2579,9 @@ func TestPartialSlab(t *testing.T) { Length: rhpv2.SectorSize, }, }, - PartialSlabs: slabs, } + obj.Slabs = append(obj.Slabs, partialSlabs...) + return obj } obj := testObject(slabs) err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key", testContractSet, testETag, testMimeType, obj) @@ -3289,16 +3290,15 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { // create a full buffered slab. 
completeSize := bufferedSlabSize(1) - ps, _, err := ss.AddPartialSlab(context.Background(), frand.Bytes(completeSize), 1, 1, testContractSet) + slabs, _, err := ss.AddPartialSlab(context.Background(), frand.Bytes(completeSize), 1, 1, testContractSet) if err != nil { t.Fatal(err) } // add it to an object to prevent it from getting pruned. err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, "", "", object.Object{ - Key: object.GenerateEncryptionKey(), - Slabs: nil, - PartialSlabs: ps, + Key: object.GenerateEncryptionKey(), + Slabs: slabs, }) if err != nil { t.Fatal(err) diff --git a/stores/multipart.go b/stores/multipart.go index fb18680ff..681f456a3 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -85,7 +85,7 @@ func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path strin }, err } -func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab) (err error) { +func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) { // collect all used contracts usedContracts := make(map[types.PublicKey]map[types.FileContractID]struct{}) for _, s := range slices { @@ -131,9 +131,6 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS for _, slice := range slices { size += uint64(slice.Length) } - for _, ps := range partialSlabs { - size += uint64(ps.Length) - } // Create a new part. part := dbMultipartPart{ Etag: eTag, @@ -146,7 +143,7 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS return fmt.Errorf("failed to create part: %w", err) } // Create the slices. 
- err = s.createSlices(tx, nil, &part.ID, cs.ID, contracts, slices, partialSlabs) + err = s.createSlices(tx, nil, &part.ID, cs.ID, contracts, slices) if err != nil { return fmt.Errorf("failed to create slices: %w", err) } @@ -399,19 +396,20 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str return fmt.Errorf("failed to create object: %w", err) } - // Assign the right object id and unassign the multipart upload. Also - // clear the ID to make sure new slices are created with IDs in - // ascending order. + // Assign the right object id and unassign the multipart upload. Also + // set the right object_index to make sure the slices are sorted + // correctly when retrieving the object later. for i := range slices { - slices[i].ID = 0 - slices[i].DBObjectID = &obj.ID - slices[i].ObjectIndex = uint(i + 1) - slices[i].DBMultipartPartID = nil - } - - // Save updated slices. - if err := tx.CreateInBatches(slices, 100).Error; err != nil { - return fmt.Errorf("failed to save slices: %w", err) + err = tx.Model(&dbSlice{}). + Where("id", slices[i].ID). + Updates(map[string]interface{}{ + "db_object_id": obj.ID, + "object_index": uint(i + 1), + "db_multipart_part_id": nil, + }).Error + if err != nil { + return fmt.Errorf("failed to update slice %v: %w", i, err) + } } // Delete the multipart upload. 
diff --git a/stores/multipart_test.go b/stores/multipart_test.go index 3db14ea3d..775c674e5 100644 --- a/stores/multipart_test.go +++ b/stores/multipart_test.go @@ -3,7 +3,6 @@ package stores import ( "context" "encoding/hex" - "fmt" "testing" "time" @@ -58,7 +57,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatal(err) } etag := hex.EncodeToString(frand.Bytes(16)) - err = ss.AddMultipartPart(ctx, api.DefaultBucketName, objName, testContractSet, etag, resp.UploadID, i, []object.SlabSlice{}, partialSlabs) + err = ss.AddMultipartPart(ctx, api.DefaultBucketName, objName, testContractSet, etag, resp.UploadID, i, partialSlabs) if err != nil { t.Fatal(err) } @@ -132,10 +131,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatalf("expected object size to be %v, got %v", totalSize, obj.Size) } else if obj.TotalSize() != totalSize { for _, f := range obj.Slabs { - fmt.Println("slice", f.Length) - } - for _, f := range obj.PartialSlabs { - fmt.Println("ps", f.Length) + t.Log("slice", f.Length, f.IsPartial()) } t.Fatalf("expected object total size to be %v, got %v", totalSize, obj.TotalSize()) } diff --git a/stores/slabbuffer.go b/stores/slabbuffer.go index 40e7c944f..d894ffbea 100644 --- a/stores/slabbuffer.go +++ b/stores/slabbuffer.go @@ -149,7 +149,7 @@ func (mgr *SlabBufferManager) Close() error { return errors.Join(errs...) } -func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet uint) ([]object.PartialSlab, int64, error) { +func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet uint) ([]object.SlabSlice, int64, error) { gid := bufferGID(minShards, totalShards, uint32(contractSet)) // Sanity check input. @@ -171,13 +171,13 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m // Find a buffer to use. 
We use at most 1 existing buffer + either 1 buffer // that can fit the remainder of the data or 1 new buffer to avoid splitting // the data over too many slabs. - var slab object.PartialSlab - var slabs []object.PartialSlab + var slab object.SlabSlice + var slabs []object.SlabSlice var err error var usedBuffers []*SlabBuffer for _, buffer := range buffers { var used bool - slab, data, used, err = buffer.recordAppend(data, len(usedBuffers) > 0) + slab, data, used, err = buffer.recordAppend(data, len(usedBuffers) > 0, minShards) if err != nil { return nil, 0, err } @@ -201,7 +201,7 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m return nil, 0, err } var used bool - slab, data, used, err = sb.recordAppend(data, true) + slab, data, used, err = sb.recordAppend(data, true, minShards) if err != nil { return nil, 0, err } @@ -432,19 +432,19 @@ func (buf *SlabBuffer) acquireForUpload(lockingDuration time.Duration) bool { return true } -func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.PartialSlab, []byte, bool, error) { +func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool, minShards uint8) (object.SlabSlice, []byte, bool, error) { buf.mu.Lock() defer buf.mu.Unlock() remainingSpace := buf.maxSize - buf.size if remainingSpace == 0 { - return object.PartialSlab{}, data, false, nil + return object.SlabSlice{}, data, false, nil } else if int64(len(data)) <= remainingSpace { _, err := buf.file.WriteAt(data, buf.size) if err != nil { - return object.PartialSlab{}, nil, true, err + return object.SlabSlice{}, nil, true, err } - slab := object.PartialSlab{ - Key: buf.slabKey, + slab := object.SlabSlice{ + Slab: object.NewPartialSlab(buf.slabKey, minShards), Offset: uint32(buf.size), Length: uint32(len(data)), } @@ -453,17 +453,17 @@ func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.PartialSl } else if !mustFit { _, err := buf.file.WriteAt(data[:remainingSpace], buf.size) if err != nil { - return 
object.PartialSlab{}, nil, true, err + return object.SlabSlice{}, nil, true, err } - slab := object.PartialSlab{ - Key: buf.slabKey, + slab := object.SlabSlice{ + Slab: object.NewPartialSlab(buf.slabKey, minShards), Offset: uint32(buf.size), Length: uint32(remainingSpace), } buf.size += remainingSpace return slab, data[remainingSpace:], true, nil } else { - return object.PartialSlab{}, data, false, nil + return object.SlabSlice{}, data, false, nil } } diff --git a/worker/download.go b/worker/download.go index 425bcbcd5..aa3f5e8ce 100644 --- a/worker/download.go +++ b/worker/download.go @@ -213,20 +213,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o for _, s := range o.Slabs { ss = append(ss, slabSlice{ SlabSlice: s, - PartialSlab: false, - }) - } - for _, ps := range o.PartialSlabs { - // add fake slab for partial slabs - ss = append(ss, slabSlice{ - SlabSlice: object.SlabSlice{ - Slab: object.Slab{ - Key: ps.Key, - }, - Offset: ps.Offset, - Length: ps.Length, - }, - PartialSlab: true, + PartialSlab: s.IsPartial(), }) } slabs := slabsForDownload(ss, offset, length) @@ -245,7 +232,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o } if slab != nil { slabs[i].SlabSlice.Slab = *slab - slabs[i].PartialSlab = false + slabs[i].PartialSlab = slab.IsPartial() } else { slabs[i].Data = data } diff --git a/worker/upload.go b/worker/upload.go index 258c4d2ab..62c9f6b08 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -261,10 +261,12 @@ func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, o // add partial slabs var bufferSizeLimitReached bool if len(partialSlabData) > 0 { - obj.PartialSlabs, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) + var pss []object.SlabSlice + pss, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), 
uint8(up.rs.TotalShards), up.contractSet) if err != nil { return "", err } + obj.Slabs = append(obj.Slabs, pss...) } // persist the object @@ -298,14 +300,16 @@ func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, bucket, path, // add parital slabs var bufferSizeLimitReached bool if len(partialSlabData) > 0 { - obj.PartialSlabs, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) + var pss []object.SlabSlice + pss, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) if err != nil { return "", err } + obj.Slabs = append(obj.Slabs, pss...) } // persist the part - err = w.bus.AddMultipartPart(ctx, bucket, path, up.contractSet, eTag, uploadID, partNumber, obj.Slabs, obj.PartialSlabs) + err = w.bus.AddMultipartPart(ctx, bucket, path, up.contractSet, eTag, uploadID, partNumber, obj.Slabs) if err != nil { return "", fmt.Errorf("couldn't add multi part: %w", err) } diff --git a/worker/worker.go b/worker/worker.go index 97d366c03..ed4a81b61 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -146,10 +146,10 @@ type Bus interface { AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) error DeleteObject(ctx context.Context, bucket, path string, opts api.DeleteObjectOptions) error - AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab) (err error) + AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, err error) - AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, 
slabBufferMaxSizeSoftReached bool, err error) + AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) @@ -1035,7 +1035,7 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { } // return early if the object is empty - if len(res.Object.Slabs) == 0 && len(res.Object.PartialSlabs) == 0 { + if len(res.Object.Slabs) == 0 { return }