Skip to content

Commit

Permalink
Merge pull request #990 from SiaFoundation/chris/upload-benchmark
Browse files Browse the repository at this point in the history
Add upload benchmark for uploader with mocked hosts
  • Loading branch information
ChrisSchinnerl authored Feb 27, 2024
2 parents 97b017e + 89b4729 commit 75451b0
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 128 deletions.
17 changes: 17 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"crypto/cipher"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -142,6 +143,22 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{
return usedContracts
}

func (o *Object) ComputeETag() string {
// calculate the eTag using the precomputed sector roots to avoid having to
// hash the entire object again.
h := md5.New()
b := make([]byte, 8)
for _, slab := range o.Slabs {
binary.LittleEndian.PutUint32(b[:4], slab.Offset)
binary.LittleEndian.PutUint32(b[4:], slab.Length)
h.Write(b)
for _, shard := range slab.Shards {
h.Write(shard.Root[:])
}
}
return string(hex.EncodeToString(h.Sum(nil)))
}

// TotalSize returns the total size of the object.
func (o Object) TotalSize() int64 {
var n int64
Expand Down
31 changes: 22 additions & 9 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"io"
"sync"

"github.com/klauspost/reedsolomon"
rhpv2 "go.sia.tech/core/rhp/v2"
Expand Down Expand Up @@ -79,11 +80,17 @@ func (s Slab) Length() int {
// Encrypt xors shards with the keystream derived from s.Key, using a
// different nonce for each shard.
func (s Slab) Encrypt(shards [][]byte) {
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Encode encodes slab data into sector-sized shards. The supplied shards should
Expand Down Expand Up @@ -151,12 +158,18 @@ func (ss SlabSlice) SectorRegion() (offset, length uint32) {
// slice offset), using a different nonce for each shard.
func (ss SlabSlice) Decrypt(shards [][]byte) {
offset := ss.Offset / (rhpv2.LeafSize * uint32(ss.MinShards))
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Recover recovers a slice of slab data from the supplied shards.
Expand Down
101 changes: 101 additions & 0 deletions worker/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package worker

import (
"bytes"
"context"
"io"
"testing"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
)

// zeroReader is a reader that leaves the buffer unchanged and returns no error.
// It's useful for benchmarks that need to produce data for uploading and should
// be used together with a io.LimitReader.
type zeroReader struct{}

func (z *zeroReader) Read(p []byte) (n int, err error) {
return len(p), nil
}

// BenchmarkDownlaoderSingleObject benchmarks downloading a single, slab-sized
// object.
// 1036.74 MB/s | M2 Pro | c9dc1b6
func BenchmarkDownloaderSingleObject(b *testing.B) {
w := newMockWorker()

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy())))
_, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
o, err := w.os.Object(context.Background(), testBucket, up.path, api.GetObjectOptions{})
if err != nil {
b.Fatal(err)
}

b.SetBytes(o.Object.Size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = w.dl.DownloadObject(context.Background(), io.Discard, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts())
if err != nil {
b.Fatal(err)
}
}
}

// BenchmarkUploaderSingleObject benchmarks uploading a single object.
//
// Speed | CPU | Commit
// 433.86 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderSingleObject(b *testing.B) {
w := newMockWorker()

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards))
b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

_, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
}

// BenchmarkUploaderSingleObject benchmarks uploading one object per slab.
//
// Speed | CPU | Commit
// 282.47 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderMultiObject(b *testing.B) {
w := newMockWorker()

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

for i := 0; i < b.N; i++ {
data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards))
_, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
}
}
15 changes: 8 additions & 7 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o
hosts[c.HostKey] = struct{}{}
}

// buffer the writer
bw := bufio.NewWriter(w)
defer bw.Flush()

// create the cipher writer
cw := o.Key.Decrypt(bw, offset)
cw := o.Key.Decrypt(w, offset)

// buffer the writer we recover to making sure that we don't hammer the
// response writer with tiny writes
bw := bufio.NewWriter(cw)
defer bw.Flush()

// create response chan and ensure it's closed properly
var wg sync.WaitGroup
Expand Down Expand Up @@ -314,15 +315,15 @@ outer:
s := slabs[respIndex]
if s.PartialSlab {
// Partial slab.
_, err = cw.Write(s.Data)
_, err = bw.Write(s.Data)
if err != nil {
mgr.logger.Errorf("failed to send partial slab", respIndex, err)
return err
}
} else {
// Regular slab.
slabs[respIndex].Decrypt(next.shards)
err := slabs[respIndex].Recover(cw, next.shards)
err := slabs[respIndex].Recover(bw, next.shards)
if err != nil {
mgr.logger.Errorf("failed to recover slab %v: %v", respIndex, err)
return err
Expand Down
18 changes: 9 additions & 9 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type (
PublicKey() types.PublicKey

DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error)
UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error

FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error)
FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error)
Expand Down Expand Up @@ -121,11 +121,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
})
}

func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) {
func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) {
// fetch price table
pt, err := h.priceTable(ctx, nil)
if err != nil {
return types.Hash256{}, err
return err
}

// prepare payment
Expand All @@ -134,28 +134,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte,
// insufficient balance error
expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, err
return err
}
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
}
payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey)
if !ok {
return types.Hash256{}, errors.New("failed to create payment")
return errors.New("failed to create payment")
}

var cost types.Currency
err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error {
root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector)
cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector)
return err
})
if err != nil {
return types.Hash256{}, err
return err
}

// record spending
h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost})
return root, nil
return nil
}

func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
Expand Down
4 changes: 1 addition & 3 deletions worker/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ func TestHost(t *testing.T) {
sector, root := newMockSector()

// upload the sector
uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{})
err := h.UploadSector(context.Background(), rhpv2.SectorRoot(sector), sector, types.FileContractRevision{})
if err != nil {
t.Fatal(err)
} else if uploaded != root {
t.Fatal("root mismatch")
}

// download entire sector
Expand Down
11 changes: 5 additions & 6 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,9 @@ func (h *mockHost) DownloadSector(ctx context.Context, w io.Writer, root types.H
return err
}

func (h *mockHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) {
return h.contract().addSector(sector), nil
func (h *mockHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error {
h.contract().addSector(sectorRoot, sector)
return nil
}

func (h *mockHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) {
Expand Down Expand Up @@ -448,12 +449,10 @@ func newMockContract(hk types.PublicKey, fcid types.FileContractID) *mockContrac
}
}

func (c *mockContract) addSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) {
root = rhpv2.SectorRoot(sector)
func (c *mockContract) addSector(sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) {
c.mu.Lock()
c.sectors[root] = sector
c.sectors[sectorRoot] = sector
c.mu.Unlock()
return
}

func (c *mockContract) sector(root types.Hash256) (sector *[rhpv2.SectorSize]byte, found bool) {
Expand Down
21 changes: 10 additions & 11 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,17 +764,17 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho
return
}

func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sector *[rhpv2.SectorSize]byte) (sectorRoot types.Hash256, cost types.Currency, err error) {
func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) {
defer wrapErr(&err, "AppendSector")

// sanity check revision first
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, types.ZeroCurrency, errMaxRevisionReached
return types.ZeroCurrency, errMaxRevisionReached
}

s, err := t.DialStream(ctx)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}
defer s.Close()

Expand Down Expand Up @@ -804,7 +804,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
// compute expected collateral and refund
expectedCost, expectedCollateral, expectedRefund, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}

// apply leeways.
Expand All @@ -815,13 +815,13 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat

// check if the cost, collateral and refund match our expectation.
if executeResp.TotalCost.Cmp(expectedCost) > 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String())
return types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String())
}
if executeResp.FailureRefund.Cmp(expectedRefund) < 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String())
return types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String())
}
if executeResp.AdditionalCollateral.Cmp(expectedCollateral) < 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String())
return types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String())
}

// set the cost and refund
Expand All @@ -845,26 +845,25 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
collateral := executeResp.AdditionalCollateral.Add(executeResp.FailureRefund)

// check proof
sectorRoot = rhpv2.SectorRoot(sector)
if rev.Filesize == 0 {
// For the first upload to a contract we don't get a proof. So we just
// assert that the new contract root matches the root of the sector.
if rev.Filesize == 0 && executeResp.NewMerkleRoot != sectorRoot {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot)
return types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot)
}
} else {
// Otherwise we make sure the proof was transmitted and verify it.
actions := []rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}} // TODO: change once rhpv3 support is available
if !rhpv2.VerifyDiffProof(actions, rev.Filesize/rhpv2.SectorSize, executeResp.Proof, []types.Hash256{}, rev.FileMerkleRoot, executeResp.NewMerkleRoot, []types.Hash256{sectorRoot}) {
return types.Hash256{}, types.ZeroCurrency, errors.New("proof verification failed")
return types.ZeroCurrency, errors.New("proof verification failed")
}
}

// finalize the program with a new revision.
newRevision := *rev
newValid, newMissed, err := updateRevisionOutputs(&newRevision, types.ZeroCurrency, collateral)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}
newRevision.Filesize += rhpv2.SectorSize
newRevision.RevisionNumber++
Expand Down
Loading

0 comments on commit 75451b0

Please sign in to comment.