diff --git a/worker/host.go b/worker/host.go index 43e0891af..e5642efdd 100644 --- a/worker/host.go +++ b/worker/host.go @@ -21,7 +21,7 @@ type ( PublicKey() types.PublicKey DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error - UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) + UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error) @@ -121,11 +121,11 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 }) } -func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (root types.Hash256, err error) { +func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) { // fetch price table pt, err := h.priceTable(ctx, nil) if err != nil { - return types.Hash256{}, err + return err } // prepare payment @@ -134,28 +134,28 @@ func (h *host) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, // insufficient balance error expectedCost, _, _, err := uploadSectorCost(pt, rev.WindowEnd) if err != nil { - return types.Hash256{}, err + return err } if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID) + return fmt.Errorf("revision number has reached max, fcid %v", rev.ParentID) } payment, ok := rhpv3.PayByContract(&rev, expectedCost, h.acc.id, h.renterKey) if !ok { - return types.Hash256{}, errors.New("failed to create payment") + return errors.New("failed to create payment") } var cost types.Currency err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - root, cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sector) + cost, err = RPCAppendSector(ctx, t, h.renterKey, pt, &rev, &payment, sectorRoot, sector) return err }) if err != nil { - return types.Hash256{}, err + return err } // record spending h.contractSpendingRecorder.Record(rev, api.ContractSpending{Uploads: cost}) - return root, nil + return nil } func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) { diff --git a/worker/host_test.go b/worker/host_test.go index 87d35fb36..78ce6b74e 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -16,11 +16,9 @@ func TestHost(t *testing.T) { sector, root := newMockSector() // upload the sector - uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{}) + err := h.UploadSector(context.Background(), rhpv2.SectorRoot(sector), sector, types.FileContractRevision{}) if err != nil { t.Fatal(err) - } else if uploaded != root { - t.Fatal("root mismatch") } // download entire sector diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 2490941af..a28e9256c 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -396,8 +396,9 @@ func (h *mockHost) DownloadSector(ctx context.Context, w io.Writer, root types.H return err } -func (h *mockHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) { - return h.contract().addSector(sector), nil +func (h *mockHost) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error { + h.contract().addSector(sectorRoot, sector) + return nil } func (h *mockHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) { @@ -448,12 +449,10 @@ func newMockContract(hk types.PublicKey, fcid types.FileContractID) *mockContrac } } -func (c *mockContract) addSector(sector *[rhpv2.SectorSize]byte) (root types.Hash256) { - root = rhpv2.SectorRoot(sector) +func (c *mockContract) addSector(sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) { c.mu.Lock() - c.sectors[root] = sector + c.sectors[sectorRoot] = sector c.mu.Unlock() - return } func (c *mockContract) sector(root types.Hash256) (sector *[rhpv2.SectorSize]byte, found bool) { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 03f67c6f6..b7ccfd69a 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -789,17 +789,17 @@ func RPCReadSector(ctx context.Context, t *transportV3, w io.Writer, pt rhpv3.Ho return } -func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sector *[rhpv2.SectorSize]byte) (sectorRoot types.Hash256, cost types.Currency, err error) { +func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.PrivateKey, pt rhpv3.HostPriceTable, rev *types.FileContractRevision, payment rhpv3.PaymentMethod, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte) (cost types.Currency, err error) { defer wrapErr(&err, "AppendSector") // sanity check revision first if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, types.ZeroCurrency, errMaxRevisionReached + return types.ZeroCurrency, errMaxRevisionReached } s, err := t.DialStream(ctx) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } defer s.Close() @@ -829,7 +829,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat // compute expected collateral and refund expectedCost, expectedCollateral, expectedRefund, err := uploadSectorCost(pt, rev.WindowEnd) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } // apply leeways. @@ -840,13 +840,13 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat // check if the cost, collateral and refund match our expectation. if executeResp.TotalCost.Cmp(expectedCost) > 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String()) + return types.ZeroCurrency, fmt.Errorf("cost exceeds expectation: %v > %v", executeResp.TotalCost.String(), expectedCost.String()) } if executeResp.FailureRefund.Cmp(expectedRefund) < 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String()) + return types.ZeroCurrency, fmt.Errorf("insufficient refund: %v < %v", executeResp.FailureRefund.String(), expectedRefund.String()) } if executeResp.AdditionalCollateral.Cmp(expectedCollateral) < 0 { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String()) + return types.ZeroCurrency, fmt.Errorf("insufficient collateral: %v < %v", executeResp.AdditionalCollateral.String(), expectedCollateral.String()) } // set the cost and refund @@ -870,18 +870,17 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat collateral := executeResp.AdditionalCollateral.Add(executeResp.FailureRefund) // check proof - sectorRoot = rhpv2.SectorRoot(sector) if rev.Filesize == 0 { // For the first upload to a contract we don't get a proof. So we just // assert that the new contract root matches the root of the sector. if rev.Filesize == 0 && executeResp.NewMerkleRoot != sectorRoot { - return types.Hash256{}, types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot) + return types.ZeroCurrency, fmt.Errorf("merkle root doesn't match the sector root upon first upload to contract: %v != %v", executeResp.NewMerkleRoot, sectorRoot) } } else { // Otherwise we make sure the proof was transmitted and verify it. actions := []rhpv2.RPCWriteAction{{Type: rhpv2.RPCWriteActionAppend}} // TODO: change once rhpv3 support is available if !rhpv2.VerifyDiffProof(actions, rev.Filesize/rhpv2.SectorSize, executeResp.Proof, []types.Hash256{}, rev.FileMerkleRoot, executeResp.NewMerkleRoot, []types.Hash256{sectorRoot}) { - return types.Hash256{}, types.ZeroCurrency, errors.New("proof verification failed") + return types.ZeroCurrency, errors.New("proof verification failed") } } @@ -889,7 +888,7 @@ func RPCAppendSector(ctx context.Context, t *transportV3, renterKey types.Privat newRevision := *rev newValid, newMissed, err := updateRevisionOutputs(&newRevision, types.ZeroCurrency, collateral) if err != nil { - return types.Hash256{}, types.ZeroCurrency, err + return types.ZeroCurrency, err } newRevision.Filesize += rhpv2.SectorSize newRevision.RevisionNumber++ diff --git a/worker/upload.go b/worker/upload.go index 232e05981..63be07b2b 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -137,9 +137,8 @@ type ( } sectorUploadResp struct { - req *sectorUploadReq - root types.Hash256 - err error + req *sectorUploadReq + err error } ) @@ -1065,12 +1064,6 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { return false, false } - // sanity check we receive the expected root - if resp.root != req.sector.root { - s.errs[req.hk] = fmt.Errorf("root mismatch, %v != %v", resp.root, req.sector.root) - return false, false - } - // redundant sectors can't complete the upload if sector.uploaded.Root != (types.Hash256{}) { return false, false @@ -1080,7 +1073,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { sector.finish(object.Sector{ Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}}, LatestHost: req.hk, - Root: resp.root, + Root: req.sector.root, }) // update uploaded sectors @@ -1127,7 +1120,7 @@ func (req *sectorUploadReq) done() bool { } } -func (req *sectorUploadReq) fail(err error) { +func (req *sectorUploadReq) finish(err error) { select { case <-req.sector.ctx.Done(): case req.responseChan <- sectorUploadResp{ @@ -1136,13 +1129,3 @@ func (req *sectorUploadReq) fail(err error) { }: } } - -func (req *sectorUploadReq) succeed(root types.Hash256) { - select { - case <-req.sector.ctx.Done(): - case req.responseChan <- sectorUploadResp{ - req: req, - root: root, - }: - } -} diff --git a/worker/uploader.go b/worker/uploader.go index dcff27eaf..fa1d04651 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -114,7 +114,7 @@ outer: } // execute it - root, elapsed, err := u.execute(req) + elapsed, err := u.execute(req) // the uploader's contract got renewed, requeue the request if errors.Is(err, errMaxRevisionReached) { @@ -125,10 +125,12 @@ outer: } // send the response - if err != nil { - req.fail(err) - } else { - req.succeed(root) + select { + case <-req.sector.ctx.Done(): + case req.responseChan <- sectorUploadResp{ + req: req, + err: err, + }: } // track the error, ignore gracefully closed streams and canceled overdrives @@ -151,7 +153,7 @@ func (u *uploader) Stop(err error) { break } if !upload.done() { - upload.fail(err) + upload.finish(err) } } } @@ -161,7 +163,7 @@ func (u *uploader) enqueue(req *sectorUploadReq) { // check for stopped if u.stopped { u.mu.Unlock() - go req.fail(errUploaderStopped) // don't block the caller + go req.finish(errUploaderStopped) // don't block the caller return } @@ -192,7 +194,7 @@ func (u *uploader) estimate() float64 { return numSectors * estimateP90 } -func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, error) { +func (u *uploader) execute(req *sectorUploadReq) (time.Duration, error) { // grab fields u.mu.Lock() host := u.host @@ -202,7 +204,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // acquire contract lock lockID, err := u.cs.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration) if err != nil { - return types.Hash256{}, 0, err + return 0, err } // defer the release @@ -220,26 +222,26 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // fetch the revision rev, err := host.FetchRevision(ctx, defaultRevisionFetchTimeout) if err != nil { - return types.Hash256{}, 0, err + return 0, err } else if rev.RevisionNumber == math.MaxUint64 { - return types.Hash256{}, 0, errMaxRevisionReached + return 0, errMaxRevisionReached } // update the bus if err := u.os.AddUploadingSector(ctx, req.uploadID, fcid, req.sector.root); err != nil { - return types.Hash256{}, 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to add uploading sector to contract %v, err: %v", fcid, err) } // upload the sector start := time.Now() - root, err := host.UploadSector(ctx, req.sector.sectorData(), rev) + err = host.UploadSector(ctx, req.sector.root, req.sector.sectorData(), rev) if err != nil { - return types.Hash256{}, 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) + return 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) } // calculate elapsed time elapsed := time.Since(start) - return root, elapsed, nil + return elapsed, nil } func (u *uploader) pop() *sectorUploadReq {