Skip to content

Commit

Permalink
Merge branch 'pj/mock-bus' into pj/fix-packed-slab-uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 27, 2024
2 parents 716091b + de4b8c8 commit 04de30b
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 136 deletions.
6 changes: 4 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type (
ContractSizes(ctx context.Context) (map[types.FileContractID]api.ContractSize, error)
ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error)

DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error
DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error)

Bucket(_ context.Context, bucketName string) (api.Bucket, error)
CreateBucket(_ context.Context, bucketName string, policy api.BucketPolicy) error
Expand Down Expand Up @@ -1409,9 +1409,11 @@ func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) {
} else if jc.DecodeParam("root", &root) != nil {
return
}
err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root)
n, err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root)
if jc.Check("failed to mark sector as lost", err) != nil {
return
} else if n > 0 {
b.logger.Infow("successfully marked sector as lost", "hk", hk, "root", root)
}
}

Expand Down
17 changes: 17 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"crypto/cipher"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -142,6 +143,22 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{
return usedContracts
}

// ComputeETag calculates the object's eTag from the slice metadata and the
// precomputed sector roots, avoiding having to hash the entire object again.
func (o *Object) ComputeETag() string {
	h := md5.New()
	b := make([]byte, 8)
	for _, slab := range o.Slabs {
		// fold in offset and length since two objects built from the same
		// slabs can still differ in how they slice them
		binary.LittleEndian.PutUint32(b[:4], slab.Offset)
		binary.LittleEndian.PutUint32(b[4:], slab.Length)
		h.Write(b)
		// the sector roots identify the slab's data
		for _, shard := range slab.Shards {
			h.Write(shard.Root[:])
		}
	}
	// EncodeToString already returns a string, no conversion needed
	return hex.EncodeToString(h.Sum(nil))
}

// TotalSize returns the total size of the object.
func (o Object) TotalSize() int64 {
var n int64
Expand Down
31 changes: 22 additions & 9 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"io"
"sync"

"github.com/klauspost/reedsolomon"
rhpv2 "go.sia.tech/core/rhp/v2"
Expand Down Expand Up @@ -79,11 +80,17 @@ func (s Slab) Length() int {
// Encrypt xors shards with the keystream derived from s.Key, using a
// different nonce for each shard.
func (s Slab) Encrypt(shards [][]byte) {
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Encode encodes slab data into sector-sized shards. The supplied shards should
Expand Down Expand Up @@ -151,12 +158,18 @@ func (ss SlabSlice) SectorRegion() (offset, length uint32) {
// slice offset), using a different nonce for each shard.
func (ss SlabSlice) Decrypt(shards [][]byte) {
offset := ss.Offset / (rhpv2.LeafSize * uint32(ss.MinShards))
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Recover recovers a slice of slab data from the supplied shards.
Expand Down
7 changes: 5 additions & 2 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1620,8 +1620,9 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath
return
}

func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
return s.retryTransaction(func(tx *gorm.DB) error {
func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) {
var deletedSectors int
err := s.retryTransaction(func(tx *gorm.DB) error {
// Fetch contract_sectors to delete.
var sectors []dbContractSector
err := tx.Raw(`
Expand Down Expand Up @@ -1660,6 +1661,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo
} else if res.RowsAffected != int64(len(sectors)) {
return fmt.Errorf("expected %v affected rows but got %v", len(sectors), res.RowsAffected)
}
deletedSectors = len(sectors)

// Increment the host's lostSectors by the number of lost sectors.
if err := tx.Exec("UPDATE hosts SET lost_sectors = lost_sectors + ? WHERE public_key = ?", len(sectors), publicKey(hk)).Error; err != nil {
Expand Down Expand Up @@ -1687,6 +1689,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo
}
return nil
})
return deletedSectors, err
}

func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error {
Expand Down
4 changes: 3 additions & 1 deletion stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,10 @@ func TestDeleteHostSector(t *testing.T) {
}

// Prune the sector from hk1.
if err := ss.DeleteHostSector(context.Background(), hk1, root); err != nil {
if n, err := ss.DeleteHostSector(context.Background(), hk1, root); err != nil {
t.Fatal(err)
} else if n != 2 {
t.Fatal("no sectors were pruned", n)
}

// Make sure 2 contractSector entries exist.
Expand Down
101 changes: 101 additions & 0 deletions worker/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package worker

import (
"bytes"
"context"
"io"
"testing"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
)

// zeroReader is a reader that leaves the buffer unchanged and returns no
// error. It's useful for benchmarks that need to produce data for uploading
// and should be used together with a io.LimitReader.
type zeroReader struct{}

// Read implements io.Reader by reporting the whole buffer as read while
// leaving its contents untouched.
func (z *zeroReader) Read(p []byte) (int, error) {
	return len(p), nil
}

// BenchmarkDownloaderSingleObject benchmarks downloading a single, slab-sized
// object.
//
// Speed         | CPU    | Commit
// 1036.74 MB/s  | M2 Pro | c9dc1b6
func BenchmarkDownloaderSingleObject(b *testing.B) {
	w := newTestWorker(b)

	// configure a 10-of-30 redundancy scheme without packing so the upload
	// produces exactly one slab
	up := testParameters(b.TempDir())
	up.rs.MinShards = 10
	up.rs.TotalShards = 30
	up.packing = false
	w.addHosts(up.rs.TotalShards)

	// upload a slab's worth of random data to download in the timed loop
	data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy())))
	_, _, err := w.uploadManager.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
	if err != nil {
		b.Fatal(err)
	}
	o, err := w.os.Object(context.Background(), testBucket, up.path, api.GetObjectOptions{})
	if err != nil {
		b.Fatal(err)
	}

	b.SetBytes(o.Object.Size)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		err = w.downloadManager.DownloadObject(context.Background(), io.Discard, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts())
		if err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkUploaderSingleObject benchmarks uploading a single object.
//
// Speed       | CPU    | Commit
// 433.86 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderSingleObject(b *testing.B) {
	w := newTestWorker(b)

	// configure a 10-of-30 redundancy scheme without packing
	up := testParameters(b.TempDir())
	up.rs.MinShards = 10
	up.rs.TotalShards = 30
	up.packing = false
	w.addHosts(up.rs.TotalShards)

	// stream b.N slabs' worth of zeros through a single upload
	reader := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards))
	b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
	b.ResetTimer()

	if _, _, err := w.uploadManager.Upload(context.Background(), reader, w.contracts(), up, lockingPriorityUpload); err != nil {
		b.Fatal(err)
	}
}

// BenchmarkUploaderMultiObject benchmarks uploading one object per slab.
//
// Speed       | CPU    | Commit
// 282.47 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderMultiObject(b *testing.B) {
	w := newTestWorker(b)

	// configure a 10-of-30 redundancy scheme without packing
	up := testParameters(b.TempDir())
	up.rs.MinShards = 10
	up.rs.TotalShards = 30
	up.packing = false
	w.addHosts(up.rs.TotalShards)

	b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
	b.ResetTimer()

	// upload a fresh, one-slab object on every iteration
	for i := 0; i < b.N; i++ {
		data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards))
		_, _, err := w.uploadManager.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
		if err != nil {
			b.Fatal(err)
		}
	}
}
17 changes: 8 additions & 9 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o
hosts[c.HostKey] = struct{}{}
}

// buffer the writer
bw := bufio.NewWriter(w)
defer bw.Flush()

// create the cipher writer
cw := o.Key.Decrypt(bw, offset)
cw := o.Key.Decrypt(w, offset)

// buffer the writer we recover to making sure that we don't hammer the
// response writer with tiny writes
bw := bufio.NewWriter(cw)
defer bw.Flush()

// create response chan and ensure it's closed properly
var wg sync.WaitGroup
Expand Down Expand Up @@ -314,15 +315,15 @@ outer:
s := slabs[respIndex]
if s.PartialSlab {
// Partial slab.
_, err = cw.Write(s.Data)
_, err = bw.Write(s.Data)
if err != nil {
mgr.logger.Errorf("failed to send partial slab", respIndex, err)
return err
}
} else {
// Regular slab.
slabs[respIndex].Decrypt(next.shards)
err := slabs[respIndex].Recover(cw, next.shards)
err := slabs[respIndex].Recover(bw, next.shards)
if err != nil {
mgr.logger.Errorf("failed to recover slab %v: %v", respIndex, err)
return err
Expand Down Expand Up @@ -761,8 +762,6 @@ loop:
if isSectorNotFound(resp.err) {
if err := s.mgr.os.DeleteHostSector(ctx, resp.req.host.PublicKey(), resp.req.root); err != nil {
s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root, zap.Error(err))
} else {
s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root)
}
} else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay {
resp.req.overpay = true // ensures we don't retry the same request over and over again
Expand Down
18 changes: 9 additions & 9 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type (
PublicKey() types.PublicKey

DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error)
UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error

FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error)
FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error)
Expand Down Expand Up @@ -117,11 +117,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
})
}

func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) {
func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) {
// fetch price table
pt, err := h.priceTable(ctx, nil)
if err != nil {
return types.Hash256{}, err
return err
}

// prepare payment
Expand All @@ -130,28 +130,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte,
// insufficient balance error
expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, err
return err
}
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
}
payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey)
if !ok {
return types.Hash256{}, errors.New("failed to create payment")
return errors.New("failed to create payment")
}

var cost types.Currency
err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error {
root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector)
cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector)
return err
})
if err != nil {
return types.Hash256{}, err
return err
}

// record spending
h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost})
return root, nil
return nil
}

func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
Expand Down
13 changes: 6 additions & 7 deletions worker/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ type (
}

testHostManager struct {
t *testing.T
t test

mu sync.Mutex
hosts map[types.PublicKey]*testHost
}
)

func newTestHostManager(t *testing.T) *testHostManager {
func newTestHostManager(t test) *testHostManager {
return &testHostManager{t: t, hosts: make(map[types.PublicKey]*testHost)}
}

Expand Down Expand Up @@ -90,8 +90,9 @@ func (h *testHost) DownloadSector(ctx context.Context, w io.Writer, root types.H
return err
}

func (h *testHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) {
return h.AddSector(sector), nil
func (h *testHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error {
h.AddSector(sector)
return nil
}

func (h *testHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) {
Expand Down Expand Up @@ -126,11 +127,9 @@ func TestHost(t *testing.T) {

// upload the sector
sector, root := newTestSector()
uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{})
err := h.UploadSector(context.Background(), root, sector, types.FileContractRevision{})
if err != nil {
t.Fatal(err)
} else if uploaded != root {
t.Fatal("root mismatch")
}

// download entire sector
Expand Down
Loading

0 comments on commit 04de30b

Please sign in to comment.