worker: add TestMigrateLostSector
ChrisSchinnerl committed Mar 25, 2024
1 parent e317b0f commit f0eef4f
Showing 2 changed files with 148 additions and 4 deletions.
44 changes: 41 additions & 3 deletions worker/mocks_test.go
@@ -388,6 +388,21 @@ func (os *objectStoreMock) TrackUpload(ctx context.Context, uID api.UploadID) er
func (os *objectStoreMock) FinishUpload(ctx context.Context, uID api.UploadID) error { return nil }

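// DeleteHostSector removes the given host's contract references from every
// stored shard whose root matches the given sector root.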
func (os *objectStoreMock) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
	os.mu.Lock()
	defer os.mu.Unlock()

	for _, objects := range os.objects {
		for _, object := range objects {
			for _, slab := range object.Slabs {
				for _, shard := range slab.Slab.Shards {
					if shard.Root == root {
						delete(shard.Contracts, hk)
					}
				}
			}
		}
	}

	return nil
}

@@ -503,10 +518,33 @@ func (os *objectStoreMock) UpdateSlab(ctx context.Context, s object.Slab, contra

	os.forEachObject(func(bucket, path string, o object.Object) {
		for i, slab := range o.Slabs {
-			if slab.Key.String() == s.Key.String() {
-				os.objects[bucket][path].Slabs[i].Slab = s
-				return
+			if slab.Key.String() != s.Key.String() {
+				continue
			}
+			// update slab
+			shards := os.objects[bucket][path].Slabs[i].Slab.Shards
+			for sI := range shards {
+				// overwrite latest host
+				shards[sI].LatestHost = s.Shards[sI].LatestHost
+
+				// merge contracts for each shard
+				existingContracts := make(map[types.FileContractID]struct{})
+				for _, fcids := range shards[sI].Contracts {
+					for _, fcid := range fcids {
+						existingContracts[fcid] = struct{}{}
+					}
+				}
+				for hk, fcids := range s.Shards[sI].Contracts {
+					for _, fcid := range fcids {
+						if _, exists := existingContracts[fcid]; exists {
+							continue
+						}
+						shards[sI].Contracts[hk] = append(shards[sI].Contracts[hk], fcid)
+					}
+				}
+			}
+			os.objects[bucket][path].Slabs[i].Slab.Shards = shards
+			return
		}
	})

108 changes: 107 additions & 1 deletion worker/upload_test.go
@@ -263,6 +263,107 @@ func TestUploadPackedSlab(t *testing.T) {
}
}

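// TestMigrateLostSector verifies that a sector lost on one host can be
// migrated to a fresh host, and that the updated slab no longer references
// the bad host or its contract.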
func TestMigrateLostSector(t *testing.T) {
	// create test worker
	w := newTestWorker(t)

	// add hosts to worker
	w.AddHosts(testRedundancySettings.TotalShards * 2)

	// convenience variables
	os := w.os
	mm := w.ulmm
	dl := w.downloadManager
	ul := w.uploadManager

	// create test data
	data := frand.Bytes(128)

	// create upload params
	params := testParameters(t.Name())

	// upload data
	_, _, err := ul.Upload(context.Background(), bytes.NewReader(data), w.Contracts(), params, lockingPriorityUpload)
	if err != nil {
		t.Fatal(err)
	}

	// grab the slab
	o, err := os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
	if err != nil {
		t.Fatal(err)
	} else if len(o.Object.Object.Slabs) != 1 {
		t.Fatal("expected 1 slab")
	}
	slab := o.Object.Object.Slabs[0]

	// build usedHosts map
	usedHosts := make(map[types.PublicKey]struct{})
	for _, shard := range slab.Shards {
		usedHosts[shard.LatestHost] = struct{}{}
	}

	// assume the host of the first shard lost its sector
	badHost := slab.Shards[0].LatestHost
	badContract := slab.Shards[0].Contracts[badHost][0]
	err = os.DeleteHostSector(context.Background(), badHost, slab.Shards[0].Root)
	if err != nil {
		t.Fatal(err)
	}

	// download the slab
	shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts())
	if err != nil {
		t.Fatal(err)
	}

	// encrypt the shards
	o.Object.Object.Slabs[0].Slab.Encrypt(shards)

	// filter it down to the shards we need to migrate
	shards = shards[:1]

	// recreate upload contracts
	contracts := make([]api.ContractMetadata, 0)
	for _, c := range w.Contracts() {
		_, used := usedHosts[c.HostKey]
		if !used && c.HostKey != badHost {
			contracts = append(contracts, c)
		}
	}

	// migrate the shard away from the bad host
	mem := mm.AcquireMemory(context.Background(), rhpv2.SectorSize)
	err = ul.UploadShards(context.Background(), o.Object.Object.Slabs[0].Slab, []int{0}, shards, testContractSet, contracts, 0, lockingPriorityUpload, mem)
	if err != nil {
		t.Fatal(err)
	}

	// re-grab the slab
	o, err = os.Object(context.Background(), testBucket, t.Name(), api.GetObjectOptions{})
	if err != nil {
		t.Fatal(err)
	} else if len(o.Object.Object.Slabs) != 1 {
		t.Fatal("expected 1 slab")
	}
	slab = o.Object.Object.Slabs[0]

	// assert the bad shard is on a good host now
	shard := slab.Shards[0]
	if shard.LatestHost == badHost {
		t.Fatal("latest host is bad")
	} else if len(shard.Contracts) != 1 {
		t.Fatal("expected 1 contract")
	}
	for _, fcids := range shard.Contracts {
		for _, fcid := range fcids {
			if fcid == badContract {
				t.Fatal("contract belongs to bad host")
			}
		}
	}
}

func TestUploadShards(t *testing.T) {
	// create test worker
	w := newTestWorker(t)
@@ -355,7 +456,12 @@ func TestUploadShards(t *testing.T) {
	slab = o.Object.Object.Slabs[0]

	// assert none of the shards are on bad hosts
-	for _, shard := range slab.Shards {
+	for i, shard := range slab.Shards {
+		if i%2 == 0 && len(shard.Contracts) != 1 {
+			t.Fatalf("expected 1 contract, got %v", len(shard.Contracts))
+		} else if i%2 != 0 && len(shard.Contracts) != 2 {
+			t.Fatalf("expected 2 contracts, got %v", len(shard.Contracts))
+		}
		if _, bad := badHosts[shard.LatestHost]; bad {
			t.Fatal("shard is on bad host", shard.LatestHost)
		}
