diff --git a/api/contract.go b/api/contract.go index ea90c722f..19da16a2d 100644 --- a/api/contract.go +++ b/api/contract.go @@ -161,6 +161,21 @@ type ( LockID uint64 `json:"lockID"` } + // ContractPruneRequest is the request type for the /contract/:id/prune + // endpoint. + ContractPruneRequest struct { + Timeout DurationMS `json:"timeout"` + } + + // ContractPruneResponse is the response type for the /contract/:id/prune + // endpoint. + ContractPruneResponse struct { + ContractSize uint64 `json:"size"` + Pruned uint64 `json:"pruned"` + Remaining uint64 `json:"remaining"` + Error string `json:"error,omitempty"` + } + // ContractAcquireRequest is the request type for the /contract/:id/release // endpoint. ContractReleaseRequest struct { @@ -211,6 +226,11 @@ type ( } ) +// Total returns the total cost of the contract spending. +func (x ContractSpending) Total() types.Currency { + return x.Uploads.Add(x.Downloads).Add(x.FundAccount).Add(x.Deletions).Add(x.SectorRoots) +} + // Add returns the sum of the current and given contract spending. func (x ContractSpending) Add(y ContractSpending) (z ContractSpending) { z.Uploads = x.Uploads.Add(y.Uploads) diff --git a/api/worker.go b/api/worker.go index ab4aec5dd..d1c18b61b 100644 --- a/api/worker.go +++ b/api/worker.go @@ -87,18 +87,12 @@ type ( TransactionSet []types.Transaction `json:"transactionSet"` } - // RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune - // endpoint. - RHPPruneContractRequest struct { - Timeout DurationMS `json:"timeout"` - } - - // RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune - // endpoint. - RHPPruneContractResponse struct { - Pruned uint64 `json:"pruned"` - Remaining uint64 `json:"remaining"` - Error string `json:"error,omitempty"` + // RHPFundRequest is the request type for the /rhp/fund endpoint. + RHPFundRequest struct { + ContractID types.FileContractID `json:"contractID"` + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + Balance types.Currency `json:"balance"` } // RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint. 
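Note: the prune request/response types move from the worker API into the bus API, and ContractSpending gains a Total helper. A minimal sketch of how the new types compose (the amounts and sizes below are made up for illustration and only use the types shown in this diff):

package main

import (
	"fmt"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/api"
)

func main() {
	// Total sums every spending category tracked for a contract, including the
	// two categories touched by pruning: Deletions and SectorRoots.
	spending := api.ContractSpending{
		Uploads:     types.Siacoins(2),
		Downloads:   types.Siacoins(1),
		FundAccount: types.Siacoins(1),
		Deletions:   types.Siacoins(3),
		SectorRoots: types.Siacoins(1),
	}
	fmt.Println(spending.Total()) // 8 SC

	// ContractPruneResponse reports byte counts; a non-empty Error means pruning
	// stopped early, but Pruned can still be non-zero in that case.
	res := api.ContractPruneResponse{ContractSize: 1 << 30, Pruned: 1 << 29, Remaining: 1 << 29}
	fmt.Printf("pruned %d of %d bytes, %d remaining\n", res.Pruned, res.ContractSize, res.Remaining)
}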
diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index c54fefa6c..f7d06d504 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -48,6 +48,7 @@ type Bus interface { RenewContract(ctx context.Context, fcid types.FileContractID, endHeight uint64, renterFunds, minNewCollateral, maxFundAmount types.Currency, expectedNewStorage uint64) (api.ContractMetadata, error) SetContractSet(ctx context.Context, set string, contracts []types.FileContractID) error PrunableData(ctx context.Context) (prunableData api.ContractsPrunableDataResponse, err error) + PruneContract(ctx context.Context, id types.FileContractID, timeout time.Duration) (api.ContractPruneResponse, error) // hostdb Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error) @@ -322,7 +323,7 @@ func (ap *Autopilot) Run() { // pruning if autopilot.Config.Contracts.Prune { - ap.tryPerformPruning(ap.workers) + ap.tryPerformPruning() } else { ap.logger.Info("pruning disabled") } @@ -670,6 +671,9 @@ func (ap *Autopilot) configHandlerPUT(jc jape.Context) { autopilot, err := ap.bus.Autopilot(jc.Request.Context(), ap.id) if utils.IsErr(err, api.ErrAutopilotNotFound) { autopilot = api.Autopilot{ID: ap.id, Config: cfg} + } else if err != nil { + jc.Error(err, http.StatusInternalServerError) + return } else { if autopilot.Config.Contracts.Set != cfg.Contracts.Set { contractSetChanged = true diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index 7822fb326..0189430ec 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -105,15 +105,17 @@ func (ap *Autopilot) fetchHostContract(fcid types.FileContractID) (host api.Host return } -func (ap *Autopilot) performContractPruning(wp *workerPool) { - ap.logger.Info("performing contract pruning") +func (ap *Autopilot) performContractPruning() { + log := ap.logger.Named("performContractPruning") + log.Info("performing contract pruning") // fetch prunable contracts prunable, err := ap.fetchPrunableContracts() if err != nil { - ap.logger.Error(err) + log.Error(err) return } + log.Debugf("found %d prunable contracts", len(prunable)) // dismiss alerts for contracts that are no longer prunable ap.dismissPruneAlerts(prunable) @@ -129,39 +131,70 @@ func (ap *Autopilot) performContractPruning(wp *workerPool) { // fetch host h, _, err := ap.fetchHostContract(contract.ID) if utils.IsErr(err, api.ErrContractNotFound) { + log.Debugw("contract got archived", "contract", contract.ID) continue // contract got archived } else if err != nil { - ap.logger.Errorf("failed to fetch host for contract '%v', err %v", contract.ID, err) + log.Errorw("failed to fetch host", zap.Error(err), "contract", contract.ID) continue } - // prune contract using a random worker - wp.withWorker(func(w Worker) { - total += ap.pruneContract(w, contract.ID, h.PublicKey, h.Settings.Version, h.Settings.Release) - }) + // prune contract + n, err := ap.pruneContract(ap.shutdownCtx, contract.ID, h.PublicKey, h.Settings.Version, h.Settings.Release, log) + if err != nil { + log.Errorw("failed to prune contract", zap.Error(err), "contract", contract.ID) + continue + } + + // handle alerts + ap.mu.Lock() + alertID := alerts.IDForContract(alertPruningID, contract.ID) + if shouldSendPruneAlert(err, h.Settings.Version, h.Settings.Release) { + ap.RegisterAlert(ap.shutdownCtx, newContractPruningFailedAlert(h.PublicKey, h.Settings.Version, h.Settings.Release, contract.ID, err)) + ap.pruningAlertIDs[contract.ID] = alertID // store id to dismiss stale alerts + } else { + 
ap.DismissAlert(ap.shutdownCtx, alertID) + delete(ap.pruningAlertIDs, contract.ID) + } + ap.mu.Unlock() + + // adjust total + total += n } // log total pruned - ap.logger.Info(fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable))) + log.Info(fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable))) } -func (ap *Autopilot) pruneContract(w Worker, fcid types.FileContractID, hk types.PublicKey, hostVersion, hostRelease string) uint64 { - // use a sane timeout - ctx, cancel := context.WithTimeout(ap.shutdownCtx, timeoutPruneContract+5*time.Minute) - defer cancel() +func (ap *Autopilot) pruneContract(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, hostVersion, hostRelease string, logger *zap.SugaredLogger) (uint64, error) { + // define logger + log := logger.With( + zap.Stringer("contract", fcid), + zap.Stringer("host", hk), + zap.String("version", hostVersion), + zap.String("release", hostRelease)) // prune the contract start := time.Now() - pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) - duration := time.Since(start) + res, err := ap.bus.PruneContract(ctx, fcid, timeoutPruneContract) + if err != nil { + return 0, err + } + + // decorate logger + log = log.With( + zap.String("pruned", utils.HumanReadableSize(int(res.Pruned))), + zap.String("remaining", utils.HumanReadableSize(int(res.Remaining))), + zap.String("size", utils.HumanReadableSize(int(res.ContractSize))), + zap.Duration("elapsed", time.Since(start)), + ) // ignore slow pruning until host network is 1.6.0+ - if utils.IsErr(err, context.DeadlineExceeded) && pruned > 0 { - err = nil + if res.Error != "" && utils.IsErr(errors.New(res.Error), context.DeadlineExceeded) && res.Pruned > 0 { + res.Error = "" } // handle metrics - if err == nil || pruned > 0 { + if res.Pruned > 0 { if err := ap.bus.RecordContractPruneMetric(ctx, api.ContractPruneMetric{ Timestamp: api.TimeRFC3339(start), @@ -169,40 +202,25 @@ func (ap *Autopilot) pruneContract(w Worker, fcid types.FileContractID, hk types HostKey: hk, HostVersion: hostVersion, - Pruned: pruned, - Remaining: remaining, - Duration: duration, + Pruned: res.Pruned, + Remaining: res.Remaining, + Duration: time.Since(start), }); err != nil { ap.logger.Error(err) } } // handle logs - log := ap.logger.With("contract", fcid, "host", hk, "version", hostVersion, "release", hostRelease, "pruned", pruned, "remaining", remaining, "elapsed", duration) - if err != nil && pruned > 0 { - log.With(zap.Error(err)).Error("unexpected error interrupted pruning") - } else if err != nil { - log.With(zap.Error(err)).Error("failed to prune contract") + if res.Error != "" { + log.Errorw("unexpected error interrupted pruning", zap.Error(errors.New(res.Error))) } else { log.Info("successfully pruned contract") } - // handle alerts - ap.mu.Lock() - defer ap.mu.Unlock() - alertID := alerts.IDForContract(alertPruningID, fcid) - if shouldSendPruneAlert(err, hostVersion, hostRelease) { - ap.RegisterAlert(ctx, newContractPruningFailedAlert(hk, hostVersion, hostRelease, fcid, err)) - ap.pruningAlertIDs[fcid] = alertID // store id to dismiss stale alerts - } else { - ap.DismissAlert(ctx, alertID) - delete(ap.pruningAlertIDs, fcid) - } - - return pruned + return res.Pruned, nil } -func (ap *Autopilot) tryPerformPruning(wp *workerPool) { +func (ap *Autopilot) tryPerformPruning() { ap.mu.Lock() if ap.pruning || ap.isStopped() { ap.mu.Unlock() @@ -215,7 +233,7 @@ func (ap *Autopilot) 
tryPerformPruning(wp *workerPool) { ap.wg.Add(1) go func() { defer ap.wg.Done() - ap.performContractPruning(wp) + ap.performContractPruning() ap.mu.Lock() ap.pruning = false ap.mu.Unlock() diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go index 7220f7f4a..11bcfa09b 100644 --- a/autopilot/workerpool.go +++ b/autopilot/workerpool.go @@ -20,7 +20,6 @@ type Worker interface { RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error) - RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) } diff --git a/bus/bus.go b/bus/bus.go index d4a43a8ef..bc0ff9804 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -43,9 +43,12 @@ const ( defaultWalletRecordMetricInterval = 5 * time.Minute defaultPinUpdateInterval = 5 * time.Minute defaultPinRateWindow = 6 * time.Hour - lockingPriorityFunding = 40 - lockingPriorityRenew = 80 - stdTxnSize = 1200 // bytes + + lockingPriorityPruning = 20 + lockingPriorityFunding = 40 + lockingPriorityRenew = 80 + + stdTxnSize = 1200 // bytes ) // Client re-exports the client from the client package. @@ -223,6 +226,7 @@ type ( ContractRoots(ctx context.Context, id types.FileContractID) ([]types.Hash256, error) ContractSizes(ctx context.Context) (map[types.FileContractID]api.ContractSize, error) ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) + PrunableContractRoots(ctx context.Context, id types.FileContractID, roots []types.Hash256) ([]uint64, error) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) @@ -421,6 +425,7 @@ func (b *Bus) Handler() http.Handler { "POST /contract/:id/acquire": b.contractAcquireHandlerPOST, "GET /contract/:id/ancestors": b.contractIDAncestorsHandler, "POST /contract/:id/keepalive": b.contractKeepaliveHandlerPOST, + "POST /contract/:id/prune": b.contractPruneHandlerPOST, "POST /contract/:id/renew": b.contractIDRenewHandlerPOST, "POST /contract/:id/renewed": b.contractIDRenewedHandlerPOST, "POST /contract/:id/release": b.contractReleaseHandlerPOST, diff --git a/bus/client/contracts.go b/bus/client/contracts.go index a831cb8e7..6929c8af9 100644 --- a/bus/client/contracts.go +++ b/bus/client/contracts.go @@ -160,6 +160,12 @@ func (c *Client) PrunableData(ctx context.Context) (prunableData api.ContractsPr return } +// PruneContract prunes the given contract. +func (c *Client) PruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (res api.ContractPruneResponse, err error) { + err = c.c.WithContext(ctx).POST(fmt.Sprintf("/contract/%s/prune", contractID), api.ContractPruneRequest{Timeout: api.DurationMS(timeout)}, &res) + return +} + // RenewContract renews an existing contract with a host and adds it to the bus. 
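Note: with RHPPruneContract gone from the worker, pruning goes through the bus client, which is what the autopilot's pruneContract now does. A minimal sketch of a direct caller, assuming a bus reachable at the default API address; the address, password, and contract ID are placeholders:

package main

import (
	"context"
	"log"
	"time"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/bus/client"
)

func main() {
	bc := client.New("http://localhost:9980/api/bus", "apipassword")

	var fcid types.FileContractID // placeholder: the contract to prune

	// The timeout is sent as api.DurationMS; a zero timeout means no timeout.
	res, err := bc.PruneContract(context.Background(), fcid, 5*time.Minute)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("size=%d pruned=%d remaining=%d err=%q",
		res.ContractSize, res.Pruned, res.Remaining, res.Error)
}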
func (c *Client) RenewContract(ctx context.Context, contractID types.FileContractID, endHeight uint64, renterFunds, minNewCollateral, maxFundAmount types.Currency, expectedStorage uint64) (renewal api.ContractMetadata, err error) { req := api.ContractRenewRequest{ diff --git a/bus/routes.go b/bus/routes.go index 92fe0843b..26a530ef7 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -19,6 +19,7 @@ import ( ibus "go.sia.tech/renterd/internal/bus" "go.sia.tech/renterd/internal/gouging" + rhp2 "go.sia.tech/renterd/internal/rhp/v2" "go.sia.tech/core/gateway" "go.sia.tech/core/types" @@ -863,6 +864,117 @@ func (b *Bus) contractKeepaliveHandlerPOST(jc jape.Context) { } } +func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { + ctx := jc.Request.Context() + + // decode fcid + var fcid types.FileContractID + if jc.DecodeParam("id", &fcid) != nil { + return + } + + // decode timeout + var req api.ContractPruneRequest + if jc.Decode(&req) != nil { + return + } + + // create gouging checker + gp, err := b.gougingParams(ctx) + if jc.Check("couldn't fetch gouging parameters", err) != nil { + return + } + gc := gouging.NewChecker(gp.GougingSettings, gp.ConsensusState, gp.TransactionFee, nil, nil) + + // apply timeout + pruneCtx := ctx + if req.Timeout > 0 { + var cancel context.CancelFunc + pruneCtx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)) + defer cancel() + } + + // acquire contract lock indefinitely and defer the release + lockID, err := b.contractLocker.Acquire(pruneCtx, lockingPriorityPruning, fcid, time.Duration(math.MaxInt64)) + if jc.Check("couldn't acquire contract lock", err) != nil { + return + } + defer func() { + if err := b.contractLocker.Release(fcid, lockID); err != nil { + b.logger.Error("failed to release contract lock", zap.Error(err)) + } + }() + + // fetch the contract from the bus + c, err := b.ms.Contract(ctx, fcid) + if errors.Is(err, api.ErrContractNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if jc.Check("couldn't fetch contract", err) != nil { + return + } + + // build map of uploading sectors + pending := make(map[types.Hash256]struct{}) + for _, root := range b.sectors.Sectors(fcid) { + pending[root] = struct{}{} + } + + // prune the contract + rev, spending, pruned, remaining, err := b.rhp2.PruneContract(pruneCtx, b.deriveRenterKey(c.HostKey), gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) { + indices, err := b.ms.PrunableContractRoots(ctx, fcid, roots) + if err != nil { + return nil, err + } else if len(indices) > len(roots) { + return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots)) + } + + filtered := indices[:0] + for _, index := range indices { + _, ok := pending[roots[index]] + if !ok { + filtered = append(filtered, index) + } + } + indices = filtered + return indices, nil + }) + + if errors.Is(err, rhp2.ErrNoSectorsToPrune) { + err = nil // ignore error + } else if !errors.Is(err, context.Canceled) { + if jc.Check("couldn't prune contract", err) != nil { + return + } + } + + // record spending + if !spending.Total().IsZero() { + b.ms.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ + { + ContractSpending: spending, + ContractID: fcid, + RevisionNumber: rev.RevisionNumber, + Size: rev.Filesize, + + MissedHostPayout: rev.MissedHostPayout(), + ValidRenterPayout: rev.ValidRenterPayout(), + }, + }) + } + + // return response + res := api.ContractPruneResponse{ + ContractSize: 
rev.Filesize, + Pruned: pruned, + Remaining: remaining, + } + if err != nil { + res.Error = err.Error() + } + jc.Encode(res) +} + func (b *Bus) contractsPrunableDataHandlerGET(jc jape.Context) { sizes, err := b.ms.ContractSizes(jc.Request.Context()) if jc.Check("failed to fetch contract sizes", err) != nil { diff --git a/internal/gouging/gouging.go b/internal/gouging/gouging.go index 8e729247d..aadfdd57f 100644 --- a/internal/gouging/gouging.go +++ b/internal/gouging/gouging.go @@ -32,7 +32,7 @@ const ( ) var ( - errHostSettingsGouging = errors.New("host settings gouging detected") + ErrHostSettingsGouging = errors.New("host settings gouging detected") ErrPriceTableGouging = errors.New("price table gouging detected") ) @@ -243,7 +243,7 @@ func checkContractGougingRHPv2(period, renewWindow *uint64, hs *rhpv2.HostSettin err = checkContractGouging(*period, *renewWindow, hs.MaxDuration, hs.WindowSize) if err != nil { - err = fmt.Errorf("%w: %v", errHostSettingsGouging, err) + err = fmt.Errorf("%w: %v", ErrHostSettingsGouging, err) } return } @@ -290,14 +290,14 @@ func checkPruneGougingRHPv2(gs api.GougingSettings, hs *rhpv2.HostSettings) erro hs.UploadBandwidthPrice, ) if overflow { - return fmt.Errorf("%w: overflow detected when computing sector download price", errHostSettingsGouging) + return fmt.Errorf("%w: overflow detected when computing sector download price", ErrHostSettingsGouging) } dpptb, overflow := sectorDownloadPrice.Mul64WithOverflow(uint64(bytesPerTB) / rhpv2.SectorSize) // sectors per TB if overflow { - return fmt.Errorf("%w: overflow detected when computing download price per TiB", errHostSettingsGouging) + return fmt.Errorf("%w: overflow detected when computing download price per TiB", ErrHostSettingsGouging) } if !gs.MaxDownloadPrice.IsZero() && dpptb.Cmp(gs.MaxDownloadPrice) > 0 { - return fmt.Errorf("%w: cost per TiB exceeds max dl price: %v > %v", errHostSettingsGouging, dpptb, gs.MaxDownloadPrice) + return fmt.Errorf("%w: cost per TiB exceeds max dl price: %v > %v", ErrHostSettingsGouging, dpptb, gs.MaxDownloadPrice) } return nil } diff --git a/internal/rhp/v2/rhp.go b/internal/rhp/v2/rhp.go index c2454c13d..14c2b48c7 100644 --- a/internal/rhp/v2/rhp.go +++ b/internal/rhp/v2/rhp.go @@ -13,6 +13,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/gouging" "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" @@ -67,6 +68,10 @@ var ( ErrNoSectorsToPrune = errors.New("no sectors to prune") ) +type ( + PrunableRootsFn = func(fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) +) + type ( Dialer interface { Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) @@ -163,50 +168,34 @@ func (c *Client) FormContract(ctx context.Context, hostKey types.PublicKey, host return } -func (c *Client) PruneContract(ctx context.Context, renterKey types.PrivateKey, gougingChecker gouging.Checker, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, toKeep []types.Hash256) (revision *types.FileContractRevision, deleted, remaining uint64, cost types.Currency, err error) { +func (c *Client) PruneContract(ctx context.Context, renterKey types.PrivateKey, gougingChecker gouging.Checker, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, diffRootsFn PrunableRootsFn) (revision *types.FileContractRevision, spending api.ContractSpending, deleted, remaining uint64, err error) { + 
log := c.logger.Named("performContractPruning") err = c.withTransport(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error { return c.withRevisionV2(renterKey, gougingChecker, t, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) { - // fetch roots - got, fetchCost, err := c.fetchContractRoots(t, renterKey, &rev, settings) - if err != nil { - return err - } - - // update cost and revision - cost = cost.Add(fetchCost) + // reference the revision revision = &rev.Revision - keep := make(map[types.Hash256]struct{}) - for _, root := range toKeep { - keep[root] = struct{}{} - } - - // collect indices for roots we want to prune + // fetch roots to delete var indices []uint64 - for i, root := range got { - if _, wanted := keep[root]; wanted { - delete(keep, root) // prevent duplicates - continue - } - indices = append(indices, uint64(i)) - } - if len(indices) == 0 { - return fmt.Errorf("%w: database holds %d, contract contains %d", ErrNoSectorsToPrune, len(toKeep), len(got)) + indices, spending.SectorRoots, err = c.prunableContractRoots(t, renterKey, &rev, settings, func(fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + startt := time.Now() + defer func() { + log.Debugf("batch diff roots took %v", time.Since(startt)) + }() + return diffRootsFn(fcid, roots) + }) + if err != nil { + return err + } else if len(indices) == 0 { + return ErrNoSectorsToPrune } // delete the roots from the contract - var deleteCost types.Currency - deleted, deleteCost, err = c.deleteContractRoots(t, renterKey, &rev, settings, indices) + deleted, spending.Deletions, err = c.deleteContractRoots(t, renterKey, &rev, settings, indices) if deleted < uint64(len(indices)) { remaining = uint64(len(indices)) - deleted } - // update cost and revision - if deleted > 0 { - cost = cost.Add(deleteCost) - revision = &rev.Revision - } - // return sizes instead of number of roots deleted *= rhpv2.SectorSize remaining *= rhpv2.SectorSize @@ -403,99 +392,144 @@ func (c *Client) deleteContractRoots(t *rhpv2.Transport, renterKey types.Private return } -func (c *Client) fetchContractRoots(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings) (roots []types.Hash256, cost types.Currency, _ error) { - // download the full set of SectorRoots +func (c *Client) prunableContractRoots(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings, prunableRootsFn PrunableRootsFn) (indices []uint64, cost types.Currency, _ error) { numsectors := rev.NumSectors() for offset := uint64(0); offset < numsectors; { + // calculate the batch size n := batchSizeFetchSectors if offset+n > numsectors { n = numsectors - offset } - // calculate the cost - batchCost, _ := settings.RPCSectorRootsCost(offset, n).Total() - - // TODO: remove once host network is updated - if utils.VersionCmp(settings.Version, "1.6.0") < 0 { - // calculate the response size - proofSize := rhpv2.RangeProofSize(numsectors, offset, offset+n) - responseSize := (proofSize + n) * 32 - if responseSize < minMessageSize { - responseSize = minMessageSize - } - batchCost = settings.BaseRPCPrice.Add(settings.DownloadBandwidthPrice.Mul64(responseSize)) - batchCost = batchCost.Mul64(2) // generous leeway - } - - // check funds - if rev.RenterFunds().Cmp(batchCost) < 0 { - return nil, types.ZeroCurrency, ErrInsufficientFunds - } - - // update the revision number - if rev.Revision.RevisionNumber == 
math.MaxUint64 { - return nil, types.ZeroCurrency, ErrContractFinalized + // fetch the batch + batch, batchCost, err := c.fetchContractRootsBatch(t, renterKey, rev, settings, offset, n) + if err != nil { + return nil, types.ZeroCurrency, err } - rev.Revision.RevisionNumber++ - // update the revision outputs - newRevision, err := updatedRevision(rev.Revision, batchCost, types.ZeroCurrency) + // fetch prunable roots for this batch + prunable, err := prunableRootsFn(rev.ID(), batch) if err != nil { return nil, types.ZeroCurrency, err } - // build the sector roots request - revisionHash := hashRevision(newRevision) - req := &rhpv2.RPCSectorRootsRequest{ - RootOffset: uint64(offset), - NumRoots: uint64(n), - - RevisionNumber: rev.Revision.RevisionNumber, - ValidProofValues: []types.Currency{ - newRevision.ValidProofOutputs[0].Value, - newRevision.ValidProofOutputs[1].Value, - }, - MissedProofValues: []types.Currency{ - newRevision.MissedProofOutputs[0].Value, - newRevision.MissedProofOutputs[1].Value, - newRevision.MissedProofOutputs[2].Value, - }, - Signature: renterKey.SignHash(revisionHash), + // append the roots, make sure to take the offset into account + for _, index := range prunable { + indices = append(indices, index+offset) } + offset += n - // execute the sector roots RPC - var rootsResp rhpv2.RPCSectorRootsResponse - if err := t.WriteRequest(rhpv2.RPCSectorRootsID, req); err != nil { - return nil, types.ZeroCurrency, err - } else if err := t.ReadResponse(&rootsResp, maxMerkleProofResponseSize); err != nil { - return nil, types.ZeroCurrency, fmt.Errorf("couldn't read sector roots response: %w", err) - } + // update the cost + cost = cost.Add(batchCost) + } + return +} - // verify the host signature - if !rev.HostKey().VerifyHash(revisionHash, rootsResp.Signature) { - return nil, types.ZeroCurrency, errors.New("host's signature is invalid") +func (c *Client) fetchContractRoots(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings) (roots []types.Hash256, cost types.Currency, _ error) { + numsectors := rev.NumSectors() + for offset := uint64(0); offset < numsectors; { + // calculate the batch size + n := batchSizeFetchSectors + if offset+n > numsectors { + n = numsectors - offset } - rev.Signatures[0].Signature = req.Signature[:] - rev.Signatures[1].Signature = rootsResp.Signature[:] - - // verify the proof - if uint64(len(rootsResp.SectorRoots)) != n { - return nil, types.ZeroCurrency, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v, err: number of roots does not match range %d != %d (num sectors: %d rev size: %d offset: %d)", rev.HostKey(), settings.Version, len(rootsResp.SectorRoots), n, numsectors, rev.Revision.Filesize, offset) - } else if !rhpv2.VerifySectorRangeProof(rootsResp.MerkleProof, rootsResp.SectorRoots, offset, offset+n, numsectors, rev.Revision.FileMerkleRoot) { - return nil, types.ZeroCurrency, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v; %w", rev.HostKey(), settings.Version, ErrInvalidMerkleProof) + + // fetch the batch + batch, batchCost, err := c.fetchContractRootsBatch(t, renterKey, rev, settings, offset, n) + if err != nil { + return nil, types.ZeroCurrency, err } - // append roots - roots = append(roots, rootsResp.SectorRoots...) + // append the roots + roots = append(roots, batch...) 
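Note: the PrunableRootsFn callback introduced above receives one batch of host roots at a time and returns indices into that batch; prunableContractRoots then adds the batch offset before returning. A toy in-memory implementation of the callback contract (the bus backs this with PrunableContractRoots rather than a hard-coded keep-set):

package main

import (
	"fmt"

	"go.sia.tech/core/types"
)

func main() {
	// Roots still referenced by the database; everything else is prunable.
	keep := map[types.Hash256]struct{}{
		{1}: {},
		{3}: {},
	}

	prunableRootsFn := func(_ types.FileContractID, roots []types.Hash256) ([]uint64, error) {
		var indices []uint64
		for i, root := range roots {
			if _, ok := keep[root]; !ok {
				indices = append(indices, uint64(i)) // relative to this batch
			}
		}
		return indices, nil
	}

	batch := []types.Hash256{{1}, {2}, {3}, {4}}
	indices, _ := prunableRootsFn(types.FileContractID{}, batch)
	fmt.Println(indices) // [1 3]
}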
offset += n - // update revision - rev.Revision = newRevision + // update the cost cost = cost.Add(batchCost) } return } +func (c *Client) fetchContractRootsBatch(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings, offset, limit uint64) ([]types.Hash256, types.Currency, error) { + // calculate the cost + cost, _ := settings.RPCSectorRootsCost(offset, limit).Total() + + // TODO: remove once host network is updated + if utils.VersionCmp(settings.Version, "1.6.0") < 0 { + // calculate the response size + proofSize := rhpv2.RangeProofSize(rev.NumSectors(), offset, offset+limit) + responseSize := (proofSize + limit) * 32 + if responseSize < minMessageSize { + responseSize = minMessageSize + } + cost = settings.BaseRPCPrice.Add(settings.DownloadBandwidthPrice.Mul64(responseSize)) + cost = cost.Mul64(2) // generous leeway + } + + // check funds + if rev.RenterFunds().Cmp(cost) < 0 { + return nil, types.ZeroCurrency, ErrInsufficientFunds + } + + // update the revision number + if rev.Revision.RevisionNumber == math.MaxUint64 { + return nil, types.ZeroCurrency, ErrContractFinalized + } + rev.Revision.RevisionNumber++ + + // update the revision outputs + newRevision, err := updatedRevision(rev.Revision, cost, types.ZeroCurrency) + if err != nil { + return nil, types.ZeroCurrency, err + } + + // build the sector roots request + revisionHash := hashRevision(newRevision) + req := &rhpv2.RPCSectorRootsRequest{ + RootOffset: offset, + NumRoots: limit, + + RevisionNumber: rev.Revision.RevisionNumber, + ValidProofValues: []types.Currency{ + newRevision.ValidProofOutputs[0].Value, + newRevision.ValidProofOutputs[1].Value, + }, + MissedProofValues: []types.Currency{ + newRevision.MissedProofOutputs[0].Value, + newRevision.MissedProofOutputs[1].Value, + newRevision.MissedProofOutputs[2].Value, + }, + Signature: renterKey.SignHash(revisionHash), + } + + // execute the sector roots RPC + var rootsResp rhpv2.RPCSectorRootsResponse + if err := t.WriteRequest(rhpv2.RPCSectorRootsID, req); err != nil { + return nil, types.ZeroCurrency, err + } else if err := t.ReadResponse(&rootsResp, maxMerkleProofResponseSize); err != nil { + return nil, types.ZeroCurrency, fmt.Errorf("couldn't read sector roots response: %w", err) + } + + // verify the host signature + if !rev.HostKey().VerifyHash(revisionHash, rootsResp.Signature) { + return nil, cost, errors.New("host's signature is invalid") + } + rev.Signatures[0].Signature = req.Signature[:] + rev.Signatures[1].Signature = rootsResp.Signature[:] + + // verify the proof + if uint64(len(rootsResp.SectorRoots)) != limit { + return nil, cost, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v, err: number of roots does not match range %d != %d (num sectors: %d rev size: %d offset: %d)", rev.HostKey(), settings.Version, len(rootsResp.SectorRoots), limit, rev.NumSectors(), rev.Revision.Filesize, offset) + } else if !rhpv2.VerifySectorRangeProof(rootsResp.MerkleProof, rootsResp.SectorRoots, offset, offset+limit, rev.NumSectors(), rev.Revision.FileMerkleRoot) { + return nil, cost, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v; %w", rev.HostKey(), settings.Version, ErrInvalidMerkleProof) + } + + // update revision + rev.Revision = newRevision + + return rootsResp.SectorRoots, cost, nil +} + func (w *Client) withRevisionV2(renterKey types.PrivateKey, gougingChecker gouging.Checker, t *rhpv2.Transport, fcid types.FileContractID, lastKnownRevisionNumber uint64, fn func(t *rhpv2.Transport, 
rev rhpv2.ContractRevision, settings rhpv2.HostSettings) error) error { // execute lock RPC var lockResp rhpv2.RPCLockResponse @@ -549,7 +583,7 @@ func (w *Client) withRevisionV2(renterKey types.PrivateKey, gougingChecker gougi // perform gouging checks on settings if breakdown := gougingChecker.CheckSettings(settings); breakdown.Gouging() { - return fmt.Errorf("failed to prune contract: %v", breakdown) + return fmt.Errorf("%w: %v", gouging.ErrHostSettingsGouging, breakdown) } return fn(t, rev, settings) diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 3b09b672e..3aaf6456e 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -100,6 +100,32 @@ func (tc *TestCluster) Accounts() []api.Account { return accounts } +func (tc *TestCluster) ContractRoots(ctx context.Context, fcid types.FileContractID) ([]types.Hash256, error) { + tc.tt.Helper() + + c, err := tc.Bus.Contract(ctx, fcid) + if err != nil { + return nil, err + } + + var h *Host + for _, host := range tc.hosts { + if host.PublicKey() == c.HostKey { + h = host + break + } + } + if h == nil { + return nil, fmt.Errorf("no host found for contract %v", c) + } + + roots, err := h.store.SectorRoots() + if err != nil { + return nil, err + } + return roots[c.ID], nil +} + func (tc *TestCluster) ShutdownAutopilot(ctx context.Context) { tc.tt.Helper() for _, fn := range tc.autopilotShutdownFns { diff --git a/internal/test/e2e/contracts_test.go b/internal/test/e2e/contracts_test.go index 82a8f043b..0304e0909 100644 --- a/internal/test/e2e/contracts_test.go +++ b/internal/test/e2e/contracts_test.go @@ -2,6 +2,7 @@ package e2e import ( "context" + "errors" "fmt" "testing" "time" @@ -45,9 +46,21 @@ func TestFormContract(t *testing.T) { _, err = b.Contract(context.Background(), contract.ID) tt.OK(err) + // fetch autopilot config + old, err := b.Autopilot(context.Background(), api.DefaultAutopilotID) + tt.OK(err) + // mine to the renew window cluster.MineToRenewWindow() + // wait until autopilot updated the current period + tt.Retry(100, 100*time.Millisecond, func() error { + if curr, _ := b.Autopilot(context.Background(), api.DefaultAutopilotID); curr.CurrentPeriod == old.CurrentPeriod { + return errors.New("autopilot didn't update the current period") + } + return nil + }) + // update autopilot config to allow for 1 contract, this won't form a // contract but will ensure we don't skip contract maintenance, which should // renew the contract we formed diff --git a/internal/test/e2e/pruning_test.go b/internal/test/e2e/pruning_test.go index 8492bf9f1..84cce4b21 100644 --- a/internal/test/e2e/pruning_test.go +++ b/internal/test/e2e/pruning_test.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" "context" + "errors" "fmt" "math" "strings" @@ -12,6 +13,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" + "go.uber.org/zap" ) func TestHostPruning(t *testing.T) { @@ -98,7 +100,9 @@ func TestSectorPruning(t *testing.T) { } // create a cluster - cluster := newTestCluster(t, clusterOptsDefault) + opts := clusterOptsDefault + opts.logger = zap.NewNop() + cluster := newTestCluster(t, opts) defer cluster.Shutdown() // add a helper to check whether a root is in a given slice @@ -121,13 +125,13 @@ func TestSectorPruning(t *testing.T) { numObjects := 10 // add hosts - hosts := cluster.AddHostsBlocking(int(cfg.Contracts.Amount)) + hosts := cluster.AddHostsBlocking(rs.TotalShards) // wait until we have accounts cluster.WaitForAccounts() // wait until we have a contract 
set - cluster.WaitForContractSetContracts(cfg.Contracts.Set, int(cfg.Contracts.Amount)) + cluster.WaitForContractSetContracts(cfg.Contracts.Set, rs.TotalShards) // add several objects for i := 0; i < numObjects; i++ { @@ -147,7 +151,8 @@ func TestSectorPruning(t *testing.T) { for _, c := range contracts { dbRoots, _, err := b.ContractRoots(context.Background(), c.ID) tt.OK(err) - cRoots, err := w.RHPContractRoots(context.Background(), c.ID) + + cRoots, err := cluster.ContractRoots(context.Background(), c.ID) tt.OK(err) if len(dbRoots) != len(cRoots) { t.Fatal("unexpected number of roots", dbRoots, cRoots) @@ -163,7 +168,7 @@ func TestSectorPruning(t *testing.T) { t.Fatal("unexpected number of roots", n) } - // sleep for a bit to ensure spending records get flushed + // sleep to ensure spending records get flushed time.Sleep(3 * testBusFlushInterval) // assert prunable data is 0 @@ -180,7 +185,7 @@ func TestSectorPruning(t *testing.T) { } // assert amount of prunable data - tt.Retry(100, 100*time.Millisecond, func() error { + tt.Retry(300, 100*time.Millisecond, func() error { res, err = b.PrunableData(context.Background()) tt.OK(err) if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*rs.SlabSize() { @@ -191,18 +196,21 @@ func TestSectorPruning(t *testing.T) { // prune all contracts for _, c := range contracts { - tt.OKAll(w.RHPPruneContract(context.Background(), c.ID, 0)) - } - - // assert spending records were updated and prunable data is 0 - tt.Retry(10, testBusFlushInterval, func() error { - res, err := b.PrunableData(context.Background()) + res, err := b.PruneContract(context.Background(), c.ID, 0) tt.OK(err) - if res.TotalPrunable != 0 { - return fmt.Errorf("unexpected prunable data: %d", n) + if res.Pruned == 0 { + t.Fatal("expected pruned to be non-zero") + } else if res.Remaining != 0 { + t.Fatal("expected remaining to be zero") } - return nil - }) + } + + // assert prunable data is 0 + res, err = b.PrunableData(context.Background()) + tt.OK(err) + if res.TotalPrunable != 0 { + t.Fatalf("unexpected prunable data: %d", n) + } // assert spending was updated for _, c := range contracts { @@ -222,15 +230,23 @@ func TestSectorPruning(t *testing.T) { tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename, api.DeleteObjectOptions{})) } - // sleep for a bit to ensure spending records get flushed - time.Sleep(3 * testBusFlushInterval) - // assert amount of prunable data - res, err = b.PrunableData(context.Background()) - tt.OK(err) - if res.TotalPrunable == 0 { - t.Fatal("expected prunable data") - } + tt.Retry(300, 100*time.Millisecond, func() error { + res, err = b.PrunableData(context.Background()) + tt.OK(err) + + if len(res.Contracts) != len(contracts) { + return fmt.Errorf("expected %d contracts, got %d", len(contracts), len(res.Contracts)) + } else if res.TotalPrunable == 0 { + var sizes []string + for _, c := range res.Contracts { + res, _ := b.ContractSize(context.Background(), c.ID) + sizes = append(sizes, fmt.Sprintf("c: %v size: %v prunable: %v", c.ID, res.Size, res.Prunable)) + } + return errors.New("expected prunable data, contract sizes:\n" + strings.Join(sizes, "\n")) + } + return nil + }) // update the host settings so it's gouging host := hosts[0] @@ -249,7 +265,7 @@ func TestSectorPruning(t *testing.T) { } // prune the contract and assert it threw a gouging error - _, _, err = w.RHPPruneContract(context.Background(), c.ID, 0) + _, err = b.PruneContract(context.Background(), c.ID, 0) if err == nil || !strings.Contains(err.Error(), 
"gouging") { t.Fatal("expected gouging error", err) } diff --git a/internal/test/e2e/uploads_test.go b/internal/test/e2e/uploads_test.go index 3f83fd7e4..c601becd6 100644 --- a/internal/test/e2e/uploads_test.go +++ b/internal/test/e2e/uploads_test.go @@ -135,7 +135,7 @@ func TestUploadingSectorsCache(t *testing.T) { } } - cr, err := w.RHPContractRoots(context.Background(), id) + cr, err := cluster.ContractRoots(context.Background(), id) tt.OK(err) expected := make(map[types.Hash256]struct{}) for _, root := range cr { diff --git a/stores/bench_test.go b/stores/bench_test.go new file mode 100644 index 000000000..60f75b52f --- /dev/null +++ b/stores/bench_test.go @@ -0,0 +1,167 @@ +package stores + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "testing" + "time" + + "go.sia.tech/core/types" + isql "go.sia.tech/renterd/internal/sql" + "go.sia.tech/renterd/object" + "go.sia.tech/renterd/stores/sql" + "go.sia.tech/renterd/stores/sql/sqlite" + "go.uber.org/zap" + "lukechampine.com/frand" +) + +// BenchmarkPrunableContractRoots benchmarks diffing the roots of a contract +// with a given set of roots to determine which roots are prunable. +// +// 15.32 MB/s | M1 Max | cd32fad7 (diff ~2TiB of contract data per second) +func BenchmarkPrunableContractRoots(b *testing.B) { + // define parameters + batchSize := int64(25600) // 100GiB of contract data + contractSize := 1 << 40 // 1TiB contract + sectorSize := 4 << 20 // 4MiB sector + numSectors := contractSize / sectorSize + + // create database + db, err := newTestDB(context.Background(), b.TempDir()) + if err != nil { + b.Fatal(err) + } + + // prepare database + fcid := types.FileContractID{1} + roots, err := prepareDB(db.DB(), fcid, numSectors) + if err != nil { + b.Fatal(err) + } + + // prepare batch + frand.Shuffle(len(roots), func(i, j int) { + roots[i], roots[j] = roots[j], roots[i] + }) + batch := roots[:batchSize] + + // start benchmark + b.ResetTimer() + b.SetBytes(batchSize * 32) + for i := 0; i < b.N; i++ { + if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error { + indices, err := tx.PrunableContractRoots(context.Background(), fcid, batch) + if err != nil { + return err + } else if len(indices) != 0 { + return errors.New("expected no prunable roots") + } + return nil + }); err != nil { + b.Fatal(err) + } + } +} + +func prepareDB(db *isql.DB, fcid types.FileContractID, n int) (roots []types.Hash256, _ error) { + // insert host + hk := types.PublicKey{1} + res, err := db.Exec(context.Background(), ` +INSERT INTO hosts (public_key) VALUES (?)`, sql.PublicKey(hk)) + if err != nil { + return nil, err + } + hostID, err := res.LastInsertId() + if err != nil { + return nil, err + } + + // insert contract + res, err = db.Exec(context.Background(), ` +INSERT INTO contracts (host_id, fcid,start_height) VALUES (?, ?, ?)`, hostID, sql.FileContractID(fcid), 0) + if err != nil { + return nil, err + } + contractID, err := res.LastInsertId() + if err != nil { + return nil, err + } + + // insert slab + key := object.GenerateEncryptionKey() + res, err = db.Exec(context.Background(), ` +INSERT INTO slabs (created_at, `+"`key`"+`) VALUES (?, ?)`, time.Now(), sql.EncryptionKey(key)) + if err != nil { + return nil, err + } + slabID, err := res.LastInsertId() + if err != nil { + return nil, err + } + + // insert sectors + insertSectorStmt, err := db.Prepare(context.Background(), ` +INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?, ?) 
RETURNING id`) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement to insert sector: %w", err) + } + defer insertSectorStmt.Close() + var sectorIDs []int64 + for i := 0; i < n; i++ { + var sectorID int64 + roots = append(roots, frand.Entropy256()) + err := insertSectorStmt.QueryRow(context.Background(), slabID, i, sql.PublicKey(hk), sql.Hash256(roots[i])).Scan(§orID) + if err != nil { + return nil, fmt.Errorf("failed to insert sector: %w", err) + } + sectorIDs = append(sectorIDs, sectorID) + } + + // insert contract sectors + insertLinkStmt, err := db.Prepare(context.Background(), ` +INSERT INTO contract_sectors (db_contract_id, db_sector_id) VALUES (?, ?)`) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement to insert contract sectors: %w", err) + } + defer insertLinkStmt.Close() + for _, sectorID := range sectorIDs { + if _, err := insertLinkStmt.Exec(context.Background(), contractID, sectorID); err != nil { + return nil, fmt.Errorf("failed to insert contract sector: %w", err) + } + } + + // sanity check + var cnt int + err = db.QueryRow(context.Background(), ` +SELECT COUNT(s.root) +FROM contracts c +INNER JOIN contract_sectors cs ON cs.db_contract_id = c.id +INNER JOIN sectors s ON cs.db_sector_id = s.id +WHERE c.fcid = ?`, sql.FileContractID(fcid)).Scan(&cnt) + if cnt != n { + return nil, fmt.Errorf("expected %v sectors, got %v", n, cnt) + } + + return +} + +func newTestDB(ctx context.Context, dir string) (*sqlite.MainDatabase, error) { + db, err := sqlite.Open(filepath.Join(dir, "db.sqlite")) + if err != nil { + return nil, err + } + + dbMain, err := sqlite.NewMainDatabase(db, zap.NewNop(), 100*time.Millisecond, 100*time.Millisecond) + if err != nil { + return nil, err + } + + err = dbMain.Migrate(ctx) + if err != nil { + return nil, err + } + + return dbMain, nil +} diff --git a/stores/metadata.go b/stores/metadata.go index fe26bc29e..1f4755bd7 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -669,6 +669,14 @@ func (s *SQLStore) ObjectsBySlabKey(ctx context.Context, bucket string, slabKey return } +func (s *SQLStore) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { + indices, err = tx.PrunableContractRoots(ctx, fcid, roots) + return err + }) + return +} + // MarkPackedSlabsUploaded marks the given slabs as uploaded and deletes them // from the buffer. 
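Note: SQLStore.PrunableContractRoots is what the bus' prune handler reaches through the PrunableRootsFn callback (after filtering out roots that are still being uploaded). A hypothetical helper sketching the intended use, where hostRoots is the batch reported by the host and the returned indices point into it:

package prune

import (
	"context"
	"fmt"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/stores"
)

// printPrunable is a hypothetical helper: it prints the roots in hostRoots that
// the database no longer references for this contract, i.e. the prunable ones.
func printPrunable(ctx context.Context, ss *stores.SQLStore, fcid types.FileContractID, hostRoots []types.Hash256) error {
	indices, err := ss.PrunableContractRoots(ctx, fcid, hostRoots)
	if err != nil {
		return err
	}
	for _, idx := range indices {
		fmt.Println("prunable root:", hostRoots[idx])
	}
	return nil
}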
func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error { diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 6d0639e5d..b3529eea6 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -126,6 +126,82 @@ SET health = ( return err } +func TestPrunableContractRoots(t *testing.T) { + // create a SQL store + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // add a contract + hks, err := ss.addTestHosts(1) + if err != nil { + t.Fatal(err) + } + fcids, _, err := ss.addTestContracts(hks) + if err != nil { + t.Fatal(err) + } + + // add 4 objects + for i := 1; i <= 4; i++ { + if _, err := ss.addTestObject(fmt.Sprintf("%s_%d", t.Name(), i), object.Object{ + Key: object.GenerateEncryptionKey(), + Slabs: []object.SlabSlice{ + { + Slab: object.Slab{ + Key: object.GenerateEncryptionKey(), + MinShards: 1, + Shards: newTestShards(hks[0], fcids[0], types.Hash256{byte(i)}), + }, + }, + }, + }); err != nil { + t.Fatal(err) + } + } + + // assert there's 4 roots in the database + roots, err := ss.ContractRoots(context.Background(), fcids[0]) + if err != nil { + t.Fatal(err) + } else if len(roots) != 4 { + t.Fatal("unexpected number of roots", len(roots)) + } + + // diff the roots - should be empty + indices, err := ss.PrunableContractRoots(context.Background(), fcids[0], roots) + if err != nil { + t.Fatal(err) + } else if len(indices) != 0 { + t.Fatal("unexpected number of indices", len(indices)) + } + + // delete every other object + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, fmt.Sprintf("%s_1", t.Name())); err != nil { + t.Fatal(err) + } + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, fmt.Sprintf("%s_3", t.Name())); err != nil { + t.Fatal(err) + } + + // assert there's 2 roots left + updated, err := ss.ContractRoots(context.Background(), fcids[0]) + if err != nil { + t.Fatal(err) + } else if len(updated) != 2 { + t.Fatal("unexpected number of roots", len(updated)) + } + + // diff the roots again, should return indices 0 and 2 + indices, err = ss.PrunableContractRoots(context.Background(), fcids[0], roots) + if err != nil { + t.Fatal(err) + } else if len(indices) != 2 { + t.Fatal("unexpected number of indices", len(indices)) + } else if indices[0] != 0 || indices[1] != 2 { + t.Fatal("unexpected indices", indices) + } +} + // TestObjectBasic tests the hydration of raw objects works when we fetch // objects from the metadata store. func TestObjectBasic(t *testing.T) { diff --git a/stores/sql/database.go b/stores/sql/database.go index 3e1917502..bc08865da 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -243,6 +243,10 @@ type ( // ProcessChainUpdate applies the given chain update to the database. ProcessChainUpdate(ctx context.Context, applyFn func(ChainUpdateTx) error) error + // PrunableContractRoots returns the indices of roots that are not in + // the contract. + PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) + // PruneEmptydirs prunes any directories that are empty. 
PruneEmptydirs(ctx context.Context) error diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index de8b97bfa..ef671baef 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -26,6 +26,10 @@ import ( "go.uber.org/zap" ) +const ( + batchSizeInsertSectors = 500 +) + type ( MainDatabase struct { db *sql.DB @@ -566,6 +570,62 @@ func (tx *MainDatabaseTx) ProcessChainUpdate(ctx context.Context, fn func(ssql.C }) } +func (tx *MainDatabaseTx) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + // build tmp table name + tmpTable := strings.ReplaceAll(fmt.Sprintf("tmp_host_roots_%s", fcid.String()[:8]), ":", "_") + + // create temporary table + _, err = tx.Exec(ctx, fmt.Sprintf(` +DROP TABLE IF EXISTS %s; +CREATE TEMPORARY TABLE %s (idx INT, root varbinary(32)) ENGINE=MEMORY; +CREATE INDEX %s_idx ON %s (root(32));`, tmpTable, tmpTable, tmpTable, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to create temporary table: %w", err) + } + + // defer removal + defer func() { + if _, err := tx.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, tmpTable)); err != nil { + tx.log.Warnw("failed to drop temporary table", zap.Error(err)) + } + }() + + // insert roots in batches + for i := 0; i < len(roots); i += batchSizeInsertSectors { + end := i + batchSizeInsertSectors + if end > len(roots) { + end = len(roots) + } + + var params []interface{} + for i, r := range roots[i:end] { + params = append(params, uint64(i), ssql.Hash256(r)) + } + + _, err = tx.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (idx, root) VALUES %s`, tmpTable, strings.TrimSuffix(strings.Repeat("(?, ?), ", end-i), ", ")), params...) + if err != nil { + return nil, fmt.Errorf("failed to insert into roots into temporary table: %w", err) + } + } + + // execute query + rows, err := tx.Query(ctx, fmt.Sprintf(`SELECT idx FROM %s tmp LEFT JOIN sectors s ON s.root = tmp.root WHERE s.root IS NULL`, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract roots: %w", err) + } + defer rows.Close() + + // fetch indices + for rows.Next() { + var idx uint64 + if err := rows.Scan(&idx); err != nil { + return nil, fmt.Errorf("failed to scan root index: %w", err) + } + indices = append(indices, idx) + } + return +} + func (tx *MainDatabaseTx) PruneEmptydirs(ctx context.Context) error { stmt, err := tx.Prepare(ctx, ` DELETE diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 739fb47f4..744617565 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -25,6 +25,10 @@ import ( "go.uber.org/zap" ) +const ( + batchSizeInsertSectors = 500 +) + type ( MainDatabase struct { db *sql.DB @@ -447,7 +451,6 @@ func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids ) `, strings.Repeat("?, ", len(fcids)-1)+"?"), args...) 
if err != nil { - fmt.Println(strings.Repeat("?, ", len(fcids)-1) + "?") return 0, err } return res.RowsAffected() @@ -563,6 +566,76 @@ func (tx *MainDatabaseTx) ProcessChainUpdate(ctx context.Context, fn func(ssql.C }) } +func (tx *MainDatabaseTx) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + // build tmp table name + tmpTable := strings.ReplaceAll(fmt.Sprintf("tmp_host_roots_%s", fcid.String()[:8]), ":", "_") + + // create temporary table + _, err = tx.Exec(ctx, fmt.Sprintf(` +DROP TABLE IF EXISTS %s; +CREATE TEMPORARY TABLE %s (idx INT, root blob); +CREATE INDEX %s_idx ON %s (root);`, tmpTable, tmpTable, tmpTable, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to create temporary table: %w", err) + } + + // defer removal + defer func() { + if _, err := tx.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, tmpTable)); err != nil { + tx.log.Warnw("failed to drop temporary table", zap.Error(err)) + } + }() + + // prepare insert statement + insertStmt, err := tx.Prepare(ctx, fmt.Sprintf(`INSERT INTO %s (idx, root) VALUES %s`, tmpTable, strings.TrimSuffix(strings.Repeat("(?, ?), ", batchSizeInsertSectors), ", "))) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement to insert contract roots: %w", err) + } + defer insertStmt.Close() + + // insert roots in batches + for i := 0; i < len(roots); i += batchSizeInsertSectors { + end := i + batchSizeInsertSectors + if end > len(roots) { + end = len(roots) + } + + var params []interface{} + for i, r := range roots[i:end] { + params = append(params, uint64(i), ssql.Hash256(r)) + } + + if len(params) == batchSizeInsertSectors { + _, err := insertStmt.Exec(ctx, params...) + if err != nil { + return nil, fmt.Errorf("failed to insert into roots into temporary table: %w", err) + } + } else { + _, err = tx.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (idx, root) VALUES %s`, tmpTable, strings.TrimSuffix(strings.Repeat("(?, ?), ", end-i), ", ")), params...) + if err != nil { + return nil, fmt.Errorf("failed to insert into roots into temporary table: %w", err) + } + } + } + + // execute query + rows, err := tx.Query(ctx, fmt.Sprintf(`SELECT idx FROM %s tmp LEFT JOIN sectors s ON s.root = tmp.root WHERE s.root IS NULL`, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract roots: %w", err) + } + defer rows.Close() + + // fetch indices + for rows.Next() { + var idx uint64 + if err := rows.Scan(&idx); err != nil { + return nil, fmt.Errorf("failed to scan root index: %w", err) + } + indices = append(indices, idx) + } + return +} + func (tx *MainDatabaseTx) PruneEmptydirs(ctx context.Context) error { stmt, err := tx.Prepare(ctx, ` DELETE diff --git a/worker/client/rhp.go b/worker/client/rhp.go index 8352f54ad..bb923b705 100644 --- a/worker/client/rhp.go +++ b/worker/client/rhp.go @@ -2,7 +2,6 @@ package client import ( "context" - "errors" "fmt" "time" @@ -16,12 +15,6 @@ func (c *Client) RHPBroadcast(ctx context.Context, contractID types.FileContract return } -// RHPContractRoots fetches the roots of the contract with given id. -func (c *Client) RHPContractRoots(ctx context.Context, contractID types.FileContractID) (roots []types.Hash256, err error) { - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", contractID), &roots) - return -} - // RHPPriceTable fetches a price table for a host. 
func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (pt api.HostPriceTable, err error) { req := api.RHPPriceTableRequest{ @@ -33,22 +26,6 @@ func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, sia return } -// RHPPruneContract prunes deleted sectors from the contract with given id. -func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) { - var res api.RHPPruneContractResponse - if err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", contractID), api.RHPPruneContractRequest{ - Timeout: api.DurationMS(timeout), - }, &res); err != nil { - return - } else if res.Error != "" { - err = errors.New(res.Error) - } - - pruned = res.Pruned - remaining = res.Remaining - return -} - // RHPScan scans a host, returning its current settings. func (c *Client) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (resp api.RHPScanResponse, err error) { err = c.c.WithContext(ctx).POST("/rhp/scan", api.RHPScanRequest{ diff --git a/worker/host.go b/worker/host.go index 40695f8b0..2ecd95233 100644 --- a/worker/host.go +++ b/worker/host.go @@ -110,6 +110,7 @@ func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, secto }); err != nil { return err } + // upload cost, err := h.client.AppendSector(ctx, sectorRoot, sector, &rev, h.hk, h.siamuxAddr, h.acc.ID(), pt, h.renterKey) if err != nil { diff --git a/worker/worker.go b/worker/worker.go index be27f2cc6..76a8a8e31 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -416,119 +416,6 @@ func (w *Worker) rhpBroadcastHandler(jc jape.Context) { } } -func (w *Worker) rhpPruneContractHandlerPOST(jc jape.Context) { - ctx := jc.Request.Context() - - // decode fcid - var fcid types.FileContractID - if jc.DecodeParam("id", &fcid) != nil { - return - } - - // decode timeout - var pcr api.RHPPruneContractRequest - if jc.Decode(&pcr) != nil { - return - } - - // apply timeout - if pcr.Timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(pcr.Timeout)) - defer cancel() - } - - // fetch the contract from the bus - contract, err := w.bus.Contract(ctx, fcid) - if errors.Is(err, api.ErrContractNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't fetch contract", err) != nil { - return - } - - // return early if there's no data to prune - size, err := w.bus.ContractSize(ctx, fcid) - if jc.Check("couldn't fetch contract size", err) != nil { - return - } else if size.Prunable == 0 { - jc.Encode(api.RHPPruneContractResponse{}) - return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("could not fetch gouging parameters", err) != nil { - return - } - gc := newGougingChecker(gp.GougingSettings, gp.ConsensusState, gp.TransactionFee, false) - - // prune the contract - var pruned, remaining uint64 - var rev *types.FileContractRevision - var cost types.Currency - err = w.withContractLock(ctx, contract.ID, lockingPriorityPruning, func() error { - stored, pending, err := w.bus.ContractRoots(ctx, contract.ID) - if err != nil { - return fmt.Errorf("failed to fetch contract roots; %w", err) - } - rev, pruned, remaining, cost, err = w.rhp2Client.PruneContract(ctx, w.deriveRenterKey(contract.HostKey), gc, contract.HostIP, contract.HostKey, fcid, contract.RevisionNumber, append(stored, pending...)) - return err - }) - if 
rev != nil { - w.contractSpendingRecorder.Record(*rev, api.ContractSpending{Deletions: cost}) - } - if err != nil && !errors.Is(err, rhp2.ErrNoSectorsToPrune) && pruned == 0 { - err = fmt.Errorf("failed to prune contract %v; %w", fcid, err) - jc.Error(err, http.StatusInternalServerError) - return - } - - res := api.RHPPruneContractResponse{ - Pruned: pruned, - Remaining: remaining, - } - if err != nil { - res.Error = err.Error() - } - jc.Encode(res) -} - -func (w *Worker) rhpContractRootsHandlerGET(jc jape.Context) { - ctx := jc.Request.Context() - - // decode fcid - var id types.FileContractID - if jc.DecodeParam("id", &id) != nil { - return - } - - // fetch the contract from the bus - c, err := w.bus.Contract(ctx, id) - if errors.Is(err, api.ErrContractNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't fetch contract", err) != nil { - return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("couldn't fetch gouging parameters from bus", err) != nil { - return - } - gc := newGougingChecker(gp.GougingSettings, gp.ConsensusState, gp.TransactionFee, false) - - // fetch the roots from the host - roots, rev, cost, err := w.rhp2Client.ContractRoots(ctx, w.deriveRenterKey(c.HostKey), gc, c.HostIP, c.HostKey, id, c.RevisionNumber) - if jc.Check("couldn't fetch contract roots from host", err) != nil { - return - } else if rev != nil { - w.contractSpendingRecorder.Record(*rev, api.ContractSpending{SectorRoots: cost}) - } - jc.Encode(roots) -} - func (w *Worker) slabMigrateHandler(jc jape.Context) { ctx := jc.Request.Context() @@ -1163,8 +1050,6 @@ func (w *Worker) Handler() http.Handler { "GET /rhp/contracts": w.rhpContractsHandlerGET, "POST /rhp/contract/:id/broadcast": w.rhpBroadcastHandler, - "POST /rhp/contract/:id/prune": w.rhpPruneContractHandlerPOST, - "GET /rhp/contract/:id/roots": w.rhpContractRootsHandlerGET, "POST /rhp/scan": w.rhpScanHandler, "POST /rhp/pricetable": w.rhpPriceTableHandler,
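Note: with the worker's /rhp/contract/:id/prune and /rhp/contract/:id/roots routes removed, callers follow the pattern exercised by the updated e2e test: prune via the bus and inspect the byte counts in the response, with hard failures (including the gouging check, which now wraps gouging.ErrHostSettingsGouging) surfacing as the request error. A hypothetical helper mirroring that flow, assuming a bus client and a contract list fetched from it:

package prune

import (
	"context"
	"fmt"

	"go.sia.tech/renterd/api"
	"go.sia.tech/renterd/bus/client"
)

// pruneAll prunes every contract via the bus and reports the byte counts from
// the response, roughly what TestSectorPruning does after this change.
func pruneAll(ctx context.Context, b *client.Client, contracts []api.ContractMetadata) error {
	for _, c := range contracts {
		res, err := b.PruneContract(ctx, c.ID, 0)
		if err != nil {
			// e.g. a gouging host now fails here with a wrapped
			// "host settings gouging detected" error
			return fmt.Errorf("prune %v: %w", c.ID, err)
		}
		fmt.Printf("%v: pruned %d bytes, %d remaining\n", c.ID, res.Pruned, res.Remaining)
	}
	return nil
}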