Skip to content

Commit

Permalink
Merge pull request #699 from SiaFoundation/chris/fix-rs-upload
Browse files Browse the repository at this point in the history
Fix erasure coding related slab corruption
  • Loading branch information
ChrisSchinnerl authored Oct 31, 2023
2 parents 3ea0663 + ef00fc9 commit 51b3e3b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 70 deletions.
39 changes: 11 additions & 28 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,26 @@ func (s Slab) Encode(buf []byte, shards [][]byte) {
}
}

// ReconstructSome reconstructs the required shards of a slab.
func (s Slab) ReconstructSome(shards [][]byte, required []bool) error {
// Reconstruct reconstructs the missing shards of a slab. Missing shards must
// have a len of zero. All shards should have a capacity of at least
// rhpv2.SectorSize, or they will be reallocated.
func (s Slab) Reconstruct(shards [][]byte) error {
for i := range shards {
// Make sure shards are either empty or full.
if len(shards[i]) != rhpv2.SectorSize && len(shards[i]) != 0 {
panic("shards must have a len of either 0 or rhpv2.SectorSize")
}
// Every required shard needs to have a sector worth of capacity.
if required[i] && cap(shards[i]) < rhpv2.SectorSize {
shards[i] = reedsolomon.AllocAligned(1, rhpv2.SectorSize)[0][:0]
if cap(shards[i]) < rhpv2.SectorSize {
shards[i] = make([]byte, 0, rhpv2.SectorSize)
}
}
// The size of the batch per shard that gets reconstructed.
var buf [rhpv2.SectorSize]byte
rsc, _ := reedsolomon.New(int(s.MinShards), len(shards)-int(s.MinShards))

dstShards := make([][]byte, len(shards))
for i, shard := range shards {
if len(shard) != 0 {
// keep shards that are already present
dstShards[i] = shards[i]
} else if required[i] {
// reconstruct required shards into 'shards'
dstShards[i] = shards[i][:0]
} else {
// reconstruct non-required shards into a temporary buffer
dstShards[i] = buf[:0]
if len(shards[i]) != 0 {
shards[i] = shards[i][:rhpv2.SectorSize]
}
}
if err := rsc.Reconstruct(dstShards); err != nil {

rsc, _ := reedsolomon.New(int(s.MinShards), len(shards)-int(s.MinShards))
if err := rsc.Reconstruct(shards); err != nil {
return err
}
for i := range shards {
if required[i] {
shards[i] = shards[i][:rhpv2.SectorSize]
}
}
return nil
}

Expand Down
31 changes: 2 additions & 29 deletions object/slab_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ func TestReedSolomon(t *testing.T) {
partialShards[i] = nil
}
// reconstruct
required := make([]bool, len(partialShards))
for i := range required {
required[i] = partialShards[i] == nil
}
if err := s.ReconstructSome(partialShards, required); err != nil {
if err := s.Reconstruct(partialShards); err != nil {
t.Fatal(err)
}
for i := range shards {
Expand All @@ -47,25 +43,6 @@ func TestReedSolomon(t *testing.T) {
break
}
}
// reconstruct one-by-one
for _, i := range frand.Perm(len(partialShards))[:7] {
partialShards[i] = nil
}
for i := 0; i < len(partialShards); i++ {
required := make([]bool, len(partialShards))
required[i] = true
if err := s.ReconstructSome(partialShards, required); err != nil {
t.Fatal(err)
} else if len(partialShards[i]) == 0 {
t.Error("failed to reconstruct shard", i)
}
}
for i := range shards {
if !bytes.Equal(shards[i], partialShards[i]) {
t.Fatal("failed to reconstruct shards")
break
}
}

// delete 7 random shards
for _, i := range frand.Perm(len(partialShards))[:7] {
Expand Down Expand Up @@ -132,18 +109,14 @@ func BenchmarkReedSolomon(b *testing.B) {
benchReconstruct := func(m, n, r uint8) func(*testing.B) {
s, data, shards := makeSlab(m, n)
s.Encode(data, shards)
required := make([]bool, len(shards))
return func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(shards[0])) * int64(r))
for i := 0; i < b.N; i++ {
for j := range shards[:r] {
shards[j] = shards[j][:0]
}
for j := range required {
required[j] = len(shards[j]) == 0
}
if err := s.ReconstructSome(shards, required); err != nil {
if err := s.Reconstruct(shards); err != nil {
b.Fatal(err)
}
}
Expand Down
13 changes: 4 additions & 9 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ outer:
return nil
}

func (mgr *downloadManager) DownloadMissingShards(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata, missing []bool) ([][]byte, error) {
func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, error) {
// refresh the downloaders
mgr.refreshDownloaders(contracts)

Expand Down Expand Up @@ -441,17 +441,12 @@ func (mgr *downloadManager) DownloadMissingShards(ctx context.Context, slab obje

// decrypt and recover
slice.Decrypt(resp.shards)
err := slice.ReconstructSome(resp.shards, missing)
err := slice.Reconstruct(resp.shards)
if err != nil {
return nil, err
}
missingShards := make([][]byte, 0, len(resp.shards))
for i, shard := range resp.shards {
if missing[i] {
missingShards = append(missingShards, shard)
}
}
return missingShards, nil

return resp.shards, err
}

func (mgr *downloadManager) Stats() downloadManagerStats {
Expand Down
11 changes: 7 additions & 4 deletions worker/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,17 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o
// collect indices of shards that need to be migrated
usedMap := make(map[types.FileContractID]struct{})
var shardIndices []int
requiredShards := make([]bool, len(s.Shards))
for i, shard := range s.Shards {
// bad host
if _, exists := goodHosts[shard.Host]; !exists {
shardIndices = append(shardIndices, i)
requiredShards[i] = true
continue
}

// reused host
_, exists := usedMap[h2c[shard.Host]]
if exists {
shardIndices = append(shardIndices, i)
requiredShards[i] = true
continue
}
usedMap[h2c[shard.Host]] = struct{}{}
Expand Down Expand Up @@ -77,12 +74,18 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o
}

// download the slab
shards, err := d.DownloadMissingShards(ctx, *s, dlContracts, requiredShards)
shards, err := d.DownloadSlab(ctx, *s, dlContracts)
if err != nil {
return nil, 0, fmt.Errorf("failed to download slab for migration: %w", err)
}
s.Encrypt(shards)

// filter it down to the shards we need to migrate
for i, si := range shardIndices {
shards[i] = shards[si]
}
shards = shards[:len(shardIndices)]

// filter upload contracts to the ones we haven't used yet
var allowed []api.ContractMetadata
for c := range ulContracts {
Expand Down

0 comments on commit 51b3e3b

Please sign in to comment.