From 51e4850849536221454ea844bc38f0f7a992bdcf Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 10:06:39 +0100 Subject: [PATCH 01/11] stores: use update instead of create in CompleteMultipartUpload --- stores/multipart.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/stores/multipart.go b/stores/multipart.go index fb18680ff..118297732 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -403,15 +403,16 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str // clear the ID to make sure new slices are created with IDs in // ascending order. for i := range slices { - slices[i].ID = 0 - slices[i].DBObjectID = &obj.ID - slices[i].ObjectIndex = uint(i + 1) - slices[i].DBMultipartPartID = nil - } - - // Save updated slices. - if err := tx.CreateInBatches(slices, 100).Error; err != nil { - return fmt.Errorf("failed to save slices: %w", err) + err = tx.Model(&dbSlice{}). + Where("id", slices[i].ID). + Updates(map[string]interface{}{ + "db_object_id": obj.ID, + "object_index": uint(i + 1), + "db_multipart_part_id": nil, + }).Error + if err != nil { + return fmt.Errorf("failed to update slice %v: %w", i, err) + } } // Delete the multipart upload. 
From 85122cd811eb892c182e4fa7ef28a3db74f514bb Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 13:06:31 +0100 Subject: [PATCH 02/11] testing: add TestMultipartUploadWrappedByPartialSlabs --- api/setting.go | 6 +++ internal/testing/cluster_test.go | 66 ++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/api/setting.go b/api/setting.go index 07bb605a5..d11089010 100644 --- a/api/setting.go +++ b/api/setting.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" ) @@ -120,6 +121,11 @@ func (rs RedundancySettings) Redundancy() float64 { return float64(rs.TotalShards) / float64(rs.MinShards) } +// SlabSizeNoRedundancy returns the size of a slab without added redundancy. +func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 { + return uint64(rs.MinShards) * rhpv2.SectorSize +} + // Validate returns an error if the redundancy settings are not considered // valid. func (rs RedundancySettings) Validate() error { diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index ddf1da12d..ce8e70d8d 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2294,3 +2294,69 @@ func TestBusRecordedMetrics(t *testing.T) { t.Fatal("expected zero ListSpending") } } + +func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + cluster := newTestCluster(t, testClusterOptions{ + hosts: testRedundancySettings.TotalShards, + uploadPacking: true, + }) + defer cluster.Shutdown() + defer cluster.Shutdown() + b := cluster.Bus + w := cluster.Worker + slabSize := testRedundancySettings.SlabSizeNoRedundancy() + tt := cluster.tt + + // Start a new multipart upload. 
+ objPath := "/foo" + mpr, err := b.CreateMultipartUpload(context.Background(), api.DefaultBucketName, objPath, api.CreateMultipartOptions{Key: object.GenerateEncryptionKey()}) + tt.OK(err) + if mpr.UploadID == "" { + t.Fatal("expected non-empty upload ID") + } + + // Upload a part that is a partial slab. + part1Data := frand.Bytes(int(slabSize / 4)) + resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{}) + tt.OK(err) + + // Upload a part that is exactly a full slab. + part2Data := frand.Bytes(int(slabSize)) + resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{}) + tt.OK(err) + + // Upload another part the same size as the first one. + part3Data := frand.Bytes(int(slabSize / 4)) + resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{}) + tt.OK(err) + + // Finish the upload. + tt.OKAll(b.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, objPath, mpr.UploadID, []api.MultipartCompletedPart{ + { + PartNumber: 1, + ETag: resp1.ETag, + }, + { + PartNumber: 2, + ETag: resp2.ETag, + }, + { + PartNumber: 3, + ETag: resp3.ETag, + }, + })) + + // Download the object and verify its integrity. + dst := new(bytes.Buffer) + expectedData := append(part1Data, append(part2Data, part3Data...)...) 
+ tt.OK(w.DownloadObject(context.Background(), dst, api.DefaultBucketName, objPath, api.DownloadObjectOptions{})) + if dst.Len() != len(expectedData) { + t.Fatalf("expected %v bytes, got %v", len(expectedData), dst.Len()) + } else if !bytes.Equal(dst.Bytes(), expectedData) { + t.Fatal("unexpected data") + } +} From 21367df83e647d471be62e7e3eb7e58de0fae600 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 13:36:11 +0100 Subject: [PATCH 03/11] object: remove object.PartialSlab type --- api/multipart.go | 15 +++++++------- api/slab.go | 4 ++-- bus/bus.go | 6 +++--- bus/client/multipart-upload.go | 17 +++++++-------- bus/client/slabs.go | 2 +- object/object.go | 8 ++----- object/slab.go | 10 ++++----- stores/metadata.go | 38 ++++++++++++---------------------- stores/metadata_test.go | 12 +++++------ stores/multipart.go | 7 ++----- stores/multipart_test.go | 7 ++----- stores/slabbuffer.go | 30 ++++++++++++++++----------- worker/download.go | 17 ++------------- worker/upload.go | 10 ++++++--- worker/worker.go | 6 +++--- 15 files changed, 80 insertions(+), 109 deletions(-) diff --git a/api/multipart.go b/api/multipart.go index 5caebaeb4..a9953aec9 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -60,14 +60,13 @@ type ( } MultipartAddPartRequest struct { - Bucket string `json:"bucket"` - ETag string `json:"eTag"` - Path string `json:"path"` - ContractSet string `json:"contractSet"` - UploadID string `json:"uploadID"` - PartialSlabs []object.PartialSlab `json:"partialSlabs"` - PartNumber int `json:"partNumber"` - Slices []object.SlabSlice `json:"slices"` + Bucket string `json:"bucket"` + ETag string `json:"eTag"` + Path string `json:"path"` + ContractSet string `json:"contractSet"` + UploadID string `json:"uploadID"` + PartNumber int `json:"partNumber"` + Slices []object.SlabSlice `json:"slices"` } MultipartCompleteResponse struct { diff --git a/api/slab.go b/api/slab.go index 56a5272eb..1a5d3fc79 100644 --- a/api/slab.go +++ b/api/slab.go @@ 
-34,8 +34,8 @@ type ( type ( AddPartialSlabResponse struct { - SlabBufferMaxSizeSoftReached bool `json:"slabBufferMaxSizeSoftReached"` - Slabs []object.PartialSlab `json:"slabs"` + SlabBufferMaxSizeSoftReached bool `json:"slabBufferMaxSizeSoftReached"` + Slabs []object.SlabSlice `json:"slabs"` } // MigrationSlabsRequest is the request type for the /slabs/migration endpoint. diff --git a/bus/bus.go b/bus/bus.go index d1ffb5e37..fb71e5a39 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -150,7 +150,7 @@ type ( UpdateObject(ctx context.Context, bucketName, path, contractSet, ETag, mimeType string, o object.Object) error AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error) - AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) + AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string) (api.MultipartCreateResponse, error) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error) @@ -161,7 +161,7 @@ type ( PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error) - AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, bufferSize int64, err error) + AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, bufferSize int64, err error) 
FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) RefreshHealth(ctx context.Context) error @@ -2138,7 +2138,7 @@ func (b *bus) multipartHandlerUploadPartPUT(jc jape.Context) { jc.Error(errors.New("upload_id must be non-empty"), http.StatusBadRequest) return } - err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices, req.PartialSlabs) + err := b.ms.AddMultipartPart(jc.Request.Context(), req.Bucket, req.Path, req.ContractSet, req.ETag, req.UploadID, req.PartNumber, req.Slices) if jc.Check("failed to upload part", err) != nil { return } diff --git a/bus/client/multipart-upload.go b/bus/client/multipart-upload.go index b5f688327..e12c1c43d 100644 --- a/bus/client/multipart-upload.go +++ b/bus/client/multipart-upload.go @@ -19,16 +19,15 @@ func (c *Client) AbortMultipartUpload(ctx context.Context, bucket, path string, } // AddMultipartPart adds a part to a multipart upload. 
-func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlab []object.PartialSlab) (err error) { +func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) { err = c.c.WithContext(ctx).PUT("/multipart/part", api.MultipartAddPartRequest{ - Bucket: bucket, - ETag: eTag, - Path: path, - ContractSet: contractSet, - UploadID: uploadID, - PartNumber: partNumber, - Slices: slices, - PartialSlabs: partialSlab, + Bucket: bucket, + ETag: eTag, + Path: path, + ContractSet: contractSet, + UploadID: uploadID, + PartNumber: partNumber, + Slices: slices, }) return } diff --git a/bus/client/slabs.go b/bus/client/slabs.go index 962895c37..e407e0360 100644 --- a/bus/client/slabs.go +++ b/bus/client/slabs.go @@ -16,7 +16,7 @@ import ( ) // AddPartialSlab adds a partial slab to the bus. -func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, slabBufferMaxSizeSoftReached bool, err error) { +func (c *Client) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) { c.c.Custom("POST", "/slabs/partial", nil, &api.AddPartialSlabResponse{}) values := url.Values{} values.Set("minShards", fmt.Sprint(minShards)) diff --git a/object/object.go b/object/object.go index 78aebe293..7a4ebf000 100644 --- a/object/object.go +++ b/object/object.go @@ -111,9 +111,8 @@ func GenerateEncryptionKey() EncryptionKey { // An Object is a unit of data that has been stored on a host. 
type Object struct { - Key EncryptionKey `json:"key"` - Slabs []SlabSlice `json:"slabs"` - PartialSlabs []PartialSlab `json:"partialSlab,omitempty"` + Key EncryptionKey `json:"key"` + Slabs []SlabSlice `json:"slabs"` } // NewObject returns a new Object with a random key. @@ -146,9 +145,6 @@ func (o Object) TotalSize() int64 { for _, ss := range o.Slabs { n += int64(ss.Length) } - for _, partialSlab := range o.PartialSlabs { - n += int64(partialSlab.Length) - } return n } diff --git a/object/slab.go b/object/slab.go index 36b802b7c..bf7e5500f 100644 --- a/object/slab.go +++ b/object/slab.go @@ -24,14 +24,12 @@ type Sector struct { type Slab struct { Health float64 `json:"health"` Key EncryptionKey `json:"key"` - MinShards uint8 `json:"minShards"` - Shards []Sector `json:"shards"` + MinShards uint8 `json:"minShards,omitempty"` + Shards []Sector `json:"shards,omitempty"` } -type PartialSlab struct { - Key EncryptionKey `json:"key"` - Offset uint32 `json:"offset"` - Length uint32 `json:"length"` +func (s Slab) IsPartial() bool { + return s.MinShards == 0 && len(s.Shards) == 0 } // NewSlab returns a new slab for the shards. diff --git a/stores/metadata.go b/stores/metadata.go index f44ad599b..8bbceeaf5 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -412,16 +412,13 @@ func (raw rawObject) convert() (api.Object, error) { // filter out slabs without slab ID and buffered slabs - this is expected // for an empty object or objects that end with a partial slab. 
var filtered rawObject - var partialSlabSectors []*rawObjectSector minHealth := math.MaxFloat64 - for i, sector := range raw { - if sector.SlabID != 0 && !sector.SlabBuffered { + for _, sector := range raw { + if sector.SlabID != 0 { filtered = append(filtered, sector) if sector.SlabHealth < minHealth { minHealth = sector.SlabHealth } - } else if sector.SlabBuffered { - partialSlabSectors = append(partialSlabSectors, &raw[i]) } } @@ -462,20 +459,6 @@ func (raw rawObject) convert() (api.Object, error) { } } - // fetch a potential partial slab from the buffer. - var partialSlabs []object.PartialSlab - for _, pss := range partialSlabSectors { - var key object.EncryptionKey - if err := key.UnmarshalBinary(pss.SlabKey); err != nil { - return api.Object{}, err - } - partialSlabs = append(partialSlabs, object.PartialSlab{ - Key: key, - Offset: pss.SliceOffset, - Length: pss.SliceLength, - }) - } - // return object return api.Object{ ObjectMetadata: api.ObjectMetadata{ @@ -487,9 +470,8 @@ func (raw rawObject) convert() (api.Object, error) { Size: raw[0].ObjectSize, }, Object: object.Object{ - Key: key, - PartialSlabs: partialSlabs, - Slabs: slabs, + Key: key, + Slabs: slabs, }, }, nil } @@ -1486,7 +1468,7 @@ func (s *SQLStore) FetchPartialSlab(ctx context.Context, ec object.EncryptionKey return s.slabBufferMgr.FetchPartialSlab(ctx, ec, offset, length) } -func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) ([]object.PartialSlab, int64, error) { +func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) ([]object.SlabSlice, int64, error) { var contractSetID uint if err := s.db.Raw("SELECT id FROM contract_sets WHERE name = ?", contractSet).Scan(&contractSetID).Error; err != nil { return nil, 0, err @@ -1712,7 +1694,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, } // Create all slices. 
This also creates any missing slabs or sectors. - if err := s.createSlices(tx, &obj.ID, nil, cs.ID, contracts, o.Slabs, o.PartialSlabs); err != nil { + if err := s.createSlices(tx, &obj.ID, nil, cs.ID, contracts, o.Slabs); err != nil { return fmt.Errorf("failed to create slices: %w", err) } return nil @@ -2004,12 +1986,18 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set return slabs, nil } -func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice, partialSlabs []object.PartialSlab) error { +func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error { if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) { return fmt.Errorf("either objID or multiPartID must be set") } + var partialSlabs []object.SlabSlice for i, ss := range slices { + // Handle the partial slabs later. + if ss.IsPartial() { + partialSlabs = append(partialSlabs, ss) + continue + } // Create Slab if it doesn't exist yet. slabKey, err := ss.Key.MarshalBinary() if err != nil { diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 5c8d74220..dc89e18c3 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -2561,8 +2561,8 @@ func TestPartialSlab(t *testing.T) { assertBuffer(buffer1Name, 4, false, false) // Use the added partial slab to create an object. - testObject := func(partialSlabs []object.PartialSlab) object.Object { - return object.Object{ + testObject := func(partialSlabs []object.SlabSlice) object.Object { + obj := object.Object{ Key: object.GenerateEncryptionKey(), Slabs: []object.SlabSlice{ { @@ -2579,8 +2579,9 @@ func TestPartialSlab(t *testing.T) { Length: rhpv2.SectorSize, }, }, - PartialSlabs: slabs, } + obj.Slabs = append(obj.Slabs, partialSlabs...) 
+ return obj } obj := testObject(slabs) err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "key", testContractSet, testETag, testMimeType, obj) @@ -3296,9 +3297,8 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { // add it to an object to prevent it from getting pruned. err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, "", "", object.Object{ - Key: object.GenerateEncryptionKey(), - Slabs: nil, - PartialSlabs: ps, + Key: object.GenerateEncryptionKey(), + Slabs: ps, }) if err != nil { t.Fatal(err) diff --git a/stores/multipart.go b/stores/multipart.go index 118297732..95f26ebbd 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -85,7 +85,7 @@ func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path strin }, err } -func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab) (err error) { +func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) { // collect all used contracts usedContracts := make(map[types.PublicKey]map[types.FileContractID]struct{}) for _, s := range slices { @@ -131,9 +131,6 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS for _, slice := range slices { size += uint64(slice.Length) } - for _, ps := range partialSlabs { - size += uint64(ps.Length) - } // Create a new part. part := dbMultipartPart{ Etag: eTag, @@ -146,7 +143,7 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS return fmt.Errorf("failed to create part: %w", err) } // Create the slices. 
- err = s.createSlices(tx, nil, &part.ID, cs.ID, contracts, slices, partialSlabs) + err = s.createSlices(tx, nil, &part.ID, cs.ID, contracts, slices) if err != nil { return fmt.Errorf("failed to create slices: %w", err) } diff --git a/stores/multipart_test.go b/stores/multipart_test.go index 3db14ea3d..eb534dca7 100644 --- a/stores/multipart_test.go +++ b/stores/multipart_test.go @@ -58,7 +58,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatal(err) } etag := hex.EncodeToString(frand.Bytes(16)) - err = ss.AddMultipartPart(ctx, api.DefaultBucketName, objName, testContractSet, etag, resp.UploadID, i, []object.SlabSlice{}, partialSlabs) + err = ss.AddMultipartPart(ctx, api.DefaultBucketName, objName, testContractSet, etag, resp.UploadID, i, partialSlabs) if err != nil { t.Fatal(err) } @@ -132,10 +132,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatalf("expected object size to be %v, got %v", totalSize, obj.Size) } else if obj.TotalSize() != totalSize { for _, f := range obj.Slabs { - fmt.Println("slice", f.Length) - } - for _, f := range obj.PartialSlabs { - fmt.Println("ps", f.Length) + fmt.Println("slice", f.Length, f.IsPartial()) } t.Fatalf("expected object total size to be %v, got %v", totalSize, obj.TotalSize()) } diff --git a/stores/slabbuffer.go b/stores/slabbuffer.go index 40e7c944f..5cf666a2c 100644 --- a/stores/slabbuffer.go +++ b/stores/slabbuffer.go @@ -149,7 +149,7 @@ func (mgr *SlabBufferManager) Close() error { return errors.Join(errs...) } -func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet uint) ([]object.PartialSlab, int64, error) { +func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet uint) ([]object.SlabSlice, int64, error) { gid := bufferGID(minShards, totalShards, uint32(contractSet)) // Sanity check input. 
@@ -171,8 +171,8 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m // Find a buffer to use. We use at most 1 existing buffer + either 1 buffer // that can fit the remainder of the data or 1 new buffer to avoid splitting // the data over too many slabs. - var slab object.PartialSlab - var slabs []object.PartialSlab + var slab object.SlabSlice + var slabs []object.SlabSlice var err error var usedBuffers []*SlabBuffer for _, buffer := range buffers { @@ -432,19 +432,22 @@ func (buf *SlabBuffer) acquireForUpload(lockingDuration time.Duration) bool { return true } -func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.PartialSlab, []byte, bool, error) { +func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.SlabSlice, []byte, bool, error) { buf.mu.Lock() defer buf.mu.Unlock() remainingSpace := buf.maxSize - buf.size if remainingSpace == 0 { - return object.PartialSlab{}, data, false, nil + return object.SlabSlice{}, data, false, nil } else if int64(len(data)) <= remainingSpace { _, err := buf.file.WriteAt(data, buf.size) if err != nil { - return object.PartialSlab{}, nil, true, err + return object.SlabSlice{}, nil, true, err } - slab := object.PartialSlab{ - Key: buf.slabKey, + slab := object.SlabSlice{ + Slab: object.Slab{ + Health: 1, + Key: buf.slabKey, + }, Offset: uint32(buf.size), Length: uint32(len(data)), } @@ -453,17 +456,20 @@ func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.PartialSl } else if !mustFit { _, err := buf.file.WriteAt(data[:remainingSpace], buf.size) if err != nil { - return object.PartialSlab{}, nil, true, err + return object.SlabSlice{}, nil, true, err } - slab := object.PartialSlab{ - Key: buf.slabKey, + slab := object.SlabSlice{ + Slab: object.Slab{ + Health: 1, + Key: buf.slabKey, + }, Offset: uint32(buf.size), Length: uint32(remainingSpace), } buf.size += remainingSpace return slab, data[remainingSpace:], true, nil } else { - return 
object.PartialSlab{}, data, false, nil + return object.SlabSlice{}, data, false, nil } } diff --git a/worker/download.go b/worker/download.go index 425bcbcd5..aa3f5e8ce 100644 --- a/worker/download.go +++ b/worker/download.go @@ -213,20 +213,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o for _, s := range o.Slabs { ss = append(ss, slabSlice{ SlabSlice: s, - PartialSlab: false, - }) - } - for _, ps := range o.PartialSlabs { - // add fake slab for partial slabs - ss = append(ss, slabSlice{ - SlabSlice: object.SlabSlice{ - Slab: object.Slab{ - Key: ps.Key, - }, - Offset: ps.Offset, - Length: ps.Length, - }, - PartialSlab: true, + PartialSlab: s.IsPartial(), }) } slabs := slabsForDownload(ss, offset, length) @@ -245,7 +232,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o } if slab != nil { slabs[i].SlabSlice.Slab = *slab - slabs[i].PartialSlab = false + slabs[i].PartialSlab = slab.IsPartial() } else { slabs[i].Data = data } diff --git a/worker/upload.go b/worker/upload.go index 258c4d2ab..62c9f6b08 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -261,10 +261,12 @@ func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, o // add partial slabs var bufferSizeLimitReached bool if len(partialSlabData) > 0 { - obj.PartialSlabs, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) + var pss []object.SlabSlice + pss, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) if err != nil { return "", err } + obj.Slabs = append(obj.Slabs, pss...) 
} // persist the object @@ -298,14 +300,16 @@ func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, bucket, path, // add parital slabs var bufferSizeLimitReached bool if len(partialSlabData) > 0 { - obj.PartialSlabs, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) + var pss []object.SlabSlice + pss, bufferSizeLimitReached, err = w.bus.AddPartialSlab(ctx, partialSlabData, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet) if err != nil { return "", err } + obj.Slabs = append(obj.Slabs, pss...) } // persist the part - err = w.bus.AddMultipartPart(ctx, bucket, path, up.contractSet, eTag, uploadID, partNumber, obj.Slabs, obj.PartialSlabs) + err = w.bus.AddMultipartPart(ctx, bucket, path, up.contractSet, eTag, uploadID, partNumber, obj.Slabs) if err != nil { return "", fmt.Errorf("couldn't add multi part: %w", err) } diff --git a/worker/worker.go b/worker/worker.go index 97d366c03..ed4a81b61 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -146,10 +146,10 @@ type Bus interface { AddObject(ctx context.Context, bucket, path, contractSet string, o object.Object, opts api.AddObjectOptions) error DeleteObject(ctx context.Context, bucket, path string, opts api.DeleteObjectOptions) error - AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice, partialSlabs []object.PartialSlab) (err error) + AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, err error) - AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, slabBufferMaxSizeSoftReached bool, err error) + AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, 
contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) @@ -1035,7 +1035,7 @@ func (w *worker) objectsHandlerGET(jc jape.Context) { } // return early if the object is empty - if len(res.Object.Slabs) == 0 && len(res.Object.PartialSlabs) == 0 { + if len(res.Object.Slabs) == 0 { return } From 27c99557119b92a83b17d52fe03ba7acb8e3f7f9 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 14:10:54 +0100 Subject: [PATCH 04/11] stores: fix TestPartialSlab --- object/slab.go | 10 ++++++++++ stores/metadata.go | 23 +++++++++++++++-------- stores/slabbuffer.go | 16 +++++----------- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/object/slab.go b/object/slab.go index bf7e5500f..f4176fae9 100644 --- a/object/slab.go +++ b/object/slab.go @@ -40,6 +40,16 @@ func NewSlab(minShards uint8) Slab { } } +// NewPartialSlab returns a new partial slab. +func NewPartialSlab(ec EncryptionKey, minShards uint8) Slab { + return Slab{ + Health: 1, + Key: ec, + MinShards: minShards, + Shards: nil, + } +} + // ContractsFromShards is a helper to extract all contracts used by a set of // shards. 
func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileContractID]struct{} { diff --git a/stores/metadata.go b/stores/metadata.go index 8bbceeaf5..90d35eaeb 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -186,6 +186,7 @@ type ( rawObjectSector struct { // object ObjectID uint + ObjectIndex uint64 ObjectKey []byte ObjectName string ObjectSize int64 @@ -428,9 +429,6 @@ func (raw rawObject) convert() (api.Object, error) { var start int // create a helper function to add a slab and update the state addSlab := func(end int) error { - if filtered[start].SlabBuffered { - return nil // ignore partial slabs - } if slab, err := filtered[start:end].toSlabSlice(); err != nil { return err } else { @@ -442,12 +440,10 @@ func (raw rawObject) convert() (api.Object, error) { curr := filtered[0] for j, sector := range filtered { - if sector.SectorIndex == 0 { + if sector.ObjectIndex == 0 { return api.Object{}, api.ErrObjectCorrupted } - if sector.SlabID != curr.SlabID || - sector.SliceOffset != curr.SliceOffset || - sector.SliceLength != curr.SliceLength { + if sector.ObjectIndex != curr.ObjectIndex { if err := addSlab(j); err != nil { return api.Object{}, err } @@ -479,6 +475,8 @@ func (raw rawObject) convert() (api.Object, error) { func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { if len(raw) == 0 { return object.SlabSlice{}, errors.New("no sectors found") + } else if raw[0].SlabBuffered && len(raw) != 1 { + return object.SlabSlice{}, errors.New("buffered slab with multiple sectors") } // unmarshal key @@ -486,6 +484,15 @@ func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { return object.SlabSlice{}, err } + // handle partial slab + if raw[0].SlabBuffered { + slice.Offset = raw[0].SliceOffset + slice.Length = raw[0].SliceLength + slice.Slab.MinShards = raw[0].SlabMinShards + slice.Slab.Health = raw[0].SlabHealth + return + } + // hydrate all sectors slabID := raw[0].SlabID sectors := make([]object.Sector, 0, 
len(raw)) @@ -2104,7 +2111,7 @@ func (s *SQLStore) object(ctx context.Context, txn *gorm.DB, bucket string, path // accordingly var rows rawObject tx := s.db. - Select("o.id as ObjectID, o.health as ObjectHealth, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey"). + Select("o.id as ObjectID, o.health as ObjectHealth, sli.object_index as ObjectIndex, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey"). Model(&dbObject{}). Table("objects o"). Joins("INNER JOIN buckets b ON o.db_bucket_id = b.id").
diff --git a/stores/slabbuffer.go b/stores/slabbuffer.go index 5cf666a2c..d894ffbea 100644 --- a/stores/slabbuffer.go +++ b/stores/slabbuffer.go @@ -177,7 +177,7 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m var usedBuffers []*SlabBuffer for _, buffer := range buffers { var used bool - slab, data, used, err = buffer.recordAppend(data, len(usedBuffers) > 0) + slab, data, used, err = buffer.recordAppend(data, len(usedBuffers) > 0, minShards) if err != nil { return nil, 0, err } @@ -201,7 +201,7 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m return nil, 0, err } var used bool - slab, data, used, err = sb.recordAppend(data, true) + slab, data, used, err = sb.recordAppend(data, true, minShards) if err != nil { return nil, 0, err } @@ -432,7 +432,7 @@ func (buf *SlabBuffer) acquireForUpload(lockingDuration time.Duration) bool { return true } -func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.SlabSlice, []byte, bool, error) { +func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool, minShards uint8) (object.SlabSlice, []byte, bool, error) { buf.mu.Lock() defer buf.mu.Unlock() remainingSpace := buf.maxSize - buf.size @@ -444,10 +444,7 @@ func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.SlabSlice return object.SlabSlice{}, nil, true, err } slab := object.SlabSlice{ - Slab: object.Slab{ - Health: 1, - Key: buf.slabKey, - }, + Slab: object.NewPartialSlab(buf.slabKey, minShards), Offset: uint32(buf.size), Length: uint32(len(data)), } @@ -459,10 +456,7 @@ func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool) (object.SlabSlice return object.SlabSlice{}, nil, true, err } slab := object.SlabSlice{ - Slab: object.Slab{ - Health: 1, - Key: buf.slabKey, - }, + Slab: object.NewPartialSlab(buf.slabKey, minShards), Offset: uint32(buf.size), Length: uint32(remainingSpace), } From 076e64a39f23997b493dd0ef1d96ec8782099d4b Mon Sep 17 00:00:00 2001 From: Chris 
Schinnerl Date: Thu, 30 Nov 2023 14:19:27 +0100 Subject: [PATCH 05/11] stores: fix TestObjectBasic --- stores/metadata.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stores/metadata.go b/stores/metadata.go index 90d35eaeb..7527f5e76 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -442,6 +442,8 @@ func (raw rawObject) convert() (api.Object, error) { for j, sector := range filtered { if sector.ObjectIndex == 0 { return api.Object{}, api.ErrObjectCorrupted + } else if sector.SectorIndex == 0 && !sector.SlabBuffered { + return api.Object{}, api.ErrObjectCorrupted } if sector.ObjectIndex != curr.ObjectIndex { if err := addSlab(j); err != nil { From f5aacd93aa4b054b69502a5f2ff4e72894245d85 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 14:48:33 +0100 Subject: [PATCH 06/11] testing: fix TestUploadPacking --- internal/testing/cluster_test.go | 9 +++++---- object/slab.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index ce8e70d8d..31edd22d2 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2352,11 +2352,12 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { // Download the object and verify its integrity. dst := new(bytes.Buffer) - expectedData := append(part1Data, append(part2Data, part3Data...)...) tt.OK(w.DownloadObject(context.Background(), dst, api.DefaultBucketName, objPath, api.DownloadObjectOptions{})) - if dst.Len() != len(expectedData) { - t.Fatalf("expected %v bytes, got %v", len(expectedData), dst.Len()) - } else if !bytes.Equal(dst.Bytes(), expectedData) { + expectedData := append(part1Data, append(part2Data, part3Data...)...) 
+ receivedData := dst.Bytes() + if len(receivedData) != len(expectedData) { + t.Fatalf("expected %v bytes, got %v", len(expectedData), len(receivedData)) + } else if !bytes.Equal(receivedData, expectedData) { t.Fatal("unexpected data") } } diff --git a/object/slab.go b/object/slab.go index f4176fae9..367e4cf54 100644 --- a/object/slab.go +++ b/object/slab.go @@ -29,7 +29,7 @@ type Slab struct { } func (s Slab) IsPartial() bool { - return s.MinShards == 0 && len(s.Shards) == 0 + return len(s.Shards) == 0 } // NewSlab returns a new slab for the shards. From e3aacc2ba877542ba59628056934114032c6a0ca Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 14:59:28 +0100 Subject: [PATCH 07/11] testing: fix TestMultipartUploadWrappedByPartialSlabs --- internal/testing/cluster_test.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 31edd22d2..3672e1720 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2311,7 +2311,7 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { slabSize := testRedundancySettings.SlabSizeNoRedundancy() tt := cluster.tt - // Start a new multipart upload. + // start a new multipart upload. We upload the parts in reverse order objPath := "/foo" mpr, err := b.CreateMultipartUpload(context.Background(), api.DefaultBucketName, objPath, api.CreateMultipartOptions{Key: object.GenerateEncryptionKey()}) tt.OK(err) @@ -2319,22 +2319,28 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { t.Fatal("expected non-empty upload ID") } - // Upload a part that is a partial slab. 
- part1Data := frand.Bytes(int(slabSize / 4)) - resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{}) + // upload a part that is a partial slab + part3Data := bytes.Repeat([]byte{3}, int(slabSize)/4) + resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{ + EncryptionOffset: int(slabSize + slabSize/4), + }) tt.OK(err) - // Upload a part that is exactly a full slab. - part2Data := frand.Bytes(int(slabSize)) - resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{}) + // upload a part that is exactly a full slab + part2Data := bytes.Repeat([]byte{2}, int(slabSize)) + resp2, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part2Data), api.DefaultBucketName, objPath, mpr.UploadID, 2, api.UploadMultipartUploadPartOptions{ + EncryptionOffset: int(slabSize / 4), + }) tt.OK(err) - // Upload another part the same size as the first one. - part3Data := frand.Bytes(int(slabSize / 4)) - resp3, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part3Data), api.DefaultBucketName, objPath, mpr.UploadID, 3, api.UploadMultipartUploadPartOptions{}) + // upload another part the same size as the first one + part1Data := bytes.Repeat([]byte{1}, int(slabSize)/4) + resp1, err := w.UploadMultipartUploadPart(context.Background(), bytes.NewReader(part1Data), api.DefaultBucketName, objPath, mpr.UploadID, 1, api.UploadMultipartUploadPartOptions{ + EncryptionOffset: 0, + }) tt.OK(err) - // Finish the upload. 
+ // finish the upload tt.OKAll(b.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, objPath, mpr.UploadID, []api.MultipartCompletedPart{ { PartNumber: 1, @@ -2350,7 +2356,7 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { }, })) - // Download the object and verify its integrity. + // download the object and verify its integrity dst := new(bytes.Buffer) tt.OK(w.DownloadObject(context.Background(), dst, api.DefaultBucketName, objPath, api.DownloadObjectOptions{})) expectedData := append(part1Data, append(part2Data, part3Data...)...) From 97883c4c0803fabeb6994dd5b044d1baf8bb9e8f Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 15:04:35 +0100 Subject: [PATCH 08/11] stores: address comment --- stores/multipart.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stores/multipart.go b/stores/multipart.go index 95f26ebbd..681f456a3 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -396,9 +396,9 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str return fmt.Errorf("failed to create object: %w", err) } - // Assign the right object id and unassign the multipart upload. Also - // clear the ID to make sure new slices are created with IDs in - // ascending order. + // Assign the right object id and unassign the multipart upload. Also + // set the right object_index to make sure the slices are sorted + // correctly when retrieving the object later. for i := range slices { err = tx.Model(&dbSlice{}). Where("id", slices[i].ID). 
From 9b9be200140a7c5a6a0ae469bf288d7a6cd41604 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 15:20:03 +0100 Subject: [PATCH 09/11] testing: fix TestNewTestCluster --- internal/testing/cluster_test.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index 3672e1720..aabe25a9f 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -31,18 +31,12 @@ import ( "lukechampine.com/frand" ) -const ( - testEtag = "d34db33f" - testMimeType = "application/octet-stream" -) - // TestNewTestCluster is a test for creating a cluster of Nodes for testing, // making sure that it forms contracts, renews contracts and shuts down. func TestNewTestCluster(t *testing.T) { cluster := newTestCluster(t, clusterOptsDefault) defer cluster.Shutdown() b := cluster.Bus - w := cluster.Worker tt := cluster.tt // Upload packing should be disabled by default. @@ -52,27 +46,6 @@ func TestNewTestCluster(t *testing.T) { t.Fatalf("expected upload packing to be disabled by default, got %v", ups.Enabled) } - // Try talking to the bus API by adding an object. - err = b.AddObject(context.Background(), api.DefaultBucketName, "foo", testAutopilotConfig.Contracts.Set, object.Object{ - Key: object.GenerateEncryptionKey(), - Slabs: []object.SlabSlice{ - { - Slab: object.Slab{ - Key: object.GenerateEncryptionKey(), - MinShards: 1, - Shards: []object.Sector{}, // slab without sectors - }, - Offset: 0, - Length: 0, - }, - }, - }, api.AddObjectOptions{MimeType: testMimeType, ETag: testEtag}) - tt.OK(err) - - // Try talking to the worker and request the object. - err = w.DeleteObject(context.Background(), api.DefaultBucketName, "foo", api.DeleteObjectOptions{}) - tt.OK(err) - // See if autopilot is running by triggering the loop. 
_, err = cluster.Autopilot.Trigger(false) tt.OK(err) From b2ed2eb0818508f246211fac0b9683e967b3dd8e Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 16:01:17 +0100 Subject: [PATCH 10/11] stores: simplify createSlices --- stores/metadata.go | 49 ++++++---------------------------------------- 1 file changed, 6 insertions(+), 43 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 7527f5e76..180bda6ad 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -2000,30 +2000,24 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS return fmt.Errorf("either objID or multiPartID must be set") } - var partialSlabs []object.SlabSlice for i, ss := range slices { - // Handle the partial slabs later. - if ss.IsPartial() { - partialSlabs = append(partialSlabs, ss) - continue - } // Create Slab if it doesn't exist yet. slabKey, err := ss.Key.MarshalBinary() if err != nil { return fmt.Errorf("failed to marshal slab key: %w", err) } slab := &dbSlab{ - Key: slabKey, - MinShards: ss.MinShards, - TotalShards: uint8(len(ss.Shards)), + Key: slabKey, + DBContractSetID: contractSetID, + MinShards: ss.MinShards, + TotalShards: uint8(len(ss.Shards)), } err = tx.Where(dbSlab{Key: slabKey}). - Assign(dbSlab{ - DBContractSetID: contractSetID, - }). FirstOrCreate(&slab).Error if err != nil { return fmt.Errorf("failed to create slab %v/%v: %w", i+1, len(slices), err) + } else if slab.DBContractSetID != contractSetID { + return fmt.Errorf("slab already exists in another contract set %v != %v", slab.DBContractSetID, contractSetID) } // Create Slice. @@ -2071,37 +2065,6 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS } } } - - // Handle partial slabs. We create a slice for each partial slab. 
- if len(partialSlabs) == 0 { - return nil - } - - for i, partialSlab := range partialSlabs { - key, err := partialSlab.Key.MarshalBinary() - if err != nil { - return err - } - var buffer dbBufferedSlab - err = tx.Joins("DBSlab"). - Take(&buffer, "DBSlab.key = ?", key). - Error - if err != nil { - return fmt.Errorf("failed to fetch buffered slab: %w", err) - } - - err = tx.Create(&dbSlice{ - DBObjectID: objID, - ObjectIndex: uint(len(slices) + i + 1), - DBMultipartPartID: multiPartID, - DBSlabID: buffer.DBSlab.ID, - Offset: partialSlab.Offset, - Length: partialSlab.Length, - }).Error - if err != nil { - return fmt.Errorf("failed to create slice for partial slab: %w", err) - } - } return nil } From 89e6b727052c29441bc317808fc89e2e0fa1f524 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 30 Nov 2023 16:37:36 +0100 Subject: [PATCH 11/11] stores: address comments --- object/slab.go | 2 +- stores/metadata_test.go | 4 ++-- stores/multipart_test.go | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/object/slab.go b/object/slab.go index 367e4cf54..9c3afa608 100644 --- a/object/slab.go +++ b/object/slab.go @@ -24,7 +24,7 @@ type Sector struct { type Slab struct { Health float64 `json:"health"` Key EncryptionKey `json:"key"` - MinShards uint8 `json:"minShards,omitempty"` + MinShards uint8 `json:"minShards"` Shards []Sector `json:"shards,omitempty"` } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index dc89e18c3..73b5a648d 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -3290,7 +3290,7 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { // create a full buffered slab. 
completeSize := bufferedSlabSize(1) - ps, _, err := ss.AddPartialSlab(context.Background(), frand.Bytes(completeSize), 1, 1, testContractSet) + slabs, _, err := ss.AddPartialSlab(context.Background(), frand.Bytes(completeSize), 1, 1, testContractSet) if err != nil { t.Fatal(err) } @@ -3298,7 +3298,7 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { // add it to an object to prevent it from getting pruned. err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, "", "", object.Object{ Key: object.GenerateEncryptionKey(), - Slabs: ps, + Slabs: slabs, }) if err != nil { t.Fatal(err) diff --git a/stores/multipart_test.go b/stores/multipart_test.go index eb534dca7..775c674e5 100644 --- a/stores/multipart_test.go +++ b/stores/multipart_test.go @@ -3,7 +3,6 @@ package stores import ( "context" "encoding/hex" - "fmt" "testing" "time" @@ -132,7 +131,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatalf("expected object size to be %v, got %v", totalSize, obj.Size) } else if obj.TotalSize() != totalSize { for _, f := range obj.Slabs { - fmt.Println("slice", f.Length, f.IsPartial()) + t.Log("slice", f.Length, f.IsPartial()) } t.Fatalf("expected object total size to be %v, got %v", totalSize, obj.TotalSize()) }