From fb5bc3519bbd4f3321457aca056d8ad3cd90800a Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 16:12:46 +0100 Subject: [PATCH 01/15] worker: add benchmarks for upload through uploader --- worker/bench_test.go | 88 ++++++++++++++++++++++++++++++++++++++++++++ worker/upload.go | 39 +++++++++++++------- 2 files changed, 113 insertions(+), 14 deletions(-) create mode 100644 worker/bench_test.go diff --git a/worker/bench_test.go b/worker/bench_test.go new file mode 100644 index 000000000..d5db86c0a --- /dev/null +++ b/worker/bench_test.go @@ -0,0 +1,88 @@ +package worker + +import ( + "context" + "io" + "sync" + "testing" + + rhpv2 "go.sia.tech/core/rhp/v2" +) + +// zeroReader is a reader that leaves the buffer unchanged and returns no error. +// It's useful for benchmarks that need to produce data for uploading and should +// be used together with a io.LimitReader. +type zeroReader struct{} + +func (z *zeroReader) Read(p []byte) (n int, err error) { + return len(p), nil +} + +// BenchmarkUploaderPacking benchmarks the Upload function with packing +// disabled. +func BenchmarkUploaderNoPacking(b *testing.B) { + w := newMockWorker() + + minDataPieces := 10 + totalDataPieces := 30 + + w.addHosts(totalDataPieces) + + // create a reader that returns dev/null + data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*minDataPieces)) + + up := testParameters(b.TempDir()) + up.rs.MinShards = minDataPieces + up.rs.TotalShards = totalDataPieces + up.packing = false + + b.ResetTimer() + + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) + if err != nil { + b.Fatal(err) + } + b.SetBytes(int64(rhpv2.SectorSize * minDataPieces)) +} + +// BenchmarkSectorRoot30Goroutines benchmarks the SectorRoot function with 30 +// goroutines processing roots in parallel to simulate sequential uploads of +// slabs. +func BenchmarkSectorRoot30Goroutines(b *testing.B) { + data := make([]byte, rhpv2.SectorSize) + b.SetBytes(int64(rhpv2.SectorSize)) + + // spin up workers + c := make(chan struct{}) + work := func() { + for range c { + rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(data)) + } + } + var wg sync.WaitGroup + for i := 0; i < 30; i++ { + wg.Add(1) + go func() { + work() + wg.Done() + }() + } + b.ResetTimer() + + // run the benchmark + for i := 0; i < b.N; i++ { + c <- struct{}{} + } + close(c) + wg.Wait() +} + +// BenchmarkSectorRootSingleGoroutine benchmarks the SectorRoot function. +func BenchmarkSectorRootSingleGoroutine(b *testing.B) { + data := make([]byte, rhpv2.SectorSize) + b.SetBytes(rhpv2.SectorSize) + b.ResetTimer() + for i := 0; i < b.N; i++ { + rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(data)) + } +} diff --git a/worker/upload.go b/worker/upload.go index 72c65bf07..911593929 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -765,20 +765,26 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ responseChan := make(chan sectorUploadResp) // prepare sectors + var wg sync.WaitGroup sectors := make([]*sectorUpload, len(shards)) - for sI, shard := range shards { - // create the ctx - sCtx, sCancel := context.WithCancel(ctx) - - // create the sector - sectors[sI] = §orUpload{ - data: (*[rhpv2.SectorSize]byte)(shard), - index: sI, - root: rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(shard)), - ctx: sCtx, - cancel: sCancel, - } + for sI := range shards { + wg.Add(1) + go func(idx int) { + // create the ctx + sCtx, sCancel := context.WithCancel(ctx) + + // create the sector + sectors[idx] = §orUpload{ + data: (*[rhpv2.SectorSize]byte)(shards[idx]), + index: idx, + root: rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(shards[idx])), + ctx: sCtx, + cancel: sCancel, + } + wg.Done() + }(sI) } + wg.Wait() // prepare candidates candidates := make([]*candidate, len(uploaders)) @@ -833,8 +839,6 @@ func (u *upload) uploadSlab(ctx context.Context, rs api.RedundancySettings, data } func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates []*uploader, mem Memory, maxOverdrive uint64, overdriveTimeout time.Duration) (sectors []object.Sector, uploadSpeed int64, overdrivePct float64, err error) { - start := time.Now() - // ensure inflight uploads get cancelled ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -871,6 +875,10 @@ func (u *upload) uploadShards(ctx context.Context, shards [][]byte, candidates [ // create a request buffer var buffer []*sectorUploadReq + // start the timer after the upload has started + // newSlabUpload is quite slow due to computing the sector roots + start := time.Now() + // collect responses var used bool var done bool @@ -930,6 +938,9 @@ loop: // calculate the upload speed bytes := slab.numUploaded * rhpv2.SectorSize ms := time.Since(start).Milliseconds() + if ms == 0 { + ms = 1 + } uploadSpeed = int64(bytes) / ms // calculate overdrive pct From e1d45a46c643bd65dd87c130f51fc049b0df2401 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 16:24:15 +0100 Subject: [PATCH 02/15] worker: add benchmark results --- worker/bench_test.go | 61 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index d5db86c0a..d1f31d2a0 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -18,36 +18,66 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { return len(p), nil } -// BenchmarkUploaderPacking benchmarks the Upload function with packing -// disabled. -func BenchmarkUploaderNoPacking(b *testing.B) { +// BenchmarkUploaderSingleObjectNoPacking benchmarks uploading a single object +// without packing. +// +// Speed | CPU | Commit +// 201.59 MB/s | M2 Pro | c31245f +func BenchmarkUploaderSingleObjectNoPacking(b *testing.B) { w := newMockWorker() - minDataPieces := 10 - totalDataPieces := 30 + up := testParameters(b.TempDir()) + up.rs.MinShards = 10 + up.rs.TotalShards = 30 + up.packing = false - w.addHosts(totalDataPieces) + w.addHosts(up.rs.TotalShards) // create a reader that returns dev/null - data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*minDataPieces)) - - up := testParameters(b.TempDir()) - up.rs.MinShards = minDataPieces - up.rs.TotalShards = totalDataPieces - up.packing = false + data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards)) + b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards)) b.ResetTimer() - _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) if err != nil { b.Fatal(err) } - b.SetBytes(int64(rhpv2.SectorSize * minDataPieces)) +} + +// BenchmarkUploaderSingleObjectNoPacking benchmarks uploading one object per +// slab without packing. +// +// Speed | CPU | Commit +// 116.40 MB/s | M2 Pro | c31245f +func BenchmarkUploaderMultiObjectNoPacking(b *testing.B) { + w := newMockWorker() + + up := testParameters(b.TempDir()) + up.rs.MinShards = 10 + up.rs.TotalShards = 30 + up.packing = false + + w.addHosts(up.rs.TotalShards) + + // create a reader that returns dev/null + b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards)) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards)) + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) + if err != nil { + b.Fatal(err) + } + } } // BenchmarkSectorRoot30Goroutines benchmarks the SectorRoot function with 30 // goroutines processing roots in parallel to simulate sequential uploads of // slabs. +// +// Speed | CPU | Commit +// 1671.26 MB/s | M2 Pro | c31245f func BenchmarkSectorRoot30Goroutines(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(int64(rhpv2.SectorSize)) @@ -78,6 +108,9 @@ func BenchmarkSectorRoot30Goroutines(b *testing.B) { } // BenchmarkSectorRootSingleGoroutine benchmarks the SectorRoot function. +// +// Speed | CPU | Commit +// 176.43 MB/s | M2 Pro | c31245f func BenchmarkSectorRootSingleGoroutine(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(rhpv2.SectorSize) From fd166d80f1c791f879a8ed0801e5e2ff4d60a52c Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 17:10:00 +0100 Subject: [PATCH 03/15] worker: docstring update --- worker/bench_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index d1f31d2a0..84a3aaa8a 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -18,12 +18,11 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { return len(p), nil } -// BenchmarkUploaderSingleObjectNoPacking benchmarks uploading a single object -// without packing. +// BenchmarkUploaderSingleObject benchmarks uploading a single object. // // Speed | CPU | Commit // 201.59 MB/s | M2 Pro | c31245f -func BenchmarkUploaderSingleObjectNoPacking(b *testing.B) { +func BenchmarkUploaderSingleObject(b *testing.B) { w := newMockWorker() up := testParameters(b.TempDir()) @@ -44,12 +43,11 @@ func BenchmarkUploaderSingleObjectNoPacking(b *testing.B) { } } -// BenchmarkUploaderSingleObjectNoPacking benchmarks uploading one object per -// slab without packing. +// BenchmarkUploaderSingleObject benchmarks uploading one object per slab. // // Speed | CPU | Commit // 116.40 MB/s | M2 Pro | c31245f -func BenchmarkUploaderMultiObjectNoPacking(b *testing.B) { +func BenchmarkUploaderMultiObject(b *testing.B) { w := newMockWorker() up := testParameters(b.TempDir()) From 9f27c17d597252de29e74d6c4d248f65c42e62a3 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 17:26:14 +0100 Subject: [PATCH 04/15] worker: remove hash reader --- worker/upload.go | 15 ++++++++++----- worker/upload_utils.go | 27 --------------------------- 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/worker/upload.go b/worker/upload.go index 911593929..56444cded 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -2,6 +2,8 @@ package worker import ( "context" + "crypto/md5" + "encoding/hex" "errors" "fmt" "io" @@ -400,11 +402,8 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a // create the object o := object.NewObject(up.ec) - // create the hash reader - hr := newHashReader(r) - // create the cipher reader - cr, err := o.Encrypt(hr, up.encryptionOffset) + cr, err := o.Encrypt(r, up.encryptionOffset) if err != nil { return false, "", err } @@ -533,7 +532,13 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a } // calculate the eTag - eTag = hr.Hash() + h := md5.New() + for _, slab := range o.Slabs { + for _, shard := range slab.Shards { + h.Write(shard.Root[:]) + } + } + eTag = string(hex.EncodeToString(h.Sum(nil))) // add partial slabs if len(partialSlab) > 0 { diff --git a/worker/upload_utils.go b/worker/upload_utils.go index 4b5241b4d..306e1774f 100644 --- a/worker/upload_utils.go +++ b/worker/upload_utils.go @@ -2,11 +2,9 @@ package worker import ( "bytes" - "encoding/hex" "io" "github.com/gabriel-vasile/mimetype" - "go.sia.tech/core/types" "go.sia.tech/renterd/object" ) @@ -28,28 +26,3 @@ func newMimeReader(r io.Reader) (mimeType string, recycled io.Reader, err error) recycled = io.MultiReader(buf, r) return mtype.String(), recycled, err } - -type hashReader struct { - r io.Reader - h *types.Hasher -} - -func newHashReader(r io.Reader) *hashReader { - return &hashReader{ - r: r, - h: types.NewHasher(), - } -} - -func (e *hashReader) Read(p []byte) (int, error) { - n, err := e.r.Read(p) - if _, wErr := e.h.E.Write(p[:n]); wErr != nil { - return 0, wErr - } - return n, err -} - -func (e *hashReader) Hash() string { - sum := e.h.Sum() - return hex.EncodeToString(sum[:]) -} From 24fb81a4c249be120d8a59a22f39919f8dbd48e4 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 17:28:32 +0100 Subject: [PATCH 05/15] worker: update benchmark result --- worker/bench_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index 84a3aaa8a..d9264eddc 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -21,7 +21,7 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { // BenchmarkUploaderSingleObject benchmarks uploading a single object. // // Speed | CPU | Commit -// 201.59 MB/s | M2 Pro | c31245f +// 217.35 MB/s | M2 Pro | afee1ac func BenchmarkUploaderSingleObject(b *testing.B) { w := newMockWorker() @@ -46,7 +46,7 @@ func BenchmarkUploaderSingleObject(b *testing.B) { // BenchmarkUploaderSingleObject benchmarks uploading one object per slab. // // Speed | CPU | Commit -// 116.40 MB/s | M2 Pro | c31245f +// 139.74 MB/s | M2 Pro | afee1ac func BenchmarkUploaderMultiObject(b *testing.B) { w := newMockWorker() @@ -75,7 +75,7 @@ func BenchmarkUploaderMultiObject(b *testing.B) { // slabs. // // Speed | CPU | Commit -// 1671.26 MB/s | M2 Pro | c31245f +// 1611.98 MB/s | M2 Pro | afee1ac func BenchmarkSectorRoot30Goroutines(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(int64(rhpv2.SectorSize)) @@ -108,7 +108,7 @@ func BenchmarkSectorRoot30Goroutines(b *testing.B) { // BenchmarkSectorRootSingleGoroutine benchmarks the SectorRoot function. // // Speed | CPU | Commit -// 176.43 MB/s | M2 Pro | c31245f +// 174.71 MB/s | M2 Pro | afee1ac func BenchmarkSectorRootSingleGoroutine(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(rhpv2.SectorSize) From 11d0152ba37b47c3c6c5867e8a1a43bc36366317 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 18:02:49 +0100 Subject: [PATCH 06/15] worker: encrypt using multiple goroutines --- object/slab.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/object/slab.go b/object/slab.go index 9c3afa608..aa8bb7d45 100644 --- a/object/slab.go +++ b/object/slab.go @@ -3,6 +3,7 @@ package object import ( "bytes" "io" + "sync" "github.com/klauspost/reedsolomon" rhpv2 "go.sia.tech/core/rhp/v2" @@ -79,11 +80,17 @@ func (s Slab) Length() int { // Encrypt xors shards with the keystream derived from s.Key, using a // different nonce for each shard. func (s Slab) Encrypt(shards [][]byte) { - for i, shard := range shards { - nonce := [24]byte{1: byte(i)} - c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:]) - c.XORKeyStream(shard, shard) + var wg sync.WaitGroup + for i := range shards { + wg.Add(1) + go func(i int) { + nonce := [24]byte{1: byte(i)} + c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:]) + c.XORKeyStream(shards[i], shards[i]) + wg.Done() + }(i) } + wg.Wait() } // Encode encodes slab data into sector-sized shards. The supplied shards should From c91892506bab43f0163e14fa4bf509d0324f25d6 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Feb 2024 18:05:31 +0100 Subject: [PATCH 07/15] worker: update benchmark results --- worker/bench_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index d9264eddc..f864df9ca 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -21,7 +21,7 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { // BenchmarkUploaderSingleObject benchmarks uploading a single object. // // Speed | CPU | Commit -// 217.35 MB/s | M2 Pro | afee1ac +// 232.97 MB/s | M2 Pro | 26d3119 func BenchmarkUploaderSingleObject(b *testing.B) { w := newMockWorker() @@ -46,7 +46,7 @@ func BenchmarkUploaderSingleObject(b *testing.B) { // BenchmarkUploaderSingleObject benchmarks uploading one object per slab. // // Speed | CPU | Commit -// 139.74 MB/s | M2 Pro | afee1ac +// 185.10 MB/s | M2 Pro | 26d3119 func BenchmarkUploaderMultiObject(b *testing.B) { w := newMockWorker() @@ -75,7 +75,7 @@ func BenchmarkUploaderMultiObject(b *testing.B) { // slabs. // // Speed | CPU | Commit -// 1611.98 MB/s | M2 Pro | afee1ac +// 1668.87 MB/s | M2 Pro | 26d3119 func BenchmarkSectorRoot30Goroutines(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(int64(rhpv2.SectorSize)) @@ -108,7 +108,7 @@ func BenchmarkSectorRoot30Goroutines(b *testing.B) { // BenchmarkSectorRootSingleGoroutine benchmarks the SectorRoot function. // // Speed | CPU | Commit -// 174.71 MB/s | M2 Pro | afee1ac +// 176.91 MB/s | M2 Pro | 26d3119 func BenchmarkSectorRootSingleGoroutine(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(rhpv2.SectorSize) From 5ceccfc411a614b57e7ef4faa995815f4be04f76 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 09:40:54 +0100 Subject: [PATCH 08/15] worker: ComputeEtag method --- object/object.go | 17 +++++++++++++++++ worker/upload.go | 12 ++---------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/object/object.go b/object/object.go index 7c74c1c23..2331f6251 100644 --- a/object/object.go +++ b/object/object.go @@ -3,6 +3,7 @@ package object import ( "bytes" "crypto/cipher" + "crypto/md5" "encoding/binary" "encoding/hex" "fmt" @@ -142,6 +143,22 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{ return usedContracts } +func (o *Object) ComputeETag() string { + // calculate the eTag using the precomputed sector roots to avoid having to + // hash the entire object again. + h := md5.New() + b := make([]byte, 8) + for _, slab := range o.Slabs { + binary.LittleEndian.PutUint32(b[:4], slab.Offset) + binary.LittleEndian.PutUint32(b[4:], slab.Length) + h.Write(b) + for _, shard := range slab.Shards { + h.Write(shard.Root[:]) + } + } + return string(hex.EncodeToString(h.Sum(nil))) +} + // TotalSize returns the total size of the object. func (o Object) TotalSize() int64 { var n int64 diff --git a/worker/upload.go b/worker/upload.go index 56444cded..232e05981 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -2,8 +2,6 @@ package worker import ( "context" - "crypto/md5" - "encoding/hex" "errors" "fmt" "io" @@ -531,14 +529,8 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a o.Slabs = append(o.Slabs, resp.slab) } - // calculate the eTag - h := md5.New() - for _, slab := range o.Slabs { - for _, shard := range slab.Shards { - h.Write(shard.Root[:]) - } - } - eTag = string(hex.EncodeToString(h.Sum(nil))) + // compute etag + eTag = o.ComputeETag() // add partial slabs if len(partialSlab) > 0 { From cfb8cb600273f92209e7df1706707bf3c2c741b8 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 13:28:04 +0100 Subject: [PATCH 09/15] worker: pass expected root to UploadSector --- worker/host.go | 18 +++++++++--------- worker/host_test.go | 4 +--- worker/mocks_test.go | 11 +++++------ worker/rhpv3.go | 21 ++++++++++----------- worker/upload.go | 25 ++++--------------------- worker/uploader.go | 32 +++++++++++++++++--------------- 6 files changed, 46 insertions(+), 65 deletions(-) diff --git a/worker/host.go b/worker/host.go index 43e0891af..e5642efdd 100644 --- a/worker/host.go +++ b/worker/host.go @@ -21,7 +21,7 @@ type ( PublicKey() types.PublicKey DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error - UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) + UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error) @@ -121,11 +121,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 }) } -func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) { +func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) { // fetch price table pt, err := h.priceTable(ctx, nil) if err != nil { - return types.Hash256{}, err + return err } // prepare payment @@ -134,28 +134,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, // insufficient balance error expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd) if err != nil { - return types.Hash256{}, err + return err } if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID) + return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID) } payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey) if !ok { - return types.Hash256{}, errors.New("failed to create payment") + return errors.New("failed to create payment") } var cost types.Currency err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector) + cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector) return err }) if err != nil { - return types.Hash256{}, err + return err } // record spending h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost}) - return root, nil + return nil } func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) { diff --git a/worker/host_test.go b/worker/host_test.go index 87d35fb36..78ce6b74e 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -16,11 +16,9 @@ func TestHost(t *testing.T) { sector, root := newMockSector() // upload the sector - uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{}) + err := h.UploadSector(context.Background(), rhpv2.SectorRoot(sector), sector, types.FileContractRevision{}) if err != nil { t.Fatal(err) - } else if uploaded != root { - t.Fatal("root mismatch") } // download entire sector diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 2490941af..a28e9256c 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -396,8 +396,9 @@ func (h *mockHost) DownloadSector(ctx context.Context, w io.Writer, root types.H return err } -func (h *mockHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) { - return h.contract().addSector(sector), nil +func (h *mockHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error { + h.contract().addSector(sectorRoot, sector) + return nil } func (h *mockHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) { @@ -448,12 +449,10 @@ func newMockContract(hk types.PublicKey, fcid types.FileContractID) *mockContrac } } -func (c *mockContract) addSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) { - root = rhpv2.SectorRoot(sector) +func (c *mockContract) addSector(sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) { c.mu.Lock() - c.sectors[root] = sector + c.sectors[sectorRoot] = sector c.mu.Unlock() - return } func (c *mockContract) sector(root types.Hash256) (sector *[rhpv2.SectorSize]byte, found bool) { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 03f67c6f6..b7ccfd69a 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -789,17 +789,17 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho return } -func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sector *[rhpv2.SectorSize]byte) (sectorRoot types.Hash256, cost types.Currency, err error) { +func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) { defer wrapErr(&err, "AppendSector") // sanity check revision first if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, types.ZeroCurrency, errMaxRevisionReached + return types.ZeroCurrency, errMaxRevisionReached } s, err := t.DialStream(ctx) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } defer s.Close() @@ -829,7 +829,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat // compute expected collateral and refund expectedCost, expectedCollateral, expectedRefund, err := uploadSectorCost(pt, rev.WindowEnd) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } // apply leeways. @@ -840,13 +840,13 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat // check if the cost, collateral and refund match our expectation. if executeResp.TotalCost.Cmp(expectedCost) > 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String()) + return types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String()) } if executeResp.FailureRefund.Cmp(expectedRefund) < 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String()) + return types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String()) } if executeResp.AdditionalCollateral.Cmp(expectedCollateral) < 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String()) + return types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String()) } // set the cost and refund @@ -870,18 +870,17 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat collateral := executeResp.AdditionalCollateral.Add(executeResp.FailureRefund) // check proof - sectorRoot = rhpv2.SectorRoot(sector) if rev.Filesize == 0 { // For the first upload to a contract we don't get a proof. So we just // assert that the new contract root matches the root of the sector. if rev.Filesize == 0 && executeResp.NewMerkleRoot != sectorRoot { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot) + return types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot) } } else { // Otherwise we make sure the proof was transmitted and verify it. actions := []rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}} // TODO: change once rhpv3 support is available if !rhpv2.VerifyDiffProof(actions, rev.Filesize/rhpv2.SectorSize, executeResp.Proof, []types.Hash256{}, rev.FileMerkleRoot, executeResp.NewMerkleRoot, []types.Hash256{sectorRoot}) { - return types.Hash256{}, types.ZeroCurrency, errors.New("proof verification failed") + return types.ZeroCurrency, errors.New("proof verification failed") } } @@ -889,7 +888,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat newRevision := *rev newValid, newMissed, err := updateRevisionOutputs(&newRevision, types.ZeroCurrency, collateral) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } newRevision.Filesize += rhpv2.SectorSize newRevision.RevisionNumber++ diff --git a/worker/upload.go b/worker/upload.go index 232e05981..63be07b2b 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -137,9 +137,8 @@ type ( } sectorUploadResp struct { - req *sectorUploadReq - root types.Hash256 - err error + req *sectorUploadReq + err error } ) @@ -1065,12 +1064,6 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { return false, false } - // sanity check we receive the expected root - if resp.root != req.sector.root { - s.errs[req.hk] = fmt.Errorf("root mismatch, %v != %v", resp.root, req.sector.root) - return false, false - } - // redundant sectors can't complete the upload if sector.uploaded.Root != (types.Hash256{}) { return false, false @@ -1080,7 +1073,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { sector.finish(object.Sector{ Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}}, LatestHost: req.hk, - Root: resp.root, + Root: req.sector.root, }) // update uploaded sectors @@ -1127,7 +1120,7 @@ func (req *sectorUploadReq) done() bool { } } -func (req *sectorUploadReq) fail(err error) { +func (req *sectorUploadReq) finish(err error) { select { case <-req.sector.ctx.Done(): case req.responseChan <- sectorUploadResp{ @@ -1136,13 +1129,3 @@ func (req *sectorUploadReq) fail(err error) { }: } } - -func (req *sectorUploadReq) succeed(root types.Hash256) { - select { - case <-req.sector.ctx.Done(): - case req.responseChan <- sectorUploadResp{ - req: req, - root: root, - }: - } -} diff --git a/worker/uploader.go b/worker/uploader.go index dcff27eaf..fa1d04651 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -114,7 +114,7 @@ outer: } // execute it - root, elapsed, err := u.execute(req) + elapsed, err := u.execute(req) // the uploader's contract got renewed, requeue the request if errors.Is(err, errMaxRevisionReached) { @@ -125,10 +125,12 @@ outer: } // send the response - if err != nil { - req.fail(err) - } else { - req.succeed(root) + select { + case <-req.sector.ctx.Done(): + case req.responseChan <- sectorUploadResp{ + req: req, + err: err, + }: } // track the error, ignore gracefully closed streams and canceled overdrives @@ -151,7 +153,7 @@ func (u *uploader) Stop(err error) { break } if !upload.done() { - upload.fail(err) + upload.finish(err) } } } @@ -161,7 +163,7 @@ func (u *uploader) enqueue(req *sectorUploadReq) { // check for stopped if u.stopped { u.mu.Unlock() - go req.fail(errUploaderStopped) // don't block the caller + go req.finish(errUploaderStopped) // don't block the caller return } @@ -192,7 +194,7 @@ func (u *uploader) estimate() float64 { return numSectors * estimateP90 } -func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, error) { +func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) { // grab fields u.mu.Lock() host := u.host @@ -202,7 +204,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // acquire contract lock lockID, err := u.cs.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration) if err != nil { - return types.Hash256{}, 0, err + return 0, err } // defer the release @@ -220,26 +222,26 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // fetch the revision rev, err := host.FetchRevision(ctx, defaultRevisionFetchTimeout) if err != nil { - return types.Hash256{}, 0, err + return 0, err } else if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, 0, errMaxRevisionReached + return 0, errMaxRevisionReached } // update the bus if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil { - return types.Hash256{}, 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) } // upload the sector start := time.Now() - root, err := host.UploadSector(ctx, req.sector.sectorData(), rev) + err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev) if err != nil { - return types.Hash256{}, 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) } // calculate elapsed time elapsed := time.Since(start) - return root, elapsed, nil + return elapsed, nil } func (u *uploader) pop() *sectorUploadReq { From 67a6c871389bc422bd72df7a15bc93d10ac0f627 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 13:38:45 +0100 Subject: [PATCH 10/15] worker: update benchmark results --- worker/bench_test.go | 8 ++++---- worker/upload.go | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index f864df9ca..4748f3d85 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -21,7 +21,7 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { // BenchmarkUploaderSingleObject benchmarks uploading a single object. // // Speed | CPU | Commit -// 232.97 MB/s | M2 Pro | 26d3119 +// 433.86 MB/s | M2 Pro | bae6e77 func BenchmarkUploaderSingleObject(b *testing.B) { w := newMockWorker() @@ -46,7 +46,7 @@ func BenchmarkUploaderSingleObject(b *testing.B) { // BenchmarkUploaderSingleObject benchmarks uploading one object per slab. // // Speed | CPU | Commit -// 185.10 MB/s | M2 Pro | 26d3119 +// 282.47 MB/s | M2 Pro | bae6e77 func BenchmarkUploaderMultiObject(b *testing.B) { w := newMockWorker() @@ -75,7 +75,7 @@ func BenchmarkUploaderMultiObject(b *testing.B) { // slabs. // // Speed | CPU | Commit -// 1668.87 MB/s | M2 Pro | 26d3119 +// 1658.49 MB/s | M2 Pro | bae6e77 func BenchmarkSectorRoot30Goroutines(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(int64(rhpv2.SectorSize)) @@ -108,7 +108,7 @@ func BenchmarkSectorRoot30Goroutines(b *testing.B) { // BenchmarkSectorRootSingleGoroutine benchmarks the SectorRoot function. // // Speed | CPU | Commit -// 176.91 MB/s | M2 Pro | 26d3119 +// 177.33 MB/s | M2 Pro | bae6e77 func BenchmarkSectorRootSingleGoroutine(b *testing.B) { data := make([]byte, rhpv2.SectorSize) b.SetBytes(rhpv2.SectorSize) diff --git a/worker/upload.go b/worker/upload.go index 63be07b2b..8bd87f881 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -770,6 +770,12 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders [ sCtx, sCancel := context.WithCancel(ctx) // create the sector + // NOTE: we are computing the sector root here and pass it all the + // way down to the RPC to avoid having to recompute it for the proof + // verification. This is necessary because we need it ahead of time + // for the call to AddUploadingSector in uploader.go + // Once we upload to temp storage we don't need AddUploadingSector + // anymore and can move it back to the RPC. sectors[idx] = §orUpload{ data: (*[rhpv2.SectorSize]byte)(shards[idx]), index: idx, From 9af03a3d007545954baa34b0cbd350ed51ea2081 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 15:53:05 +0100 Subject: [PATCH 11/15] worker: add BenchmarkDownloaderSingleObjecdt --- worker/bench_test.go | 43 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index 4748f3d85..f896ce993 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -1,12 +1,15 @@ package worker import ( + "bytes" "context" "io" "sync" "testing" rhpv2 "go.sia.tech/core/rhp/v2" + "go.sia.tech/renterd/api" + "lukechampine.com/frand" ) // zeroReader is a reader that leaves the buffer unchanged and returns no error. @@ -18,6 +21,38 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { return len(p), nil } +// BenchmarkDownlaoderSingleObject benchmarks downloading a single, slab-sized +// object. +// 485.48 MB/s | M2 Pro | bae6e77 +func BenchmarkDownloaderSingleObject(b *testing.B) { + w := newMockWorker() + + up := testParameters(b.TempDir()) + up.rs.MinShards = 10 + up.rs.TotalShards = 30 + up.packing = false + w.addHosts(up.rs.TotalShards) + + data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy()))) + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) + if err != nil { + b.Fatal(err) + } + o, err := w.os.Object(context.Background(), testBucket, up.path, api.GetObjectOptions{}) + if err != nil { + b.Fatal(err) + } + + b.SetBytes(o.Object.Size) + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = w.dl.DownloadObject(context.Background(), io.Discard, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts()) + if err != nil { + b.Fatal(err) + } + } +} + // BenchmarkUploaderSingleObject benchmarks uploading a single object. // // Speed | CPU | Commit @@ -29,14 +64,12 @@ func BenchmarkUploaderSingleObject(b *testing.B) { up.rs.MinShards = 10 up.rs.TotalShards = 30 up.packing = false - w.addHosts(up.rs.TotalShards) - // create a reader that returns dev/null data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards)) b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards)) - b.ResetTimer() + _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) if err != nil { b.Fatal(err) @@ -54,13 +87,11 @@ func BenchmarkUploaderMultiObject(b *testing.B) { up.rs.MinShards = 10 up.rs.TotalShards = 30 up.packing = false - w.addHosts(up.rs.TotalShards) - // create a reader that returns dev/null b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards)) - b.ResetTimer() + for i := 0; i < b.N; i++ { data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards)) _, _, err := w.ul.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload) From c9dc1b6cf2e09e7678db1de4eb2b2b7db1ad01f5 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 16:33:09 +0100 Subject: [PATCH 12/15] worker: decrypt in parallel --- object/slab.go | 16 +++++++++++----- worker/download.go | 13 +++++++------ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/object/slab.go b/object/slab.go index aa8bb7d45..f2762abf3 100644 --- a/object/slab.go +++ b/object/slab.go @@ -158,12 +158,18 @@ func (ss SlabSlice) SectorRegion() (offset, length uint32) { // slice offset), using a different nonce for each shard. func (ss SlabSlice) Decrypt(shards [][]byte) { offset := ss.Offset / (rhpv2.LeafSize * uint32(ss.MinShards)) - for i, shard := range shards { - nonce := [24]byte{1: byte(i)} - c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:]) - c.SetCounter(offset) - c.XORKeyStream(shard, shard) + var wg sync.WaitGroup + for i := range shards { + wg.Add(1) + go func(i int) { + nonce := [24]byte{1: byte(i)} + c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:]) + c.SetCounter(offset) + c.XORKeyStream(shards[i], shards[i]) + wg.Done() + }(i) } + wg.Wait() } // Recover recovers a slice of slab data from the supplied shards. diff --git a/worker/download.go b/worker/download.go index 462a2292d..a1cc8f501 100644 --- a/worker/download.go +++ b/worker/download.go @@ -195,12 +195,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o hosts[c.HostKey] = struct{}{} } - // buffer the writer - bw := bufio.NewWriter(w) - defer bw.Flush() - // create the cipher writer - cw := o.Key.Decrypt(bw, offset) + cw := o.Key.Decrypt(w, offset) + + // buffer the writer we recover to making sure that we don't hammer the + // response writer with tiny writes + bw := bufio.NewWriter(cw) + defer bw.Flush() // create response chan and ensure it's closed properly var wg sync.WaitGroup @@ -322,7 +323,7 @@ outer: } else { // Regular slab. slabs[respIndex].Decrypt(next.shards) - err := slabs[respIndex].Recover(cw, next.shards) + err := slabs[respIndex].Recover(bw, next.shards) if err != nil { mgr.logger.Errorf("failed to recover slab %v: %v", respIndex, err) return err From aa08202054f0b32a0a622cc988094c0cac1a720a Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 16:34:28 +0100 Subject: [PATCH 13/15] worker: update download benchmark results --- worker/bench_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index f896ce993..575e4640f 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -23,7 +23,7 @@ func (z *zeroReader) Read(p []byte) (n int, err error) { // BenchmarkDownlaoderSingleObject benchmarks downloading a single, slab-sized // object. -// 485.48 MB/s | M2 Pro | bae6e77 +// 1036.74 MB/s | M2 Pro | c9dc1b6 func BenchmarkDownloaderSingleObject(b *testing.B) { w := newMockWorker() From 29f4201d2d48802329474607fb3427e8adcebd07 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Feb 2024 16:47:37 +0100 Subject: [PATCH 14/15] worker: remove irrelevant benchmarks --- worker/bench_test.go | 49 -------------------------------------------- 1 file changed, 49 deletions(-) diff --git a/worker/bench_test.go b/worker/bench_test.go index 575e4640f..552eca17c 100644 --- a/worker/bench_test.go +++ b/worker/bench_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "io" - "sync" "testing" rhpv2 "go.sia.tech/core/rhp/v2" @@ -100,51 +99,3 @@ func BenchmarkUploaderMultiObject(b *testing.B) { } } } - -// BenchmarkSectorRoot30Goroutines benchmarks the SectorRoot function with 30 -// goroutines processing roots in parallel to simulate sequential uploads of -// slabs. -// -// Speed | CPU | Commit -// 1658.49 MB/s | M2 Pro | bae6e77 -func BenchmarkSectorRoot30Goroutines(b *testing.B) { - data := make([]byte, rhpv2.SectorSize) - b.SetBytes(int64(rhpv2.SectorSize)) - - // spin up workers - c := make(chan struct{}) - work := func() { - for range c { - rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(data)) - } - } - var wg sync.WaitGroup - for i := 0; i < 30; i++ { - wg.Add(1) - go func() { - work() - wg.Done() - }() - } - b.ResetTimer() - - // run the benchmark - for i := 0; i < b.N; i++ { - c <- struct{}{} - } - close(c) - wg.Wait() -} - -// BenchmarkSectorRootSingleGoroutine benchmarks the SectorRoot function. -// -// Speed | CPU | Commit -// 177.33 MB/s | M2 Pro | bae6e77 -func BenchmarkSectorRootSingleGoroutine(b *testing.B) { - data := make([]byte, rhpv2.SectorSize) - b.SetBytes(rhpv2.SectorSize) - b.ResetTimer() - for i := 0; i < b.N; i++ { - rhpv2.SectorRoot((*[rhpv2.SectorSize]byte)(data)) - } -} From 89b4729f083145606dedec152c28ebac409c6b3d Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 23 Feb 2024 10:15:34 +0100 Subject: [PATCH 15/15] testing: fix TestUploadPacking --- worker/download.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/download.go b/worker/download.go index a1cc8f501..9048c033b 100644 --- a/worker/download.go +++ b/worker/download.go @@ -315,7 +315,7 @@ outer: s := slabs[respIndex] if s.PartialSlab { // Partial slab. - _, err = cw.Write(s.Data) + _, err = bw.Write(s.Data) if err != nil { mgr.logger.Errorf("failed to send partial slab", respIndex, err) return err