diff --git a/api/bus.go b/api/bus.go
index c79772f65..e245cdf79 100644
--- a/api/bus.go
+++ b/api/bus.go
@@ -59,6 +59,10 @@ var (
 	// from the database.
 	ErrContractSetNotFound = errors.New("couldn't find contract set")
 
+	// ErrHostNotFound is returned when a host can't be retrieved from the
+	// database.
+	ErrHostNotFound = errors.New("host doesn't exist in hostdb")
+
 	// ErrSettingNotFound is returned if a requested setting is not present in the
 	// database.
 	ErrSettingNotFound = errors.New("setting not found")
diff --git a/api/worker.go b/api/worker.go
index aeddf7d38..f9081daf1 100644
--- a/api/worker.go
+++ b/api/worker.go
@@ -49,6 +49,19 @@ type RHPScanRequest struct {
 	Timeout time.Duration `json:"timeout"`
 }
 
+// RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune
+// endpoint.
+type RHPPruneContractRequest struct {
+	Timeout time.Duration `json:"timeout"`
+}
+
+// RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune
+// endpoint.
+type RHPPruneContractResponse struct {
+	Pruned    uint64 `json:"pruned"`
+	Remaining uint64 `json:"remaining"`
+}
+
 // RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint.
 type RHPPriceTableRequest struct {
 	HostKey types.PublicKey `json:"hostKey"`
diff --git a/bus/bus.go b/bus/bus.go
index 5a9ed991f..808576258 100644
--- a/bus/bus.go
+++ b/bus/bus.go
@@ -716,6 +716,16 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {
 	// build the response
 	for fcid, size := range sizes {
+		// adjust the amount of prunable data with the pending uploads; due to
+		// how we record contract spending, a contract's size might already
+		// include pending sectors
+		pending := b.uploadingSectors.pending(fcid)
+		if pending > size.Prunable {
+			size.Prunable = 0
+		} else {
+			size.Prunable -= pending
+		}
+
 		contracts = append(contracts, api.ContractPrunableData{
 			ID:           fcid,
 			ContractSize: size,
@@ -726,6 +736,9 @@ func (b *bus) contractsPrunableDataHandlerGET(jc jape.Context) {
 	// sort contracts by the amount of prunable data
 	sort.Slice(contracts, func(i, j int) bool {
+		if contracts[i].Prunable == contracts[j].Prunable {
+			return contracts[i].Size > contracts[j].Size
+		}
 		return contracts[i].Prunable > contracts[j].Prunable
 	})
 
@@ -746,9 +759,21 @@ func (b *bus) contractSizeHandlerGET(jc jape.Context) {
 	if errors.Is(err, api.ErrContractNotFound) {
 		jc.Error(err, http.StatusNotFound)
 		return
-	} else if jc.Check("failed to fetch contract size", err) == nil {
-		jc.Encode(size)
+	} else if jc.Check("failed to fetch contract size", err) != nil {
+		return
+	}
+
+	// adjust the amount of prunable data with the pending uploads; due to how
+	// we record contract spending, a contract's size might already include
+	// pending sectors
+	pending := b.uploadingSectors.pending(id)
+	if pending > size.Prunable {
+		size.Prunable = 0
+	} else {
+		size.Prunable -= pending
 	}
+
+	jc.Encode(size)
 }
 
 func (b *bus) contractReleaseHandlerPOST(jc jape.Context) {
@@ -1773,14 +1798,13 @@ func (b *bus) Handler() http.Handler {
 		"POST   /search/hosts":   b.searchHostsHandlerPOST,
 		"GET    /search/objects": b.searchObjectsHandlerGET,
 
-		"GET    /stats/objects": b.objectsStatshandlerGET,
-
 		"GET    /settings":     b.settingsHandlerGET,
 		"GET    /setting/:key": b.settingKeyHandlerGET,
 		"PUT    /setting/:key": b.settingKeyHandlerPUT,
 		"DELETE /setting/:key": b.settingKeyHandlerDELETE,
 
-		"GET /state": b.stateHandlerGET,
+		"GET    /state":         b.stateHandlerGET,
+		"GET    /stats/objects": b.objectsStatshandlerGET,
 
 		"POST   /upload/:id":        b.uploadTrackHandlerPOST,
 		"POST   /upload/:id/sector": b.uploadAddSectorHandlerPOST,
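Note on the two bus handlers above: pending upload data may already be counted in a contract's size, so both handlers clamp the subtraction at zero rather than letting the unsigned value underflow. A minimal sketch of that saturating subtraction (the helper name is hypothetical, not part of this change):

```go
package main

import "fmt"

// adjustPrunable mirrors the clamping in contractsPrunableDataHandlerGET and
// contractSizeHandlerGET: subtract pending upload bytes from the prunable
// amount, saturating at zero to avoid uint64 underflow.
func adjustPrunable(prunable, pending uint64) uint64 {
	if pending > prunable {
		return 0
	}
	return prunable - pending
}

func main() {
	fmt.Println(adjustPrunable(10, 4)) // 6
	fmt.Println(adjustPrunable(3, 8))  // 0 rather than a wrapped-around value
}
```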
diff --git a/bus/uploadingsectors.go b/bus/uploadingsectors.go
index d0175f37a..6a3917d50 100644
--- a/bus/uploadingsectors.go
+++ b/bus/uploadingsectors.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 	"time"
 
+	"go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
 )
@@ -65,6 +66,20 @@ func (usc *uploadingSectorsCache) addUploadingSector(uID api.UploadID, fcid type
 	return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
 }
 
+func (usc *uploadingSectorsCache) pending(fcid types.FileContractID) (size uint64) {
+	usc.mu.Lock()
+	var uploads []*ongoingUpload
+	for _, ongoing := range usc.uploads {
+		uploads = append(uploads, ongoing)
+	}
+	usc.mu.Unlock()
+
+	for _, ongoing := range uploads {
+		size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
+	}
+	return
+}
+
 func (usc *uploadingSectorsCache) sectors(fcid types.FileContractID) (roots []types.Hash256) {
 	usc.mu.Lock()
 	var uploads []*ongoingUpload
diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go
index 4318be343..e5dbf89c0 100644
--- a/internal/testing/cluster_test.go
+++ b/internal/testing/cluster_test.go
@@ -2025,7 +2025,11 @@ func TestWallet(t *testing.T) {
 	// to the amount of money sent as well as the miner fees used.
 	spendableDiff := wallet.Spendable.Sub(updated.Spendable)
 	if updated.Unconfirmed.Cmp(spendableDiff) > 0 {
-		t.Fatalf("unconfirmed balance can't be greater than the difference in spendable balance here, confirmed %v->%v unconfirmed %v->%v spendable %v->%v fee %v", wallet.Confirmed, updated.Confirmed, wallet.Unconfirmed, updated.Unconfirmed, wallet.Spendable, updated.Spendable, minerFee)
+		t.Fatalf("unconfirmed balance can't be greater than the difference in spendable balance here:\nconfirmed %v (%v) -> %v (%v)\nunconfirmed %v (%v) -> %v (%v)\nspendable %v (%v) -> %v (%v)\nfee %v (%v)",
+			wallet.Confirmed, wallet.Confirmed.ExactString(), updated.Confirmed, updated.Confirmed.ExactString(),
+			wallet.Unconfirmed, wallet.Unconfirmed.ExactString(), updated.Unconfirmed, updated.Unconfirmed.ExactString(),
+			wallet.Spendable, wallet.Spendable.ExactString(), updated.Spendable, updated.Spendable.ExactString(),
+			minerFee, minerFee.ExactString())
 	}
 	withdrawnAmt := spendableDiff.Sub(updated.Unconfirmed)
 	expectedWithdrawnAmt := sendAmt.Add(minerFee)
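The pending helper added to bus/uploadingsectors.go above sizes in-flight data as the number of cached sector roots times the fixed RHPv2 sector size; copying the ongoing uploads under the mutex and doing the arithmetic after releasing it keeps the critical section short. A small sketch of the arithmetic, assuming the go.sia.tech/core/rhp/v2 package (package name rhp, imported exactly as in the hunk above):

```go
package main

import (
	"fmt"

	"go.sia.tech/core/rhp/v2"
)

func main() {
	// every cached root stands for one full sector of rhp.SectorSize
	// bytes (4 MiB), so three pending sectors account for 12 MiB
	pendingSectors := uint64(3)
	fmt.Println(pendingSectors * rhp.SectorSize) // 12582912
}
```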
diff --git a/internal/testing/pruning_test.go b/internal/testing/pruning_test.go
index 1c35f371e..6e10380e9 100644
--- a/internal/testing/pruning_test.go
+++ b/internal/testing/pruning_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
 	"strings"
 	"testing"
 	"time"
@@ -192,6 +193,8 @@ func TestSectorPruning(t *testing.T) {
 	w := cluster.Worker
 	b := cluster.Bus
 
+	numObjects := 10
+
 	// add hosts
 	_, err = cluster.AddHostsBlocking(int(cfg.Contracts.Amount))
 	if err != nil {
@@ -210,7 +213,7 @@ func TestSectorPruning(t *testing.T) {
 	}
 
 	// add several objects
-	for i := 0; i < 10; i++ {
+	for i := 0; i < numObjects; i++ {
 		filename := fmt.Sprintf("obj_%d", i)
 		if err := w.UploadObject(context.Background(), bytes.NewReader([]byte(filename)), filename); err != nil {
 			t.Fatal(err)
@@ -238,7 +241,7 @@ func TestSectorPruning(t *testing.T) {
 		}
 		n += len(cRoots)
 	}
-	if n != rs.TotalShards*10 {
+	if n != rs.TotalShards*numObjects {
 		t.Fatal("unexpected number of roots", n)
 	}
 
@@ -254,6 +257,7 @@ func TestSectorPruning(t *testing.T) {
 		}
 	}()
 	b = cluster2.Bus
+	w = cluster2.Worker
 
 	// assert prunable data is 0
 	if res, err := b.PrunableData(context.Background()); err != nil {
@@ -262,8 +266,8 @@ func TestSectorPruning(t *testing.T) {
 		t.Fatal("expected 0 prunable data", n)
 	}
 
-	// delete a random number of objects
-	for i := 0; i < 10; i += 2 {
+	// delete every other object
+	for i := 0; i < numObjects; i += 2 {
 		filename := fmt.Sprintf("obj_%d", i)
 		if err := b.DeleteObject(context.Background(), filename, false); err != nil {
 			t.Fatal(err)
@@ -273,7 +277,40 @@ func TestSectorPruning(t *testing.T) {
 	// assert amount of prunable data
 	if res, err := b.PrunableData(context.Background()); err != nil {
 		t.Fatal(err)
-	} else if res.TotalPrunable != 5*uint64(rs.TotalShards)*rhpv2.SectorSize {
+	} else if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*uint64(rs.TotalShards)*rhpv2.SectorSize {
 		t.Fatal("unexpected prunable data", n)
 	}
+
+	// prune all contracts
+	for _, c := range contracts {
+		if _, _, err := w.RHPPruneContract(context.Background(), c.ID, 0); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// assert spending records were updated and prunable data is 0
+	if err = Retry(10, testBusFlushInterval, func() error {
+		if res, err := b.PrunableData(context.Background()); err != nil {
+			t.Fatal(err)
+		} else if res.TotalPrunable != 0 {
+			return fmt.Errorf("unexpected prunable data: %d", res.TotalPrunable)
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// assert spending was updated
+	for _, c := range contracts {
+		c, err := b.Contract(context.Background(), c.ID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if c.Spending.SectorRoots.IsZero() {
+			t.Fatal("spending record not updated")
+		}
+		if c.Spending.Deletions.IsZero() {
+			t.Fatal("spending record not updated")
+		}
+	}
 }
diff --git a/stores/hostdb.go b/stores/hostdb.go
index 71593fdb1..d15e192cf 100644
--- a/stores/hostdb.go
+++ b/stores/hostdb.go
@@ -41,7 +41,6 @@ const (
 )
 
 var (
-	ErrHostNotFound        = errors.New("host doesn't exist in hostdb")
 	ErrNegativeOffset      = errors.New("offset can not be negative")
 	ErrNegativeMaxDowntime = errors.New("max downtime can not be negative")
 )
@@ -435,7 +434,7 @@ func (ss *SQLStore) Host(ctx context.Context, hostKey types.PublicKey) (hostdb.H
 		Preload("Blocklist").
 		Take(&h)
 	if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
-		return hostdb.HostInfo{}, ErrHostNotFound
+		return hostdb.HostInfo{}, api.ErrHostNotFound
 	} else if tx.Error != nil {
 		return hostdb.HostInfo{}, tx.Error
 	}
diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go
index 8a21f4c7e..e781d22de 100644
--- a/stores/hostdb_test.go
+++ b/stores/hostdb_test.go
@@ -45,7 +45,7 @@ func TestSQLHostDB(t *testing.T) {
 	ctx := context.Background()
 	hk := types.GeneratePrivateKey().PublicKey()
 	_, err = hdb.Host(ctx, hk)
-	if !errors.Is(err, ErrHostNotFound) {
+	if !errors.Is(err, api.ErrHostNotFound) {
 		t.Fatal(err)
 	}
 
diff --git a/stores/metadata.go b/stores/metadata.go
index 78d3c7284..9bae19282 100644
--- a/stores/metadata.go
+++ b/stores/metadata.go
@@ -626,7 +626,6 @@ WHERE c.fcid = ?
 			roots = append(roots, *(*types.Hash256)(&r))
 		}
 	}
-
 	return
 }
 
@@ -694,7 +693,7 @@ func (s *SQLStore) ContractSize(ctx context.Context, id types.FileContractID) (a
 
 	if err := s.db.
 		Raw(`
-SELECT c.size, CASE WHEN c.size>(COUNT(cs.db_sector_id) * ?) THEN c.size-(COUNT(cs.db_sector_id) * ?) ELSE 0 END as prunable
+SELECT MAX(c.size) as size, CASE WHEN MAX(c.size)>(COUNT(cs.db_sector_id) * ?) THEN MAX(c.size)-(COUNT(cs.db_sector_id) * ?) ELSE 0 END as prunable
 FROM contracts c
 LEFT JOIN contract_sectors cs ON cs.db_contract_id = c.id
 WHERE c.fcid = ?
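Two remarks here. The ContractSize query above wraps c.size in MAX() because the COUNT() over the joined contract_sectors rows makes the statement an aggregate query, and a bare c.size would be rejected (or undefined) under modes like MySQL's ONLY_FULL_GROUP_BY; since every joined row belongs to the single contract matched by c.fcid, MAX(c.size) is simply c.size. The worker client method that follows can then be exercised like this (a hedged sketch; the client address, password, and contract ID are assumptions, and a timeout of 0 means the handler applies none):

```go
package main

import (
	"context"
	"log"
	"time"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/worker"
)

func main() {
	// assumed worker address and password; not part of this change
	c := worker.NewClient("http://localhost:9980/api/worker", "password")

	var fcid types.FileContractID // assumed to be a known contract ID
	pruned, remaining, err := c.RHPPruneContract(context.Background(), fcid, 5*time.Minute)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("pruned %d sectors, %d remaining", pruned, remaining)
}
```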
diff --git a/worker/client.go b/worker/client.go
index d0c8dbe78..1f83a969f 100644
--- a/worker/client.go
+++ b/worker/client.go
@@ -36,6 +36,17 @@ func (c *Client) RHPBroadcast(ctx context.Context, fcid types.FileContractID) (e
 	return
 }
 
+// RHPPruneContract prunes deleted sectors from the contract with given id.
+func (c *Client) RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) {
+	var res api.RHPPruneContractResponse
+	err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", fcid), api.RHPPruneContractRequest{
+		Timeout: timeout,
+	}, &res)
+	pruned = res.Pruned
+	remaining = res.Remaining
+	return
+}
+
 // RHPContractRoots fetches the roots of the contract with given id.
 func (c *Client) RHPContractRoots(ctx context.Context, fcid types.FileContractID) (roots []types.Hash256, err error) {
 	err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", fcid), &roots)
diff --git a/worker/rhpv2.go b/worker/rhpv2.go
index 58cecaa27..553f67ba3 100644
--- a/worker/rhpv2.go
+++ b/worker/rhpv2.go
@@ -6,11 +6,20 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/bits"
+	"sort"
 	"strings"
 	"time"
 
 	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
+	"go.sia.tech/renterd/api"
+	"go.sia.tech/siad/build"
+)
+
+const (
+	// minMessageSize is the minimum size of an RPC message
+	minMessageSize = 4096
 )
 
 var (
@@ -201,7 +210,7 @@ func RPCFormContract(ctx context.Context, t *rhpv2.Transport, renterKey types.Pr
 	// read the host's signatures and merge them with our own
 	var hostSigs rhpv2.RPCFormContractSignatures
-	if err := t.ReadResponse(&hostSigs, 4096); err != nil {
+	if err := t.ReadResponse(&hostSigs, minMessageSize); err != nil {
 		return rhpv2.ContractRevision{}, nil, err
 	}
 
@@ -262,120 +271,375 @@ func (w *worker) FetchSignedRevision(ctx context.Context, hostIP string, hostKey
 	return rev, err
 }
 
-func (w *worker) FetchContractRoots(ctx context.Context, hostIP string, hostKey types.PublicKey, renterKey types.PrivateKey, contractID types.FileContractID, lastKnownRevisionNumber uint64, timeout time.Duration) (roots []types.Hash256, err error) {
-	err = w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error {
-		req := &rhpv2.RPCLockRequest{
-			ContractID: contractID,
-			Signature:  t.SignChallenge(renterKey),
-			Timeout:    uint64(time.Minute.Milliseconds()),
-		}
+func (w *worker) PruneContract(ctx context.Context, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64) (deleted, remaining uint64, err error) {
+	err = w.withContractLock(ctx, fcid, lockingPriorityPruning, func() error {
+		return w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error {
+			return w.withRevisionV2(ctx, defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) {
+				// delete roots
+				got, err := w.fetchContractRoots(t, &rev, settings)
+				if err != nil {
+					return err
+				}
+
+				// fetch the roots from the bus
+				want, pending, err := w.bus.ContractRoots(ctx, fcid)
+				if err != nil {
+					return err
+				}
+				keep := make(map[types.Hash256]struct{})
+				for _, root := range append(want, pending...) {
+					keep[root] = struct{}{}
+				}
+
+				// collect indices for roots we want to prune
+				var indices []uint64
+				for i, root := range got {
+					if _, wanted := keep[root]; wanted {
+						delete(keep, root) // prevent duplicates
+						continue
+					}
+					indices = append(indices, uint64(i))
+				}
+				if len(indices) == 0 {
+					return fmt.Errorf("no sectors to prune, database holds %d (%d pending), contract contains %d", len(want)+len(pending), len(pending), len(got))
+				}
+
+				// delete the roots from the contract
+				deleted, err = w.deleteContractRoots(t, &rev, settings, indices)
+				if deleted < uint64(len(indices)) {
+					remaining = uint64(len(indices)) - deleted
+				}
+				return
+			})
+		})
+	})
+	return
+}
 
-		// execute lock RPC
-		var lockResp rhpv2.RPCLockResponse
-		if err := t.Call(rhpv2.RPCLockID, req, &lockResp); err != nil {
-			return err
-		}
-		t.SetChallenge(lockResp.NewChallenge)
-		revision := lockResp.Revision
-		sigs := lockResp.Signatures
+func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings, indices []uint64) (deleted uint64, err error) {
+	w.logger.Debugw(fmt.Sprintf("deleting %d contract roots (%v)", len(indices), humanReadableSize(len(indices)*rhpv2.SectorSize)), "hk", rev.HostKey(), "fcid", rev.ID())
 
-		// defer unlock RPC
-		defer t.WriteRequest(rhpv2.RPCUnlockID, nil)
+	// return early
+	if len(indices) == 0 {
+		return 0, nil
+	}
 
-		// sanity check the signature
-		var sig types.Signature
-		copy(sig[:], sigs[0].Signature)
-		if !renterKey.PublicKey().VerifyHash(hashRevision(revision), sig) {
-			return fmt.Errorf("unexpected renter signature on revision host revision")
-		}
+	// record contract spending
+	var totalCost types.Currency
+	defer func() {
+		w.contractSpendingRecorder.Record(rev.ID(), rev.Revision.RevisionNumber, rev.Revision.Filesize, api.ContractSpending{Deletions: totalCost})
+	}()
 
-		// sanity check the revision number is not lower than our last known
-		// revision number, host might be slipping us an outdated revision
-		if revision.RevisionNumber < lastKnownRevisionNumber {
-			return fmt.Errorf("unexpected revision number, %v!=%v", revision.RevisionNumber, lastKnownRevisionNumber)
-		}
+	// sort in descending order so that we can use 'range'
+	sort.Slice(indices, func(i, j int) bool {
+		return indices[i] > indices[j]
+	})
 
-		// extract the revision
-		rev := rhpv2.ContractRevision{
-			Revision:   revision,
-			Signatures: [2]types.TransactionSignature{sigs[0], sigs[1]},
-		}
+	// decide on the batch size, defaults to ~20mib of sector data but for old
+	// hosts we use a much smaller batch size to ensure we nibble away at the
+	// problem rather than outright failing or timing out
+	batchSize := int(batchSizeDeleteSectors)
+	if build.VersionCmp(settings.Version, "1.6.0") < 0 {
+		batchSize = 100
+	}
 
-		// execute settings RPC
-		var settingsResp rhpv2.RPCSettingsResponse
-		if err := t.Call(rhpv2.RPCSettingsID, nil, &settingsResp); err != nil {
-			return err
+	// split the indices into batches
+	var batches [][]uint64
+	for {
+		if len(indices) < batchSize {
+			batchSize = len(indices)
 		}
-		var settings rhpv2.HostSettings
-		if err := json.Unmarshal(settingsResp.Settings, &settings); err != nil {
-			return fmt.Errorf("couldn't unmarshal json: %w", err)
+		batches = append(batches, indices[:batchSize])
+		indices = indices[batchSize:]
+		if len(indices) == 0 {
+			break
 		}
+	}
 
-		// download all roots
-		numsectors := rev.NumSectors()
-		for offset := uint64(0); offset < numsectors; {
-			n := batchSizeFetchSectors
-			if offset+n > numsectors {
-				n = numsectors - offset
+	// derive the renter key
+	renterKey := w.deriveRenterKey(rev.HostKey())
+
+	// range over the batches and delete the sectors batch per batch
+	for i, batch := range batches {
+		if err = func() error {
+			var cost types.Currency
+			start := time.Now()
+			w.logger.Debugw(fmt.Sprintf("starting batch %d/%d of size %d", i+1, len(batches), len(batch)))
+			defer func() {
+				w.logger.Debugw(fmt.Sprintf("processing batch %d/%d of size %d took %v", i+1, len(batches), len(batch), time.Since(start)), "cost", cost)
+			}()
+
+			numSectors := rev.NumSectors()
+
+			// build a set of actions that move the sectors we want to delete
+			// towards the end of the contract, preparing them to be trimmed off
+			var actions []rhpv2.RPCWriteAction
+			cIndex := numSectors - 1
+			for _, rIndex := range batch {
+				if cIndex != rIndex {
+					actions = append(actions, rhpv2.RPCWriteAction{
+						Type: rhpv2.RPCWriteActionSwap,
+						A:    uint64(cIndex),
+						B:    uint64(rIndex),
+					})
+				}
+				cIndex--
 			}
+			actions = append(actions, rhpv2.RPCWriteAction{
+				Type: rhpv2.RPCWriteActionTrim,
+				A:    uint64(len(batch)),
+			})
 
 			// check funds
-			price := rhpv2.RPCSectorRootsCost(settings, n)
-			if rev.RenterFunds().Cmp(price) < 0 {
+			proofSize := uint64(len(batch)) * 2 * uint64(bits.Len64(numSectors)) * 32
+			if proofSize < minMessageSize {
+				proofSize = minMessageSize
+			}
+
+			// calculate the cost
+			//
+			// TODO: switch out for exact cost calculations once it is added to core
+			cost = settings.BaseRPCPrice.Add(settings.DownloadBandwidthPrice.Mul64(proofSize))
+			cost = cost.Mul64(125).Div64(100) // leeway
+			if rev.RenterFunds().Cmp(cost) < 0 {
 				return ErrInsufficientFunds
 			}
 
 			// update the revision number
-			rev.Revision.RevisionNumber++
 			if rev.Revision.RevisionNumber == math.MaxUint64 {
 				return ErrContractFinalized
 			}
+			rev.Revision.RevisionNumber++
+
+			// update the revision filesize
+			rev.Revision.Filesize -= rhpv2.SectorSize * actions[len(actions)-1].A
 
 			// update the revision outputs
-			newValid, newMissed, err := updateRevisionOutputs(&rev.Revision, price, types.ZeroCurrency)
+			newValid, newMissed, err := updateRevisionOutputs(&rev.Revision, cost, types.ZeroCurrency)
 			if err != nil {
 				return err
 			}
 
-			// build the sector roots request
-			revisionHash := hashRevision(rev.Revision)
-			req := &rhpv2.RPCSectorRootsRequest{
-				RootOffset: uint64(offset),
-				NumRoots:   uint64(n),
+			// create request
+			wReq := &rhpv2.RPCWriteRequest{
+				Actions:     actions,
+				MerkleProof: true,
 
 				RevisionNumber:    rev.Revision.RevisionNumber,
 				ValidProofValues:  newValid,
 				MissedProofValues: newMissed,
-				Signature:         renterKey.SignHash(revisionHash),
 			}
 
-			// execute the sector roots RPC
-			var rootsResp rhpv2.RPCSectorRootsResponse
-			if err := t.WriteRequest(rhpv2.RPCSectorRootsID, req); err != nil {
+			// send request and read merkle proof
+			var merkleResp rhpv2.RPCWriteMerkleProof
+			if err := t.WriteRequest(rhpv2.RPCWriteID, wReq); err != nil {
 				return err
-			} else if err := t.ReadResponse(&rootsResp, uint64(4096+32*n)); err != nil {
-				return fmt.Errorf("couldn't read sector roots response: %w", err)
+			} else if err := t.ReadResponse(&merkleResp, minMessageSize+proofSize); err != nil {
+				return fmt.Errorf("couldn't read Merkle proof response: %w", err)
 			}
 
-			// verify the host signature
-			if !hostKey.VerifyHash(revisionHash, rootsResp.Signature) {
-				return errors.New("host's signature is invalid")
+			// verify proof
+			proofHashes := merkleResp.OldSubtreeHashes
+			leafHashes := merkleResp.OldLeafHashes
+			oldRoot, newRoot := types.Hash256(rev.Revision.FileMerkleRoot), merkleResp.NewMerkleRoot
+			if rev.Revision.Filesize > 0 && !rhpv2.VerifyDiffProof(actions, numSectors, proofHashes, leafHashes, oldRoot, newRoot, nil) {
+				err := ErrInvalidMerkleProof
+				t.WriteResponseErr(err)
+				return err
 			}
-			rev.Signatures[0].Signature = req.Signature[:]
-			rev.Signatures[1].Signature = rootsResp.Signature[:]
 
-			// verify the proof
-			if !rhpv2.VerifySectorRangeProof(rootsResp.MerkleProof, rootsResp.SectorRoots, offset, offset+n, numsectors, rev.Revision.FileMerkleRoot) {
-				return ErrInvalidMerkleProof
+			// update merkle root
+			copy(rev.Revision.FileMerkleRoot[:], newRoot[:])
+
+			// build the write response
+			revisionHash := hashRevision(rev.Revision)
+			renterSig := &rhpv2.RPCWriteResponse{
+				Signature: renterKey.SignHash(revisionHash),
 			}
 
-			// append roots
-			roots = append(roots, rootsResp.SectorRoots...)
-			offset += n
+			// exchange signatures
+			var hostSig rhpv2.RPCWriteResponse
+			if err := t.WriteResponse(renterSig); err != nil {
+				return fmt.Errorf("couldn't write signature response: %w", err)
+			} else if err := t.ReadResponse(&hostSig, minMessageSize); err != nil {
+				return fmt.Errorf("couldn't read signature response: %w", err)
+			}
 
-			// TODO: record contract spending
+			// verify the host signature
+			if !rev.HostKey().VerifyHash(revisionHash, hostSig.Signature) {
+				return errors.New("host's signature is invalid")
+			}
+			rev.Signatures[0].Signature = renterSig.Signature[:]
+			rev.Signatures[1].Signature = hostSig.Signature[:]
+
+			// update total cost
+			totalCost = totalCost.Add(cost)
+			deleted += uint64(len(batch))
+			return nil
+		}(); err != nil {
+			return
 		}
-		return nil
+	}
+	return
+}
+
+func (w *worker) FetchContractRoots(ctx context.Context, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64) (roots []types.Hash256, err error) {
+	err = w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error {
+		return w.withRevisionV2(ctx, defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) {
+			roots, err = w.fetchContractRoots(t, &rev, settings)
+			return
+		})
 	})
 	return
 }
+
+func (w *worker) fetchContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings) (roots []types.Hash256, _ error) {
+	// derive the renter key
+	renterKey := w.deriveRenterKey(rev.HostKey())
+
+	// record contract spending
+	var totalCost types.Currency
+	defer func() {
+		w.contractSpendingRecorder.Record(rev.ID(), rev.Revision.RevisionNumber, rev.Revision.Filesize, api.ContractSpending{SectorRoots: totalCost})
+	}()
+
+	// download the full set of SectorRoots
+	numsectors := rev.NumSectors()
+	for offset := uint64(0); offset < numsectors; {
+		n := batchSizeFetchSectors
+		if offset+n > numsectors {
+			n = numsectors - offset
+		}
+
+		// check funds
+		cost := rhpv2.RPCSectorRootsCost(settings, n)
+		if rev.RenterFunds().Cmp(cost) < 0 {
+			return nil, ErrInsufficientFunds
+		}
+
+		// update the revision number
+		if rev.Revision.RevisionNumber == math.MaxUint64 {
+			return nil, ErrContractFinalized
+		}
+		rev.Revision.RevisionNumber++
+
+		// update the revision outputs
+		newValid, newMissed, err := updateRevisionOutputs(&rev.Revision, cost, types.ZeroCurrency)
+		if err != nil {
+			return nil, err
+		}
+
+		// build the sector roots request
+		revisionHash := hashRevision(rev.Revision)
+		req := &rhpv2.RPCSectorRootsRequest{
+			RootOffset: uint64(offset),
+			NumRoots:   uint64(n),
+
+			RevisionNumber:   rev.Revision.RevisionNumber,
+			ValidProofValues: newValid,
+			MissedProofValues: newMissed,
+			Signature:         renterKey.SignHash(revisionHash),
+		}
+
+		// calculate the proof size
+		proofSize := rhpv2.RangeProofSize(rev.NumSectors(), offset, n)
+
+		// execute the sector roots RPC
+		var rootsResp rhpv2.RPCSectorRootsResponse
+		if err := t.WriteRequest(rhpv2.RPCSectorRootsID, req); err != nil {
+			return nil, err
+		} else if err := t.ReadResponse(&rootsResp, uint64(minMessageSize+proofSize+32*n)); err != nil {
+			return nil, fmt.Errorf("couldn't read sector roots response: %w", err)
+		}
+
+		// verify the host signature
+		if !rev.HostKey().VerifyHash(revisionHash, rootsResp.Signature) {
+			return nil, errors.New("host's signature is invalid")
+		}
+		rev.Signatures[0].Signature = req.Signature[:]
+		rev.Signatures[1].Signature = rootsResp.Signature[:]
+
+		// verify the proof
+		if !rhpv2.VerifySectorRangeProof(rootsResp.MerkleProof, rootsResp.SectorRoots, offset, offset+n, numsectors, rev.Revision.FileMerkleRoot) {
+			return nil, ErrInvalidMerkleProof
+		}
+
+		// update the total cost
+		totalCost = totalCost.Add(cost)
+
+		// append roots
+		roots = append(roots, rootsResp.SectorRoots...)
+		offset += n
+	}
+	return
+}
+
+func (w *worker) withRevisionV2(ctx context.Context, lockTimeout time.Duration, t *rhpv2.Transport, hk types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, fn func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) error) error {
+	renterKey := w.deriveRenterKey(hk)
+
+	// execute lock RPC
+	var lockResp rhpv2.RPCLockResponse
+	err := t.Call(rhpv2.RPCLockID, &rhpv2.RPCLockRequest{
+		ContractID: fcid,
+		Signature:  t.SignChallenge(renterKey),
+		Timeout:    uint64(lockTimeout.Milliseconds()),
+	}, &lockResp)
+	if err != nil {
+		return err
+	}
+
+	// set transport challenge
+	t.SetChallenge(lockResp.NewChallenge)
+
+	// defer unlock RPC
+	defer t.WriteRequest(rhpv2.RPCUnlockID, nil)
+
+	// convenience variables
+	revision := lockResp.Revision
+	sigs := lockResp.Signatures
+
+	// sanity check the signature
+	var sig types.Signature
+	copy(sig[:], sigs[0].Signature)
+	if !renterKey.PublicKey().VerifyHash(hashRevision(revision), sig) {
+		return fmt.Errorf("unexpected renter signature on host revision")
+	}
+
+	// sanity check the revision number is not lower than our last known
+	// revision number, host might be slipping us an outdated revision
+	if revision.RevisionNumber < lastKnownRevisionNumber {
+		return fmt.Errorf("unexpected revision number, %v!=%v", revision.RevisionNumber, lastKnownRevisionNumber)
+	}
+
+	// extract the revision
+	rev := rhpv2.ContractRevision{
+		Revision:   revision,
+		Signatures: [2]types.TransactionSignature{sigs[0], sigs[1]},
+	}
+
+	// execute settings RPC
+	var settingsResp rhpv2.RPCSettingsResponse
+	if err := t.Call(rhpv2.RPCSettingsID, nil, &settingsResp); err != nil {
+		return err
+	}
+	var settings rhpv2.HostSettings
+	if err := json.Unmarshal(settingsResp.Settings, &settings); err != nil {
+		return fmt.Errorf("couldn't unmarshal json: %w", err)
+	}
+
+	return fn(t, rev, settings)
+}
+
+func humanReadableSize(b int) string {
+	const unit = 1024
+	if b < unit {
+		return fmt.Sprintf("%d B", b)
+	}
+	div, exp := int64(unit), 0
+	for n := b / unit; n >= unit; n /= unit {
+		div *= unit
+		exp++
+	}
+	return fmt.Sprintf("%.1f %ciB",
+		float64(b)/float64(div), "KMGTPE"[exp])
+}
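deleteContractRoots above never removes sectors in place: each batch swaps the doomed sectors to the tail of the contract and trims them off with a single trim action, which is why the indices must be sorted in descending order first. A self-contained sketch of that action planning (the helper name is illustrative only):

```go
package main

import "fmt"

// buildActions mimics the swap-and-trim planning above: every sector index
// slated for deletion is swapped with the current last sector, then one trim
// action drops the tail. batch must be sorted in descending order.
func buildActions(numSectors uint64, batch []uint64) (swaps [][2]uint64, trim uint64) {
	cIndex := numSectors - 1
	for _, rIndex := range batch {
		if cIndex != rIndex {
			swaps = append(swaps, [2]uint64{cIndex, rIndex})
		}
		cIndex--
	}
	return swaps, uint64(len(batch))
}

func main() {
	// deleting sectors 5 and 1 from an 8-sector contract: 5 swaps with the
	// last sector (7), 1 swaps with the new last sector (6), then 2 sectors
	// are trimmed off the end
	swaps, trim := buildActions(8, []uint64{5, 1})
	fmt.Println(swaps, trim) // [[7 5] [6 1]] 2
}
```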
"github.com/montanaflynn/stats" @@ -221,7 +220,7 @@ func (mgr *uploadManager) Migrate(ctx context.Context, shards [][]byte, contract if err != nil { return nil, err } - defer finishFn(ctx) + defer finishFn() // upload the shards return upload.uploadShards(ctx, shards, nil) @@ -286,7 +285,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund if err != nil { return object.Object{}, nil, err } - defer finishFn(ctx) + defer finishFn() // create the next slab channel nextSlabChan := make(chan struct{}, 1) @@ -296,7 +295,6 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, rs api.Redund respChan := make(chan slabUploadResponse) // collect the responses - var ongoingUploads uint64 var responses []slabUploadResponse var slabIndex int numSlabs := -1 @@ -336,12 +334,8 @@ loop: <-nextSlabChan // trigger next iteration } else { // Otherwise we upload it. - atomic.AddUint64(&ongoingUploads, 1) go func(rs api.RedundancySettings, data []byte, length, slabIndex int) { u.uploadSlab(ctx, rs, data, length, slabIndex, respChan, nextSlabChan) - if atomic.AddUint64(&ongoingUploads, ^uint64(0)) == 0 { - close(respChan) - } }(rs, data, length, slabIndex) } slabIndex++ @@ -383,7 +377,7 @@ func (mgr *uploadManager) launch(req *sectorUploadReq) error { return nil } -func (mgr *uploadManager) newUpload(ctx context.Context, totalShards int, contracts []api.ContractMetadata, bh uint64) (*upload, func(context.Context), error) { +func (mgr *uploadManager) newUpload(ctx context.Context, totalShards int, contracts []api.ContractMetadata, bh uint64) (*upload, func(), error) { mgr.mu.Lock() defer mgr.mu.Unlock() @@ -392,7 +386,7 @@ func (mgr *uploadManager) newUpload(ctx context.Context, totalShards int, contra // check if we have enough contracts if len(contracts) < totalShards { - return nil, func(_ context.Context) {}, errNotEnoughContracts + return nil, func() {}, errNotEnoughContracts } // create allowed map @@ -408,7 +402,9 @@ func (mgr *uploadManager) newUpload(ctx context.Context, totalShards int, contra } // create a finish function to finish the upload - finishFn := func(ctx context.Context) { + finishFn := func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() if err := mgr.b.FinishUpload(ctx, id); err != nil { mgr.logger.Errorf("failed to mark upload %v as finished: %v", id, err) } diff --git a/worker/worker.go b/worker/worker.go index 4e48e983a..69518d61a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -38,8 +38,10 @@ import ( ) const ( - batchSizeFetchSectors = uint64(130000) // ~4MiB of roots + batchSizeDeleteSectors = uint64(500000) // ~16MiB of roots + batchSizeFetchSectors = uint64(130000) // ~4MiB of roots + defaultLockTimeout = time.Minute defaultRevisionFetchTimeout = 30 * time.Second lockingPriorityActiveContractRevision = 100 // highest @@ -47,6 +49,7 @@ const ( lockingPriorityPriceTable = 60 lockingPriorityFunding = 40 lockingPrioritySyncing = 20 + lockingPriorityPruning = 10 lockingPriorityUpload = 1 // lowest ) @@ -143,6 +146,7 @@ type Bus interface { BroadcastTransaction(ctx context.Context, txns []types.Transaction) error Contract(ctx context.Context, id types.FileContractID) (api.ContractMetadata, error) + ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) ContractRoots(ctx context.Context, id types.FileContractID) ([]types.Hash256, []types.Hash256, error) Contracts(ctx context.Context) ([]api.ContractMetadata, error) ContractSetContracts(ctx context.Context, set 
diff --git a/worker/worker.go b/worker/worker.go
index 4e48e983a..69518d61a 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -38,8 +38,10 @@ import (
 )
 
 const (
-	batchSizeFetchSectors = uint64(130000) // ~4MiB of roots
+	batchSizeDeleteSectors = uint64(500000) // ~16MiB of roots
+	batchSizeFetchSectors  = uint64(130000) // ~4MiB of roots
 
+	defaultLockTimeout          = time.Minute
 	defaultRevisionFetchTimeout = 30 * time.Second
 
 	lockingPriorityActiveContractRevision = 100 // highest
@@ -47,6 +49,7 @@ const (
 	lockingPriorityPriceTable = 60
 	lockingPriorityFunding    = 40
 	lockingPrioritySyncing    = 20
+	lockingPriorityPruning    = 10
 	lockingPriorityUpload     = 1 // lowest
 )
 
@@ -143,6 +146,7 @@ type Bus interface {
 	BroadcastTransaction(ctx context.Context, txns []types.Transaction) error
 
 	Contract(ctx context.Context, id types.FileContractID) (api.ContractMetadata, error)
+	ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error)
 	ContractRoots(ctx context.Context, id types.FileContractID) ([]types.Hash256, []types.Hash256, error)
 	Contracts(ctx context.Context) ([]api.ContractMetadata, error)
 	ContractSetContracts(ctx context.Context, set string) ([]api.ContractMetadata, error)
@@ -322,9 +326,8 @@ func (w *worker) newHostV3(contractID types.FileContractID, hostKey types.Public
 	}
 }
 
-func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, contractID types.FileContractID, hk types.PublicKey, siamuxAddr string, lockPriority int, blockHeight uint64, fn func(rev types.FileContractRevision) error) error {
-	// lock the revision for the duration of the operation.
-	contractLock, err := w.acquireRevision(ctx, contractID, lockPriority)
+func (w *worker) withContractLock(ctx context.Context, fcid types.FileContractID, priority int, fn func() error) error {
+	contractLock, err := w.acquireContractLock(ctx, fcid, priority)
 	if err != nil {
 		return err
 	}
@@ -334,13 +337,18 @@ func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, c
 		cancel()
 	}()
 
-	h := w.newHostV3(contractID, hk, siamuxAddr)
-	rev, err := h.FetchRevision(ctx, fetchTimeout, blockHeight)
-	if err != nil {
-		return err
-	}
+	return fn()
+}
 
-	return fn(rev)
+func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, contractID types.FileContractID, hk types.PublicKey, siamuxAddr string, lockPriority int, blockHeight uint64, fn func(rev types.FileContractRevision) error) error {
+	return w.withContractLock(ctx, contractID, lockPriority, func() error {
+		h := w.newHostV3(contractID, hk, siamuxAddr)
+		rev, err := h.FetchRevision(ctx, fetchTimeout, blockHeight)
+		if err != nil {
+			return err
+		}
+		return fn(rev)
+	})
 }
 
 func (w *worker) rhpScanHandler(jc jape.Context) {
@@ -561,7 +569,7 @@ func (w *worker) rhpBroadcastHandler(jc jape.Context) {
 
 	// Acquire lock before fetching revision.
 	ctx := jc.Request.Context()
-	unlocker, err := w.acquireRevision(ctx, fcid, lockingPriorityActiveContractRevision)
+	unlocker, err := w.acquireContractLock(ctx, fcid, lockingPriorityActiveContractRevision)
 	if jc.Check("could not acquire revision lock", err) != nil {
 		return
 	}
@@ -606,6 +614,70 @@ func (w *worker) rhpBroadcastHandler(jc jape.Context) {
 	}
 }
 
+func (w *worker) rhpPruneContractHandlerPOST(jc jape.Context) {
+	// decode fcid
+	var fcid types.FileContractID
+	if jc.DecodeParam("id", &fcid) != nil {
+		return
+	}
+
+	// decode timeout
+	var pcr api.RHPPruneContractRequest
+	if jc.Decode(&pcr) != nil {
+		return
+	}
+
+	// apply timeout
+	ctx := jc.Request.Context()
+	if pcr.Timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(jc.Request.Context(), pcr.Timeout)
+		defer cancel()
+	}
+
+	// attach gouging checker
+	gp, err := w.bus.GougingParams(ctx)
+	if jc.Check("could not get gouging parameters", err) != nil {
+		return
+	}
+	ctx = WithGougingChecker(ctx, w.bus, gp)
+
+	// fetch the contract from the bus
+	contract, err := w.bus.Contract(ctx, fcid)
+	if errors.Is(err, api.ErrContractNotFound) {
+		jc.Error(err, http.StatusNotFound)
+		return
+	} else if jc.Check("couldn't fetch contract", err) != nil {
+		return
+	}
+
+	// return early if there's no data to prune
+	size, err := w.bus.ContractSize(ctx, fcid)
+	if jc.Check("couldn't fetch contract size", err) != nil {
+		return
+	} else if size.Prunable == 0 {
+		jc.Encode(api.RHPPruneContractResponse{
+			Pruned:    0,
+			Remaining: 0,
+		})
+		return
+	}
+
+	// prune the contract
+	pruned, remaining, err := w.PruneContract(ctx, contract.HostIP, contract.HostKey, fcid, contract.RevisionNumber)
+	if err == nil || (errors.Is(err, context.Canceled) && pruned > 0) {
+		jc.Encode(api.RHPPruneContractResponse{
+			Pruned:    pruned,
+			Remaining: remaining,
+		})
+	} else {
+		if pruned > 0 {
+			err = fmt.Errorf("%w; couldn't prune all sectors (%d/%d)", err, pruned, pruned+remaining)
+		}
+		jc.Error(err, http.StatusInternalServerError)
+	}
+}
+
 func (w *worker) rhpContractRootsHandlerGET(jc jape.Context) {
 	// decode fcid
 	var id types.FileContractID
@@ -624,8 +696,7 @@ func (w *worker) rhpContractRootsHandlerGET(jc jape.Context) {
 	}
 
 	// fetch the roots from the host
-	renterKey := w.deriveRenterKey(c.HostKey)
-	roots, err := w.FetchContractRoots(ctx, c.HostIP, c.HostKey, renterKey, id, c.RevisionNumber, time.Minute)
+	roots, err := w.FetchContractRoots(ctx, c.HostIP, c.HostKey, id, c.RevisionNumber)
 	if jc.Check("couldn't fetch contract roots from host", err) == nil {
 		jc.Encode(roots)
 	}
@@ -1250,6 +1321,7 @@ func (w *worker) Handler() http.Handler {
 
 		"GET    /rhp/contracts":              w.rhpContractsHandlerGET,
 		"POST   /rhp/contract/:id/broadcast": w.rhpBroadcastHandler,
+		"POST   /rhp/contract/:id/prune":     w.rhpPruneContractHandlerPOST,
 		"GET    /rhp/contract/:id/roots":     w.rhpContractRootsHandlerGET,
 		"POST   /rhp/scan":                   w.rhpScanHandler,
 		"POST   /rhp/form":                   w.rhpFormHandler,
@@ -1369,7 +1441,7 @@ func (cl *contractLock) keepaliveLoop() {
 	}
 }
 
-func (w *worker) acquireRevision(ctx context.Context, fcid types.FileContractID, priority int) (_ revisionUnlocker, err error) {
+func (w *worker) acquireContractLock(ctx context.Context, fcid types.FileContractID, priority int) (_ revisionUnlocker, err error) {
 	lockID, err := w.bus.AcquireContract(ctx, fcid, priority, w.contractLockingDuration)
 	if err != nil {
 		return nil, err
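Because the handler above returns a successful response (with a non-zero Remaining count) when a cancellation interrupts a prune that already made progress, a caller can invoke the endpoint with a bounded timeout and simply repeat until nothing remains. A sketch of that loop, reusing the hypothetical client setup from the earlier example:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/worker"
)

// pruneAll repeatedly invokes the prune endpoint until the contract reports
// zero remaining sectors; each call is bounded by a five-minute timeout.
func pruneAll(ctx context.Context, c *worker.Client, fcid types.FileContractID) error {
	for {
		pruned, remaining, err := c.RHPPruneContract(ctx, fcid, 5*time.Minute)
		if err != nil {
			return err
		}
		log.Printf("pruned %d sectors, %d remaining", pruned, remaining)
		if remaining == 0 {
			return nil
		}
	}
}

func main() {
	// assumed worker address and password; not part of this change
	c := worker.NewClient("http://localhost:9980/api/worker", "password")
	var fcid types.FileContractID // assumed to be a known contract ID
	if err := pruneAll(context.Background(), c, fcid); err != nil {
		log.Fatal(err)
	}
}
```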