Skip to content

Commit

Permalink
worker: pass expected root to UploadSector
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Feb 22, 2024
1 parent 5ceccfc commit cfb8cb6
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 65 deletions.
18 changes: 9 additions & 9 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type (
PublicKey() types.PublicKey

DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error
UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error)
UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error

FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error)
FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error)
Expand Down Expand Up @@ -121,11 +121,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
})
}

func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) {
func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) {
// fetch price table
pt, err := h.priceTable(ctx, nil)
if err != nil {
return types.Hash256{}, err
return err
}

// prepare payment
Expand All @@ -134,28 +134,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte,
// insufficient balance error
expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, err
return err
}
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID)
}
payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey)
if !ok {
return types.Hash256{}, errors.New("failed to create payment")
return errors.New("failed to create payment")
}

var cost types.Currency
err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error {
root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector)
cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector)
return err
})
if err != nil {
return types.Hash256{}, err
return err
}

// record spending
h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost})
return root, nil
return nil
}

func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
Expand Down
4 changes: 1 addition & 3 deletions worker/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ func TestHost(t *testing.T) {
sector, root := newMockSector()

// upload the sector
uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{})
err := h.UploadSector(context.Background(), rhpv2.SectorRoot(sector), sector, types.FileContractRevision{})
if err != nil {
t.Fatal(err)
} else if uploaded != root {
t.Fatal("root mismatch")
}

// download entire sector
Expand Down
11 changes: 5 additions & 6 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,9 @@ func (h *mockHost) DownloadSector(ctx context.Context, w io.Writer, root types.H
return err
}

func (h *mockHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) {
return h.contract().addSector(sector), nil
func (h *mockHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error {
h.contract().addSector(sectorRoot, sector)
return nil
}

func (h *mockHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) {
Expand Down Expand Up @@ -448,12 +449,10 @@ func newMockContract(hk types.PublicKey, fcid types.FileContractID) *mockContrac
}
}

func (c *mockContract) addSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) {
root = rhpv2.SectorRoot(sector)
func (c *mockContract) addSector(sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) {
c.mu.Lock()
c.sectors[root] = sector
c.sectors[sectorRoot] = sector
c.mu.Unlock()
return
}

func (c *mockContract) sector(root types.Hash256) (sector *[rhpv2.SectorSize]byte, found bool) {
Expand Down
21 changes: 10 additions & 11 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,17 +789,17 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho
return
}

func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sector *[rhpv2.SectorSize]byte) (sectorRoot types.Hash256, cost types.Currency, err error) {
func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) {
defer wrapErr(&err, "AppendSector")

// sanity check revision first
if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, types.ZeroCurrency, errMaxRevisionReached
return types.ZeroCurrency, errMaxRevisionReached
}

s, err := t.DialStream(ctx)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}
defer s.Close()

Expand Down Expand Up @@ -829,7 +829,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
// compute expected collateral and refund
expectedCost, expectedCollateral, expectedRefund, err := uploadSectorCost(pt, rev.WindowEnd)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}

// apply leeways.
Expand All @@ -840,13 +840,13 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat

// check if the cost, collateral and refund match our expectation.
if executeResp.TotalCost.Cmp(expectedCost) > 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String())
return types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String())
}
if executeResp.FailureRefund.Cmp(expectedRefund) < 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String())
return types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String())
}
if executeResp.AdditionalCollateral.Cmp(expectedCollateral) < 0 {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String())
return types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String())
}

// set the cost and refund
Expand All @@ -870,26 +870,25 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat
collateral := executeResp.AdditionalCollateral.Add(executeResp.FailureRefund)

// check proof
sectorRoot = rhpv2.SectorRoot(sector)
if rev.Filesize == 0 {
// For the first upload to a contract we don't get a proof. So we just
// assert that the new contract root matches the root of the sector.
if rev.Filesize == 0 && executeResp.NewMerkleRoot != sectorRoot {
return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot)
return types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot)
}
} else {
// Otherwise we make sure the proof was transmitted and verify it.
actions := []rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}} // TODO: change once rhpv3 support is available
if !rhpv2.VerifyDiffProof(actions, rev.Filesize/rhpv2.SectorSize, executeResp.Proof, []types.Hash256{}, rev.FileMerkleRoot, executeResp.NewMerkleRoot, []types.Hash256{sectorRoot}) {
return types.Hash256{}, types.ZeroCurrency, errors.New("proof verification failed")
return types.ZeroCurrency, errors.New("proof verification failed")
}
}

// finalize the program with a new revision.
newRevision := *rev
newValid, newMissed, err := updateRevisionOutputs(&newRevision, types.ZeroCurrency, collateral)
if err != nil {
return types.Hash256{}, types.ZeroCurrency, err
return types.ZeroCurrency, err
}
newRevision.Filesize += rhpv2.SectorSize
newRevision.RevisionNumber++
Expand Down
25 changes: 4 additions & 21 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ type (
}

sectorUploadResp struct {
req *sectorUploadReq
root types.Hash256
err error
req *sectorUploadReq
err error
}
)

Expand Down Expand Up @@ -1065,12 +1064,6 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) {
return false, false
}

// sanity check we receive the expected root
if resp.root != req.sector.root {
s.errs[req.hk] = fmt.Errorf("root mismatch, %v != %v", resp.root, req.sector.root)
return false, false
}

// redundant sectors can't complete the upload
if sector.uploaded.Root != (types.Hash256{}) {
return false, false
Expand All @@ -1080,7 +1073,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) {
sector.finish(object.Sector{
Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}},
LatestHost: req.hk,
Root: resp.root,
Root: req.sector.root,
})

// update uploaded sectors
Expand Down Expand Up @@ -1127,7 +1120,7 @@ func (req *sectorUploadReq) done() bool {
}
}

func (req *sectorUploadReq) fail(err error) {
func (req *sectorUploadReq) finish(err error) {
select {
case <-req.sector.ctx.Done():
case req.responseChan <- sectorUploadResp{
Expand All @@ -1136,13 +1129,3 @@ func (req *sectorUploadReq) fail(err error) {
}:
}
}

func (req *sectorUploadReq) succeed(root types.Hash256) {
select {
case <-req.sector.ctx.Done():
case req.responseChan <- sectorUploadResp{
req: req,
root: root,
}:
}
}
32 changes: 17 additions & 15 deletions worker/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ outer:
}

// execute it
root, elapsed, err := u.execute(req)
elapsed, err := u.execute(req)

// the uploader's contract got renewed, requeue the request
if errors.Is(err, errMaxRevisionReached) {
Expand All @@ -125,10 +125,12 @@ outer:
}

// send the response
if err != nil {
req.fail(err)
} else {
req.succeed(root)
select {
case <-req.sector.ctx.Done():
case req.responseChan <- sectorUploadResp{
req: req,
err: err,
}:
}

// track the error, ignore gracefully closed streams and canceled overdrives
Expand All @@ -151,7 +153,7 @@ func (u *uploader) Stop(err error) {
break
}
if !upload.done() {
upload.fail(err)
upload.finish(err)
}
}
}
Expand All @@ -161,7 +163,7 @@ func (u *uploader) enqueue(req *sectorUploadReq) {
// check for stopped
if u.stopped {
u.mu.Unlock()
go req.fail(errUploaderStopped) // don't block the caller
go req.finish(errUploaderStopped) // don't block the caller
return
}

Expand Down Expand Up @@ -192,7 +194,7 @@ func (u *uploader) estimate() float64 {
return numSectors * estimateP90
}

func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, error) {
func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) {
// grab fields
u.mu.Lock()
host := u.host
Expand All @@ -202,7 +204,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration,
// acquire contract lock
lockID, err := u.cs.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration)
if err != nil {
return types.Hash256{}, 0, err
return 0, err
}

// defer the release
Expand All @@ -220,26 +222,26 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration,
// fetch the revision
rev, err := host.FetchRevision(ctx, defaultRevisionFetchTimeout)
if err != nil {
return types.Hash256{}, 0, err
return 0, err
} else if rev.RevisionNumber == math.MaxUint64 {
return types.Hash256{}, 0, errMaxRevisionReached
return 0, errMaxRevisionReached
}

// update the bus
if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil {
return types.Hash256{}, 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err)
return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err)
}

// upload the sector
start := time.Now()
root, err := host.UploadSector(ctx, req.sector.sectorData(), rev)
err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev)
if err != nil {
return types.Hash256{}, 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err)
return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err)
}

// calculate elapsed time
elapsed := time.Since(start)
return root, elapsed, nil
return elapsed, nil
}

func (u *uploader) pop() *sectorUploadReq {
Expand Down

0 comments on commit cfb8cb6

Please sign in to comment.