Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:SiaFoundation/renterd into pj/mock-bus
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 27, 2024
2 parents 145b505 + fc72fa9 commit de4b8c8
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 144 deletions.
6 changes: 4 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type (
ContractSizes(ctx context.Context) (map[types.FileContractID]api.ContractSize, error)
ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error)

DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error
DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error)

Bucket(_ context.Context, bucketName string) (api.Bucket, error)
CreateBucket(_ context.Context, bucketName string, policy api.BucketPolicy) error
Expand Down Expand Up @@ -1409,9 +1409,11 @@ func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) {
} else if jc.DecodeParam("root", &root) != nil {
return
}
err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root)
n, err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root)
if jc.Check("failed to mark sector as lost", err) != nil {
return
} else if n > 0 {
b.logger.Infow("successfully marked sector as lost", "hk", hk, "root", root)
}
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ require (
github.com/montanaflynn/stats v0.7.1
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
go.sia.tech/core v0.2.1
go.sia.tech/coreutils v0.0.1
go.sia.tech/coreutils v0.0.3
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2
go.sia.tech/hostd v1.0.2
go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640
go.sia.tech/mux v1.2.0
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca
go.sia.tech/web/renterd v0.46.0
go.uber.org/zap v1.26.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.19.0
golang.org/x/term v0.17.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY=
go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/coreutils v0.0.1 h1:Th8iiF9fjkBaxlKRgPJfRtsD3Pb8U4d2m/OahB6wffg=
go.sia.tech/coreutils v0.0.1/go.mod h1:3Mb206QDd3NtRiaHZ2kN87/HKXhcBF6lHVatS7PkViY=
go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y=
go.sia.tech/coreutils v0.0.3/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 h1:ulzfJNjxN5DjXHClkW2pTiDk+eJ+0NQhX87lFDZ03t0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg=
go.sia.tech/hostd v1.0.2 h1:GjzNIAlwg3/dViF6258Xn5DI3+otQLRqmkoPDugP+9Y=
Expand All @@ -258,14 +258,14 @@ go.sia.tech/web v0.0.0-20231213145933-3f175a86abff/go.mod h1:RKODSdOmR3VtObPAcGw
go.sia.tech/web/renterd v0.46.0 h1:BMVg4i7LxSlc8wZ4T0EG1k3EK4JxVIzCfD3/cjmwH0k=
go.sia.tech/web/renterd v0.46.0/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
17 changes: 17 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"crypto/cipher"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -142,6 +143,22 @@ func (o Object) Contracts() map[types.PublicKey]map[types.FileContractID]struct{
return usedContracts
}

func (o *Object) ComputeETag() string {
// calculate the eTag using the precomputed sector roots to avoid having to
// hash the entire object again.
h := md5.New()
b := make([]byte, 8)
for _, slab := range o.Slabs {
binary.LittleEndian.PutUint32(b[:4], slab.Offset)
binary.LittleEndian.PutUint32(b[4:], slab.Length)
h.Write(b)
for _, shard := range slab.Shards {
h.Write(shard.Root[:])
}
}
return string(hex.EncodeToString(h.Sum(nil)))
}

// TotalSize returns the total size of the object.
func (o Object) TotalSize() int64 {
var n int64
Expand Down
31 changes: 22 additions & 9 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package object
import (
"bytes"
"io"
"sync"

"github.com/klauspost/reedsolomon"
rhpv2 "go.sia.tech/core/rhp/v2"
Expand Down Expand Up @@ -79,11 +80,17 @@ func (s Slab) Length() int {
// Encrypt xors shards with the keystream derived from s.Key, using a
// different nonce for each shard.
func (s Slab) Encrypt(shards [][]byte) {
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(s.Key.entropy[:], nonce[:])
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Encode encodes slab data into sector-sized shards. The supplied shards should
Expand Down Expand Up @@ -151,12 +158,18 @@ func (ss SlabSlice) SectorRegion() (offset, length uint32) {
// slice offset), using a different nonce for each shard.
func (ss SlabSlice) Decrypt(shards [][]byte) {
offset := ss.Offset / (rhpv2.LeafSize * uint32(ss.MinShards))
for i, shard := range shards {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shard, shard)
var wg sync.WaitGroup
for i := range shards {
wg.Add(1)
go func(i int) {
nonce := [24]byte{1: byte(i)}
c, _ := chacha20.NewUnauthenticatedCipher(ss.Key.entropy[:], nonce[:])
c.SetCounter(offset)
c.XORKeyStream(shards[i], shards[i])
wg.Done()
}(i)
}
wg.Wait()
}

// Recover recovers a slice of slab data from the supplied shards.
Expand Down
7 changes: 5 additions & 2 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1620,8 +1620,9 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath
return
}

func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
return s.retryTransaction(func(tx *gorm.DB) error {
func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) {
var deletedSectors int
err := s.retryTransaction(func(tx *gorm.DB) error {
// Fetch contract_sectors to delete.
var sectors []dbContractSector
err := tx.Raw(`
Expand Down Expand Up @@ -1660,6 +1661,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo
} else if res.RowsAffected != int64(len(sectors)) {
return fmt.Errorf("expected %v affected rows but got %v", len(sectors), res.RowsAffected)
}
deletedSectors = len(sectors)

// Increment the host's lostSectors by the number of lost sectors.
if err := tx.Exec("UPDATE hosts SET lost_sectors = lost_sectors + ? WHERE public_key = ?", len(sectors), publicKey(hk)).Error; err != nil {
Expand Down Expand Up @@ -1687,6 +1689,7 @@ func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, roo
}
return nil
})
return deletedSectors, err
}

func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error {
Expand Down
4 changes: 3 additions & 1 deletion stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,10 @@ func TestDeleteHostSector(t *testing.T) {
}

// Prune the sector from hk1.
if err := ss.DeleteHostSector(context.Background(), hk1, root); err != nil {
if n, err := ss.DeleteHostSector(context.Background(), hk1, root); err != nil {
t.Fatal(err)
} else if n != 2 {
t.Fatal("no sectors were pruned", n)
}

// Make sure 2 contractSector entries exist.
Expand Down
101 changes: 101 additions & 0 deletions worker/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package worker

import (
"bytes"
"context"
"io"
"testing"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/renterd/api"
"lukechampine.com/frand"
)

// zeroReader is a reader that leaves the buffer unchanged and returns no error.
// It's useful for benchmarks that need to produce data for uploading and should
// be used together with a io.LimitReader.
type zeroReader struct{}

func (z *zeroReader) Read(p []byte) (n int, err error) {
return len(p), nil
}

// BenchmarkDownlaoderSingleObject benchmarks downloading a single, slab-sized
// object.
// 1036.74 MB/s | M2 Pro | c9dc1b6
func BenchmarkDownloaderSingleObject(b *testing.B) {
w := newTestWorker(b)

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

data := bytes.NewReader(frand.Bytes(int(up.rs.SlabSizeNoRedundancy())))
_, _, err := w.uploadManager.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
o, err := w.os.Object(context.Background(), testBucket, up.path, api.GetObjectOptions{})
if err != nil {
b.Fatal(err)
}

b.SetBytes(o.Object.Size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = w.downloadManager.DownloadObject(context.Background(), io.Discard, *o.Object.Object, 0, uint64(o.Object.Size), w.contracts())
if err != nil {
b.Fatal(err)
}
}
}

// BenchmarkUploaderSingleObject benchmarks uploading a single object.
//
// Speed | CPU | Commit
// 433.86 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderSingleObject(b *testing.B) {
w := newTestWorker(b)

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

data := io.LimitReader(&zeroReader{}, int64(b.N*rhpv2.SectorSize*up.rs.MinShards))
b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

_, _, err := w.uploadManager.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
}

// BenchmarkUploaderSingleObject benchmarks uploading one object per slab.
//
// Speed | CPU | Commit
// 282.47 MB/s | M2 Pro | bae6e77
func BenchmarkUploaderMultiObject(b *testing.B) {
w := newTestWorker(b)

up := testParameters(b.TempDir())
up.rs.MinShards = 10
up.rs.TotalShards = 30
up.packing = false
w.addHosts(up.rs.TotalShards)

b.SetBytes(int64(rhpv2.SectorSize * up.rs.MinShards))
b.ResetTimer()

for i := 0; i < b.N; i++ {
data := io.LimitReader(&zeroReader{}, int64(rhpv2.SectorSize*up.rs.MinShards))
_, _, err := w.uploadManager.Upload(context.Background(), data, w.contracts(), up, lockingPriorityUpload)
if err != nil {
b.Fatal(err)
}
}
}
17 changes: 8 additions & 9 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o
hosts[c.HostKey] = struct{}{}
}

// buffer the writer
bw := bufio.NewWriter(w)
defer bw.Flush()

// create the cipher writer
cw := o.Key.Decrypt(bw, offset)
cw := o.Key.Decrypt(w, offset)

// buffer the writer we recover to making sure that we don't hammer the
// response writer with tiny writes
bw := bufio.NewWriter(cw)
defer bw.Flush()

// create response chan and ensure it's closed properly
var wg sync.WaitGroup
Expand Down Expand Up @@ -314,15 +315,15 @@ outer:
s := slabs[respIndex]
if s.PartialSlab {
// Partial slab.
_, err = cw.Write(s.Data)
_, err = bw.Write(s.Data)
if err != nil {
mgr.logger.Errorf("failed to send partial slab", respIndex, err)
return err
}
} else {
// Regular slab.
slabs[respIndex].Decrypt(next.shards)
err := slabs[respIndex].Recover(cw, next.shards)
err := slabs[respIndex].Recover(bw, next.shards)
if err != nil {
mgr.logger.Errorf("failed to recover slab %v: %v", respIndex, err)
return err
Expand Down Expand Up @@ -761,8 +762,6 @@ loop:
if isSectorNotFound(resp.err) {
if err := s.mgr.os.DeleteHostSector(ctx, resp.req.host.PublicKey(), resp.req.root); err != nil {
s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root, zap.Error(err))
} else {
s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.host.PublicKey(), "root", resp.req.root)
}
} else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay {
resp.req.overpay = true // ensures we don't retry the same request over and over again
Expand Down
18 changes: 9 additions & 9 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type (
PublicKey() types.PublicKey

DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error)
UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error

FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error)
FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error)
Expand Down Expand Up @@ -117,11 +117,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
})
}

func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) {
func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) {
// fetch price table
pt, err := h.priceTable(ctx, nil)
if err != nil {
return types.Hash256{}, err
return err
}

// prepare payment
Expand All @@ -130,28 +130,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte,
// insufficient balance error
expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, err
return err
}
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
}
payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey)
if !ok {
return types.Hash256{}, errors.New("failed to create payment")
return errors.New("failed to create payment")
}

var cost types.Currency
err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error {
root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector)
cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector)
return err
})
if err != nil {
return types.Hash256{}, err
return err
}

// record spending
h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost})
return root, nil
return nil
}

func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
Expand Down
Loading

0 comments on commit de4b8c8

Please sign in to comment.