Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upload benchmark for uploader with mocked hosts #990

Merged
merged 15 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"crypto/cipher"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -142,6 +143,22 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{
return usedContracts
}

func (o *Object) ComputeETag() string {
// calculate the eTag using the precomputed sector roots to avoid having to
// hash the entire object again.
h := md5.New()
b := make([]byte, 8)
for _, slab := range o.Slabs {
binary.LittleEndian.PutUint32(b[:4], slab.Offset)
binary.LittleEndian.PutUint32(b[4:], slab.Length)
h.Write(b)
for _, shard := range slab.Shards {
h.Write(shard.Root[:])
}
}
return string(hex.EncodeToString(h.Sum(nil)))
}

// TotalSize returns the total size of the object.
func (o Object) TotalSize() int64 {
var n int64
Expand Down
31 changes: 22 additions & 9 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"io"
"sync"

"github.com/klauspost/reedsolomon"
rhpv2 "go.sia.tech/core/rhp/v2"
Expand Down Expand Up @@ -79,11 +80,17 @@ func (s Slab) Length() int {
// Encrypt xors shards with the keystream derived from s.Key, using a
// different nonce for each shard.
func (s Slab) Encrypt(shards [][]byte) {
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Encode encodes slab data into sector-sized shards. The supplied shards should
Expand Down Expand Up @@ -151,12 +158,18 @@ func (ss SlabSlice) SectorRegion() (offset, length uint32) {
// slice offset), using a different nonce for each shard.
func (ss SlabSlice) Decrypt(shards [][]byte) {
offset := ss.Offset / (rhpv2.LeafSize * uint32(ss.MinShards))
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Recover recovers a slice of slab data from the supplied shards.
Expand Down
101 changes: 101 additions & 0 deletions worker/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package worker

import (
"bytes"
"context"
"io"
"testing"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
)

// zeroReader is a reader that leaves the buffer unchanged and returns no error.
// It's useful for benchmarks that need to produce data for uploading and should
// be used together with a io.LimitReader.
type zeroReader struct{}

func (z *zeroReader) Read(p []byte) (n int, err error) {
return len(p), nil
}

// BenchmarkDownlaoderSingleObject benchmarks downloading a single, slab-sized
// object.
// 1036.74 MB/s | M2 Pro | c9dc1b6
func BenchmarkDownloaderSingleObject(b *testing.B) {
w := newMockWorker()

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy())))
_, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
o, err := w.os.Object(context.Background(), testBucket, up.path, api.GetObjectOptions{})
if err != nil {
b.Fatal(err)
}

b.SetBytes(o.Object.Size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = w.dl.DownloadObject(context.Background(), io.Discard, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts())
if err != nil {
b.Fatal(err)
}
}
}

// BenchmarkUploaderSingleObject benchmarks uploading a single object.
//
// Speed | CPU | Commit
// 433.86 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderSingleObject(b *testing.B) {
w := newMockWorker()

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards))
b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

_, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
}

// BenchmarkUploaderSingleObject benchmarks uploading one object per slab.
//
// Speed | CPU | Commit
// 282.47 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderMultiObject(b *testing.B) {
w := newMockWorker()

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

for i := 0; i < b.N; i++ {
data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards))
_, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
}
}
15 changes: 8 additions & 7 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o
hosts[c.HostKey] = struct{}{}
}

// buffer the writer
bw := bufio.NewWriter(w)
defer bw.Flush()

// create the cipher writer
cw := o.Key.Decrypt(bw, offset)
cw := o.Key.Decrypt(w, offset)

// buffer the writer we recover to making sure that we don't hammer the
// response writer with tiny writes
bw := bufio.NewWriter(cw)
defer bw.Flush()

// create response chan and ensure it's closed properly
var wg sync.WaitGroup
Expand Down Expand Up @@ -314,15 +315,15 @@ outer:
s := slabs[respIndex]
if s.PartialSlab {
// Partial slab.
_, err = cw.Write(s.Data)
_, err = bw.Write(s.Data)
if err != nil {
mgr.logger.Errorf("failed to send partial slab", respIndex, err)
return err
}
} else {
// Regular slab.
slabs[respIndex].Decrypt(next.shards)
err := slabs[respIndex].Recover(cw, next.shards)
err := slabs[respIndex].Recover(bw, next.shards)
if err != nil {
mgr.logger.Errorf("failed to recover slab %v: %v", respIndex, err)
return err
Expand Down
18 changes: 9 additions & 9 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type (
PublicKey() types.PublicKey

DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error)
UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error

FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error)
FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error)
Expand Down Expand Up @@ -121,11 +121,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
})
}

func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) {
func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) {
// fetch price table
pt, err := h.priceTable(ctx, nil)
if err != nil {
return types.Hash256{}, err
return err
}

// prepare payment
Expand All @@ -134,28 +134,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte,
// insufficient balance error
expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, err
return err
}
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
}
payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey)
if !ok {
return types.Hash256{}, errors.New("failed to create payment")
return errors.New("failed to create payment")
}

var cost types.Currency
err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error {
root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector)
cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector)
return err
})
if err != nil {
return types.Hash256{}, err
return err
}

// record spending
h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost})
return root, nil
return nil
}

func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
Expand Down
4 changes: 1 addition & 3 deletions worker/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ func TestHost(t *testing.T) {
sector, root := newMockSector()

// upload the sector
uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{})
err := h.UploadSector(context.Background(), rhpv2.SectorRoot(sector), sector, types.FileContractRevision{})
if err != nil {
t.Fatal(err)
} else if uploaded != root {
t.Fatal("root mismatch")
}

// download entire sector
Expand Down
11 changes: 5 additions & 6 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,9 @@ func (h *mockHost) DownloadSector(ctx context.Context, w io.Writer, root types.H
return err
}

func (h *mockHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) {
return h.contract().addSector(sector), nil
func (h *mockHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error {
h.contract().addSector(sectorRoot, sector)
return nil
}

func (h *mockHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) {
Expand Down Expand Up @@ -448,12 +449,10 @@ func newMockContract(hk types.PublicKey, fcid types.FileContractID) *mockContrac
}
}

func (c *mockContract) addSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) {
root = rhpv2.SectorRoot(sector)
func (c *mockContract) addSector(sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) {
c.mu.Lock()
c.sectors[root] = sector
c.sectors[sectorRoot] = sector
c.mu.Unlock()
return
}

func (c *mockContract) sector(root types.Hash256) (sector *[rhpv2.SectorSize]byte, found bool) {
Expand Down
21 changes: 10 additions & 11 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,17 +789,17 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho
return
}

func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sector *[rhpv2.SectorSize]byte) (sectorRoot types.Hash256, cost types.Currency, err error) {
func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) {
defer wrapErr(&err, "AppendSector")

// sanity check revision first
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, types.ZeroCurrency, errMaxRevisionReached
return types.ZeroCurrency, errMaxRevisionReached
}

s, err := t.DialStream(ctx)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}
defer s.Close()

Expand Down Expand Up @@ -829,7 +829,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
// compute expected collateral and refund
expectedCost, expectedCollateral, expectedRefund, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}

// apply leeways.
Expand All @@ -840,13 +840,13 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat

// check if the cost, collateral and refund match our expectation.
if executeResp.TotalCost.Cmp(expectedCost) > 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String())
return types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String())
}
if executeResp.FailureRefund.Cmp(expectedRefund) < 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String())
return types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String())
}
if executeResp.AdditionalCollateral.Cmp(expectedCollateral) < 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String())
return types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String())
}

// set the cost and refund
Expand All @@ -870,26 +870,25 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
collateral := executeResp.AdditionalCollateral.Add(executeResp.FailureRefund)

// check proof
sectorRoot = rhpv2.SectorRoot(sector)
if rev.Filesize == 0 {
// For the first upload to a contract we don't get a proof. So we just
// assert that the new contract root matches the root of the sector.
if rev.Filesize == 0 && executeResp.NewMerkleRoot != sectorRoot {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot)
return types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot)
}
} else {
// Otherwise we make sure the proof was transmitted and verify it.
actions := []rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}} // TODO: change once rhpv3 support is available
if !rhpv2.VerifyDiffProof(actions, rev.Filesize/rhpv2.SectorSize, executeResp.Proof, []types.Hash256{}, rev.FileMerkleRoot, executeResp.NewMerkleRoot, []types.Hash256{sectorRoot}) {
return types.Hash256{}, types.ZeroCurrency, errors.New("proof verification failed")
return types.ZeroCurrency, errors.New("proof verification failed")
}
}

// finalize the program with a new revision.
newRevision := *rev
newValid, newMissed, err := updateRevisionOutputs(&newRevision, types.ZeroCurrency, collateral)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}
newRevision.Filesize += rhpv2.SectorSize
newRevision.RevisionNumber++
Expand Down
Loading
Loading