From e317b0fdb5fb26bb3d62ec94e93a122f903b9451 Mon Sep 17 00:00:00 2001
From: Chris Schinnerl
Date: Mon, 25 Mar 2024 10:18:30 +0100
Subject: [PATCH 1/2] worker: when migrating a slab, only set contracts for new shards

---
 worker/migrations.go  |  4 ++--
 worker/upload.go      | 21 ++++-----------------
 worker/upload_test.go |  2 +-
 worker/worker.go      |  2 +-
 4 files changed, 8 insertions(+), 21 deletions(-)

diff --git a/worker/migrations.go b/worker/migrations.go
index 6c25b789f..075642dd5 100644
--- a/worker/migrations.go
+++ b/worker/migrations.go
@@ -10,7 +10,7 @@ import (
 	"go.sia.tech/renterd/object"
 )
 
-func (w *worker) migrate(ctx context.Context, s *object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) {
+func (w *worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) {
 	// make a map of good hosts
 	goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool)
 	for _, c := range ulContracts {
@@ -86,7 +86,7 @@ SHARDS:
 	defer mem.Release()
 
 	// download the slab
-	shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, *s, dlContracts)
+	shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts)
 	if err != nil {
 		return 0, false, fmt.Errorf("failed to download slab for migration: %w", err)
 	}
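A note on the signature change above (mirrored by UploadShards in the next file): migrate now receives the slab by value. Shard-level writes still reach the caller because copying a Go struct copies only the slice header inside it, not the backing array. A minimal, runnable sketch of that semantics; the slab type here is a simplified stand-in, not renterd's object.Slab:

package main

import "fmt"

// slab is a simplified stand-in for object.Slab; its shards slice plays
// the role of the Shards field.
type slab struct {
	shards []string
}

// mutate receives the slab by value, like the patched migrate and
// UploadShards. The struct is copied, but the copy's slice header still
// points at the same backing array, so element writes reach the caller.
func mutate(s slab) {
	s.shards[0] = "migrated"
}

func main() {
	s := slab{shards: []string{"original"}}
	mutate(s)
	fmt.Println(s.shards[0]) // prints "migrated"
}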
diff --git a/worker/upload.go b/worker/upload.go
index d146b920e..bc419d703 100644
--- a/worker/upload.go
+++ b/worker/upload.go
@@ -604,7 +604,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc
 	return nil
 }
 
-func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
+func (mgr *uploadManager) UploadShards(ctx context.Context, s object.Slab, shardIndices []int, shards [][]byte, contractSet string, contracts []api.ContractMetadata, bh uint64, lockPriority int, mem Memory) (err error) {
 	// cancel all in-flight requests when the upload is done
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -642,27 +642,14 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shar
 	// overwrite the shards with the newly uploaded ones
 	for i, si := range shardIndices {
 		s.Shards[si].LatestHost = uploaded[i].LatestHost
-
-		knownContracts := make(map[types.FileContractID]struct{})
-		for _, fcids := range s.Shards[si].Contracts {
-			for _, fcid := range fcids {
-				knownContracts[fcid] = struct{}{}
-			}
-		}
+		s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID)
 		for hk, fcids := range uploaded[i].Contracts {
-			for _, fcid := range fcids {
-				if _, exists := knownContracts[fcid]; !exists {
-					if s.Shards[si].Contracts == nil {
-						s.Shards[si].Contracts = make(map[types.PublicKey][]types.FileContractID)
-					}
-					s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcid)
-				}
-			}
+			s.Shards[si].Contracts[hk] = append(s.Shards[si].Contracts[hk], fcids...)
 		}
 	}
 
 	// update the slab
-	return mgr.os.UpdateSlab(ctx, *s, contractSet)
+	return mgr.os.UpdateSlab(ctx, s, contractSet)
 }
 
 func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (candidates []*uploader) {
diff --git a/worker/upload_test.go b/worker/upload_test.go
index 1d441693f..cb4a7ee7b 100644
--- a/worker/upload_test.go
+++ b/worker/upload_test.go
@@ -340,7 +340,7 @@ func TestUploadShards(t *testing.T) {
 
 	// migrate those shards away from bad hosts
 	mem := mm.AcquireMemory(context.Background(), uint64(len(badIndices))*rhpv2.SectorSize)
-	err = ul.UploadShards(context.Background(), &o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
+	err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, badIndices, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/worker/worker.go b/worker/worker.go
index 89fe37a14..c6ce7a67a 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -784,7 +784,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) {
 	}
 
 	// migrate the slab
-	numShardsMigrated, surchargeApplied, err := w.migrate(ctx, &slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight)
+	numShardsMigrated, surchargeApplied, err := w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight)
 	if err != nil {
 		jc.Encode(api.MigrateSlabResponse{
 			NumShardsMigrated: numShardsMigrated,
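The behavior change of patch 1 in short: UploadShards used to merge freshly uploaded contracts into a shard's existing contract map, skipping known IDs; it now rebuilds the map with only the contracts of the new upload, so stale entries for hosts that no longer store the sector are dropped. A minimal sketch of the new overwrite semantics, assuming plain string aliases as stand-ins for types.PublicKey and types.FileContractID:

package main

import "fmt"

type (
	publicKey      = string
	fileContractID = string
)

// sector is a simplified stand-in for object.Sector.
type sector struct {
	latestHost publicKey
	contracts  map[publicKey][]fileContractID
}

// overwriteContracts mirrors the new UploadShards loop: the shard's
// contract map is replaced wholesale by the contracts of the fresh
// upload, so entries for hosts that lost the sector disappear.
func overwriteContracts(shard *sector, host publicKey, uploaded map[publicKey][]fileContractID) {
	shard.latestHost = host
	shard.contracts = make(map[publicKey][]fileContractID)
	for hk, fcids := range uploaded {
		shard.contracts[hk] = append(shard.contracts[hk], fcids...)
	}
}

func main() {
	shard := &sector{
		latestHost: "badHost",
		contracts:  map[publicKey][]fileContractID{"badHost": {"fcid-1"}},
	}
	overwriteContracts(shard, "goodHost", map[publicKey][]fileContractID{"goodHost": {"fcid-2"}})
	fmt.Println(shard.contracts) // map[goodHost:[fcid-2]]; the stale fcid-1 is gone
}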
From f0eef4fd331397f5ba801636f17041a23c0ac41a Mon Sep 17 00:00:00 2001
From: Chris Schinnerl
Date: Mon, 25 Mar 2024 12:20:27 +0100
Subject: [PATCH 2/2] worker: add TestMigrateLostSector

---
 worker/mocks_test.go  |  44 +++++++++++++++--
 worker/upload_test.go | 108 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 148 insertions(+), 4 deletions(-)

diff --git a/worker/mocks_test.go b/worker/mocks_test.go
index 7b3609c0b..b8257e95c 100644
--- a/worker/mocks_test.go
+++ b/worker/mocks_test.go
@@ -388,6 +388,21 @@ func (os *objectStoreMock) TrackUpload(ctx context.Context, uID api.UploadID) er
 func (os *objectStoreMock) FinishUpload(ctx context.Context, uID api.UploadID) error { return nil }
 
 func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
+	os.mu.Lock()
+	defer os.mu.Unlock()
+
+	for _, objects := range os.objects {
+		for _, object := range objects {
+			for _, slab := range object.Slabs {
+				for _, shard := range slab.Slab.Shards {
+					if shard.Root == root {
+						delete(shard.Contracts, hk)
+					}
+				}
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -503,10 +518,33 @@ func (os *objectStoreMock) UpdateSlab(ctx context.Context, s object.Slab, contra
 
 	os.forEachObject(func(bucket, path string, o object.Object) {
 		for i, slab := range o.Slabs {
-			if slab.Key.String() == s.Key.String() {
-				os.objects[bucket][path].Slabs[i].Slab = s
-				return
+			if slab.Key.String() != s.Key.String() {
+				continue
+			}
+			// update slab
+			shards := os.objects[bucket][path].Slabs[i].Slab.Shards
+			for sI := range shards {
+				// overwrite latest host
+				shards[sI].LatestHost = s.Shards[sI].LatestHost
+
+				// merge contracts for each shard
+				existingContracts := make(map[types.FileContractID]struct{})
+				for _, fcids := range shards[sI].Contracts {
+					for _, fcid := range fcids {
+						existingContracts[fcid] = struct{}{}
+					}
+				}
+				for hk, fcids := range s.Shards[sI].Contracts {
+					for _, fcid := range fcids {
+						if _, exists := existingContracts[fcid]; exists {
+							continue
+						}
+						shards[sI].Contracts[hk] = append(shards[sI].Contracts[hk], fcid)
+					}
+				}
 			}
+			os.objects[bucket][path].Slabs[i].Slab.Shards = shards
+			return
 		}
 	})
 
diff --git a/worker/upload_test.go b/worker/upload_test.go
index cb4a7ee7b..0b6308ffe 100644
--- a/worker/upload_test.go
+++ b/worker/upload_test.go
@@ -263,6 +263,107 @@ func TestUploadPackedSlab(t *testing.T) {
 	}
 }
 
+func TestMigrateLostSector(t *testing.T) {
+	// create test worker
+	w := newTestWorker(t)
+
+	// add hosts to worker
+	w.AddHosts(testRedundancySettings.TotalShards * 2)
+
+	// convenience variables
+	os := w.os
+	mm := w.ulmm
+	dl := w.downloadManager
+	ul := w.uploadManager
+
+	// create test data
+	data := frand.Bytes(128)
+
+	// create upload params
+	params := testParameters(t.Name())
+
+	// upload data
+	_, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// grab the slab
+	o, err := os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
+	if err != nil {
+		t.Fatal(err)
+	} else if len(o.Object.Object.Slabs) != 1 {
+		t.Fatal("expected 1 slab")
+	}
+	slab := o.Object.Object.Slabs[0]
+
+	// build map of used hosts
+	usedHosts := make(map[types.PublicKey]struct{})
+	for _, shard := range slab.Shards {
+		usedHosts[shard.LatestHost] = struct{}{}
+	}
+
+	// assume the host of the first shard lost its sector
+	badHost := slab.Shards[0].LatestHost
+	badContract := slab.Shards[0].Contracts[badHost][0]
+	err = os.DeleteHostSector(context.Background(), badHost, slab.Shards[0].Root)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// download the slab
+	shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// encrypt the shards
+	o.Object.Object.Slabs[0].Slab.Encrypt(shards)
+
+	// filter the shards down to the ones we need to migrate
+	shards = shards[:1]
+
+	// recreate upload contracts
+	contracts := make([]api.ContractMetadata, 0)
+	for _, c := range w.Contracts() {
+		_, used := usedHosts[c.HostKey]
+		if !used && c.HostKey != badHost {
+			contracts = append(contracts, c)
+		}
+	}
+
+	// migrate the shard away from the bad host
+	mem := mm.AcquireMemory(context.Background(), rhpv2.SectorSize)
+	err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, []int{0}, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// re-grab the slab
+	o, err = os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
+	if err != nil {
+		t.Fatal(err)
+	} else if len(o.Object.Object.Slabs) != 1 {
+		t.Fatal("expected 1 slab")
+	}
+	slab = o.Object.Object.Slabs[0]
+
+	// assert the bad shard is on a good host now
+	shard := slab.Shards[0]
+	if shard.LatestHost == badHost {
+		t.Fatal("latest host is bad")
+	} else if len(shard.Contracts) != 1 {
+		t.Fatal("expected 1 contract")
+	}
+	for _, fcids := range shard.Contracts {
+		for _, fcid := range fcids {
+			if fcid == badContract {
+				t.Fatal("contract belongs to bad host")
+			}
+		}
+	}
+}
+
 func TestUploadShards(t *testing.T) {
 	// create test worker
 	w := newTestWorker(t)
@@ -355,7 +456,12 @@ func TestUploadShards(t *testing.T) {
 	slab = o.Object.Object.Slabs[0]
 
 	// assert none of the shards are on bad hosts
-	for _, shard := range slab.Shards {
+	for i, shard := range slab.Shards {
+		if i%2 == 0 && len(shard.Contracts) != 1 {
+			t.Fatalf("expected 1 contract, got %v", len(shard.Contracts))
+		} else if i%2 != 0 && len(shard.Contracts) != 2 {
+			t.Fatalf("expected 2 contracts, got %v", len(shard.Contracts))
+		}
 		if _, bad := badHosts[shard.LatestHost]; bad {
 			t.Fatal("shard is on bad host", shard.LatestHost)
 		}
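For contrast with the overwrite in patch 1, the object store mock's UpdateSlab merges: it keeps the contracts it already tracks and appends only file contract IDs it has not seen before, so repeated updates never accumulate duplicates. A minimal sketch of that dedupe pattern, using the same simplified string types as above:

package main

import "fmt"

type (
	publicKey      = string
	fileContractID = string
)

// mergeContracts mirrors the merge step in the mock's UpdateSlab: index
// every known file contract ID, then append only IDs that are new.
func mergeContracts(stored, updated map[publicKey][]fileContractID) {
	existing := make(map[fileContractID]struct{})
	for _, fcids := range stored {
		for _, fcid := range fcids {
			existing[fcid] = struct{}{}
		}
	}
	for hk, fcids := range updated {
		for _, fcid := range fcids {
			if _, ok := existing[fcid]; ok {
				continue
			}
			stored[hk] = append(stored[hk], fcid)
		}
	}
}

func main() {
	stored := map[publicKey][]fileContractID{"hostA": {"fcid-1"}}
	updated := map[publicKey][]fileContractID{"hostA": {"fcid-1"}, "hostB": {"fcid-2"}}
	mergeContracts(stored, updated)
	fmt.Println(stored) // map[hostA:[fcid-1] hostB:[fcid-2]], no duplicates
}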