From 53d9f46a595f36ba0e176ae95774a3aa2fa5a70d Mon Sep 17 00:00:00 2001 From: Peter-Jan Brone Date: Wed, 4 Sep 2024 10:46:37 +0200 Subject: [PATCH] Add contract pruning to the bus (#1472) This PR moves contract pruning over to the bus. I kept the `/rhp/contract/:id/roots` endpoint on the worker. This PR also uses a temporary table to store the roots we fetch from the host, this allows us to calculate which indices are prunable without fetching the contract roots from the database. This saves on network traffic but also turned out to make contract pruning about 2-3 times as fast. Not sure if this performance boost is linear though because the slow part might be the deletions.. pruning ~200MiB from 1.5 TiB contracts takes ~11s on average as opposed to ~25s when we fetch roots and do the diff manually. --- api/contract.go | 20 +++ api/worker.go | 18 +-- autopilot/autopilot.go | 6 +- autopilot/contract_pruning.go | 100 +++++++----- autopilot/workerpool.go | 1 - bus/bus.go | 11 +- bus/client/contracts.go | 6 + bus/routes.go | 112 +++++++++++++ internal/gouging/gouging.go | 10 +- internal/rhp/v2/rhp.go | 234 ++++++++++++++++------------ internal/test/e2e/cluster.go | 26 ++++ internal/test/e2e/contracts_test.go | 13 ++ internal/test/e2e/pruning_test.go | 66 +++++--- internal/test/e2e/uploads_test.go | 2 +- stores/bench_test.go | 167 ++++++++++++++++++++ stores/metadata.go | 8 + stores/metadata_test.go | 76 +++++++++ stores/sql/database.go | 4 + stores/sql/mysql/main.go | 60 +++++++ stores/sql/sqlite/main.go | 75 ++++++++- worker/client/rhp.go | 23 --- worker/host.go | 1 + worker/worker.go | 115 -------------- 23 files changed, 826 insertions(+), 328 deletions(-) create mode 100644 stores/bench_test.go diff --git a/api/contract.go b/api/contract.go index ea90c722f..19da16a2d 100644 --- a/api/contract.go +++ b/api/contract.go @@ -161,6 +161,21 @@ type ( LockID uint64 `json:"lockID"` } + // ContractPruneRequest is the request type for the /contract/:id/prune + // endpoint. + ContractPruneRequest struct { + Timeout DurationMS `json:"timeout"` + } + + // ContractPruneResponse is the response type for the /contract/:id/prune + // endpoint. + ContractPruneResponse struct { + ContractSize uint64 `json:"size"` + Pruned uint64 `json:"pruned"` + Remaining uint64 `json:"remaining"` + Error string `json:"error,omitempty"` + } + // ContractAcquireRequest is the request type for the /contract/:id/release // endpoint. ContractReleaseRequest struct { @@ -211,6 +226,11 @@ type ( } ) +// Total returns the total cost of the contract spending. +func (x ContractSpending) Total() types.Currency { + return x.Uploads.Add(x.Downloads).Add(x.FundAccount).Add(x.Deletions).Add(x.SectorRoots) +} + // Add returns the sum of the current and given contract spending. func (x ContractSpending) Add(y ContractSpending) (z ContractSpending) { z.Uploads = x.Uploads.Add(y.Uploads) diff --git a/api/worker.go b/api/worker.go index ab4aec5dd..d1c18b61b 100644 --- a/api/worker.go +++ b/api/worker.go @@ -87,18 +87,12 @@ type ( TransactionSet []types.Transaction `json:"transactionSet"` } - // RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune - // endpoint. - RHPPruneContractRequest struct { - Timeout DurationMS `json:"timeout"` - } - - // RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune - // endpoint. - RHPPruneContractResponse struct { - Pruned uint64 `json:"pruned"` - Remaining uint64 `json:"remaining"` - Error string `json:"error,omitempty"` + // RHPFundRequest is the request type for the /rhp/fund endpoint. + RHPFundRequest struct { + ContractID types.FileContractID `json:"contractID"` + HostKey types.PublicKey `json:"hostKey"` + SiamuxAddr string `json:"siamuxAddr"` + Balance types.Currency `json:"balance"` } // RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint. diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index c54fefa6c..f7d06d504 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -48,6 +48,7 @@ type Bus interface { RenewContract(ctx context.Context, fcid types.FileContractID, endHeight uint64, renterFunds, minNewCollateral, maxFundAmount types.Currency, expectedNewStorage uint64) (api.ContractMetadata, error) SetContractSet(ctx context.Context, set string, contracts []types.FileContractID) error PrunableData(ctx context.Context) (prunableData api.ContractsPrunableDataResponse, err error) + PruneContract(ctx context.Context, id types.FileContractID, timeout time.Duration) (api.ContractPruneResponse, error) // hostdb Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error) @@ -322,7 +323,7 @@ func (ap *Autopilot) Run() { // pruning if autopilot.Config.Contracts.Prune { - ap.tryPerformPruning(ap.workers) + ap.tryPerformPruning() } else { ap.logger.Info("pruning disabled") } @@ -670,6 +671,9 @@ func (ap *Autopilot) configHandlerPUT(jc jape.Context) { autopilot, err := ap.bus.Autopilot(jc.Request.Context(), ap.id) if utils.IsErr(err, api.ErrAutopilotNotFound) { autopilot = api.Autopilot{ID: ap.id, Config: cfg} + } else if err != nil { + jc.Error(err, http.StatusInternalServerError) + return } else { if autopilot.Config.Contracts.Set != cfg.Contracts.Set { contractSetChanged = true diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index 7822fb326..0189430ec 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -105,15 +105,17 @@ func (ap *Autopilot) fetchHostContract(fcid types.FileContractID) (host api.Host return } -func (ap *Autopilot) performContractPruning(wp *workerPool) { - ap.logger.Info("performing contract pruning") +func (ap *Autopilot) performContractPruning() { + log := ap.logger.Named("performContractPruning") + log.Info("performing contract pruning") // fetch prunable contracts prunable, err := ap.fetchPrunableContracts() if err != nil { - ap.logger.Error(err) + log.Error(err) return } + log.Debugf("found %d prunable contracts", len(prunable)) // dismiss alerts for contracts that are no longer prunable ap.dismissPruneAlerts(prunable) @@ -129,39 +131,70 @@ func (ap *Autopilot) performContractPruning(wp *workerPool) { // fetch host h, _, err := ap.fetchHostContract(contract.ID) if utils.IsErr(err, api.ErrContractNotFound) { + log.Debugw("contract got archived", "contract", contract.ID) continue // contract got archived } else if err != nil { - ap.logger.Errorf("failed to fetch host for contract '%v', err %v", contract.ID, err) + log.Errorw("failed to fetch host", zap.Error(err), "contract", contract.ID) continue } - // prune contract using a random worker - wp.withWorker(func(w Worker) { - total += ap.pruneContract(w, contract.ID, h.PublicKey, h.Settings.Version, h.Settings.Release) - }) + // prune contract + n, err := ap.pruneContract(ap.shutdownCtx, contract.ID, h.PublicKey, h.Settings.Version, h.Settings.Release, log) + if err != nil { + log.Errorw("failed to prune contract", zap.Error(err), "contract", contract.ID) + continue + } + + // handle alerts + ap.mu.Lock() + alertID := alerts.IDForContract(alertPruningID, contract.ID) + if shouldSendPruneAlert(err, h.Settings.Version, h.Settings.Release) { + ap.RegisterAlert(ap.shutdownCtx, newContractPruningFailedAlert(h.PublicKey, h.Settings.Version, h.Settings.Release, contract.ID, err)) + ap.pruningAlertIDs[contract.ID] = alertID // store id to dismiss stale alerts + } else { + ap.DismissAlert(ap.shutdownCtx, alertID) + delete(ap.pruningAlertIDs, contract.ID) + } + ap.mu.Unlock() + + // adjust total + total += n } // log total pruned - ap.logger.Info(fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable))) + log.Info(fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable))) } -func (ap *Autopilot) pruneContract(w Worker, fcid types.FileContractID, hk types.PublicKey, hostVersion, hostRelease string) uint64 { - // use a sane timeout - ctx, cancel := context.WithTimeout(ap.shutdownCtx, timeoutPruneContract+5*time.Minute) - defer cancel() +func (ap *Autopilot) pruneContract(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, hostVersion, hostRelease string, logger *zap.SugaredLogger) (uint64, error) { + // define logger + log := logger.With( + zap.Stringer("contract", fcid), + zap.Stringer("host", hk), + zap.String("version", hostVersion), + zap.String("release", hostRelease)) // prune the contract start := time.Now() - pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) - duration := time.Since(start) + res, err := ap.bus.PruneContract(ctx, fcid, timeoutPruneContract) + if err != nil { + return 0, err + } + + // decorate logger + log = log.With( + zap.String("pruned", utils.HumanReadableSize(int(res.Pruned))), + zap.String("remaining", utils.HumanReadableSize(int(res.Remaining))), + zap.String("size", utils.HumanReadableSize(int(res.ContractSize))), + zap.Duration("elapsed", time.Since(start)), + ) // ignore slow pruning until host network is 1.6.0+ - if utils.IsErr(err, context.DeadlineExceeded) && pruned > 0 { - err = nil + if res.Error != "" && utils.IsErr(errors.New(res.Error), context.DeadlineExceeded) && res.Pruned > 0 { + res.Error = "" } // handle metrics - if err == nil || pruned > 0 { + if res.Pruned > 0 { if err := ap.bus.RecordContractPruneMetric(ctx, api.ContractPruneMetric{ Timestamp: api.TimeRFC3339(start), @@ -169,40 +202,25 @@ func (ap *Autopilot) pruneContract(w Worker, fcid types.FileContractID, hk types HostKey: hk, HostVersion: hostVersion, - Pruned: pruned, - Remaining: remaining, - Duration: duration, + Pruned: res.Pruned, + Remaining: res.Remaining, + Duration: time.Since(start), }); err != nil { ap.logger.Error(err) } } // handle logs - log := ap.logger.With("contract", fcid, "host", hk, "version", hostVersion, "release", hostRelease, "pruned", pruned, "remaining", remaining, "elapsed", duration) - if err != nil && pruned > 0 { - log.With(zap.Error(err)).Error("unexpected error interrupted pruning") - } else if err != nil { - log.With(zap.Error(err)).Error("failed to prune contract") + if res.Error != "" { + log.Errorw("unexpected error interrupted pruning", zap.Error(errors.New(res.Error))) } else { log.Info("successfully pruned contract") } - // handle alerts - ap.mu.Lock() - defer ap.mu.Unlock() - alertID := alerts.IDForContract(alertPruningID, fcid) - if shouldSendPruneAlert(err, hostVersion, hostRelease) { - ap.RegisterAlert(ctx, newContractPruningFailedAlert(hk, hostVersion, hostRelease, fcid, err)) - ap.pruningAlertIDs[fcid] = alertID // store id to dismiss stale alerts - } else { - ap.DismissAlert(ctx, alertID) - delete(ap.pruningAlertIDs, fcid) - } - - return pruned + return res.Pruned, nil } -func (ap *Autopilot) tryPerformPruning(wp *workerPool) { +func (ap *Autopilot) tryPerformPruning() { ap.mu.Lock() if ap.pruning || ap.isStopped() { ap.mu.Unlock() @@ -215,7 +233,7 @@ func (ap *Autopilot) tryPerformPruning(wp *workerPool) { ap.wg.Add(1) go func() { defer ap.wg.Done() - ap.performContractPruning(wp) + ap.performContractPruning() ap.mu.Lock() ap.pruning = false ap.mu.Unlock() diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go index 7220f7f4a..11bcfa09b 100644 --- a/autopilot/workerpool.go +++ b/autopilot/workerpool.go @@ -20,7 +20,6 @@ type Worker interface { RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error) - RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) } diff --git a/bus/bus.go b/bus/bus.go index d4a43a8ef..bc0ff9804 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -43,9 +43,12 @@ const ( defaultWalletRecordMetricInterval = 5 * time.Minute defaultPinUpdateInterval = 5 * time.Minute defaultPinRateWindow = 6 * time.Hour - lockingPriorityFunding = 40 - lockingPriorityRenew = 80 - stdTxnSize = 1200 // bytes + + lockingPriorityPruning = 20 + lockingPriorityFunding = 40 + lockingPriorityRenew = 80 + + stdTxnSize = 1200 // bytes ) // Client re-exports the client from the client package. @@ -223,6 +226,7 @@ type ( ContractRoots(ctx context.Context, id types.FileContractID) ([]types.Hash256, error) ContractSizes(ctx context.Context) (map[types.FileContractID]api.ContractSize, error) ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) + PrunableContractRoots(ctx context.Context, id types.FileContractID, roots []types.Hash256) ([]uint64, error) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) @@ -421,6 +425,7 @@ func (b *Bus) Handler() http.Handler { "POST /contract/:id/acquire": b.contractAcquireHandlerPOST, "GET /contract/:id/ancestors": b.contractIDAncestorsHandler, "POST /contract/:id/keepalive": b.contractKeepaliveHandlerPOST, + "POST /contract/:id/prune": b.contractPruneHandlerPOST, "POST /contract/:id/renew": b.contractIDRenewHandlerPOST, "POST /contract/:id/renewed": b.contractIDRenewedHandlerPOST, "POST /contract/:id/release": b.contractReleaseHandlerPOST, diff --git a/bus/client/contracts.go b/bus/client/contracts.go index a831cb8e7..6929c8af9 100644 --- a/bus/client/contracts.go +++ b/bus/client/contracts.go @@ -160,6 +160,12 @@ func (c *Client) PrunableData(ctx context.Context) (prunableData api.ContractsPr return } +// PruneContract prunes the given contract. +func (c *Client) PruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (res api.ContractPruneResponse, err error) { + err = c.c.WithContext(ctx).POST(fmt.Sprintf("/contract/%s/prune", contractID), api.ContractPruneRequest{Timeout: api.DurationMS(timeout)}, &res) + return +} + // RenewContract renews an existing contract with a host and adds it to the bus. func (c *Client) RenewContract(ctx context.Context, contractID types.FileContractID, endHeight uint64, renterFunds, minNewCollateral, maxFundAmount types.Currency, expectedStorage uint64) (renewal api.ContractMetadata, err error) { req := api.ContractRenewRequest{ diff --git a/bus/routes.go b/bus/routes.go index 92fe0843b..26a530ef7 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -19,6 +19,7 @@ import ( ibus "go.sia.tech/renterd/internal/bus" "go.sia.tech/renterd/internal/gouging" + rhp2 "go.sia.tech/renterd/internal/rhp/v2" "go.sia.tech/core/gateway" "go.sia.tech/core/types" @@ -863,6 +864,117 @@ func (b *Bus) contractKeepaliveHandlerPOST(jc jape.Context) { } } +func (b *Bus) contractPruneHandlerPOST(jc jape.Context) { + ctx := jc.Request.Context() + + // decode fcid + var fcid types.FileContractID + if jc.DecodeParam("id", &fcid) != nil { + return + } + + // decode timeout + var req api.ContractPruneRequest + if jc.Decode(&req) != nil { + return + } + + // create gouging checker + gp, err := b.gougingParams(ctx) + if jc.Check("couldn't fetch gouging parameters", err) != nil { + return + } + gc := gouging.NewChecker(gp.GougingSettings, gp.ConsensusState, gp.TransactionFee, nil, nil) + + // apply timeout + pruneCtx := ctx + if req.Timeout > 0 { + var cancel context.CancelFunc + pruneCtx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)) + defer cancel() + } + + // acquire contract lock indefinitely and defer the release + lockID, err := b.contractLocker.Acquire(pruneCtx, lockingPriorityPruning, fcid, time.Duration(math.MaxInt64)) + if jc.Check("couldn't acquire contract lock", err) != nil { + return + } + defer func() { + if err := b.contractLocker.Release(fcid, lockID); err != nil { + b.logger.Error("failed to release contract lock", zap.Error(err)) + } + }() + + // fetch the contract from the bus + c, err := b.ms.Contract(ctx, fcid) + if errors.Is(err, api.ErrContractNotFound) { + jc.Error(err, http.StatusNotFound) + return + } else if jc.Check("couldn't fetch contract", err) != nil { + return + } + + // build map of uploading sectors + pending := make(map[types.Hash256]struct{}) + for _, root := range b.sectors.Sectors(fcid) { + pending[root] = struct{}{} + } + + // prune the contract + rev, spending, pruned, remaining, err := b.rhp2.PruneContract(pruneCtx, b.deriveRenterKey(c.HostKey), gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) { + indices, err := b.ms.PrunableContractRoots(ctx, fcid, roots) + if err != nil { + return nil, err + } else if len(indices) > len(roots) { + return nil, fmt.Errorf("selected %d prunable roots but only %d were provided", len(indices), len(roots)) + } + + filtered := indices[:0] + for _, index := range indices { + _, ok := pending[roots[index]] + if !ok { + filtered = append(filtered, index) + } + } + indices = filtered + return indices, nil + }) + + if errors.Is(err, rhp2.ErrNoSectorsToPrune) { + err = nil // ignore error + } else if !errors.Is(err, context.Canceled) { + if jc.Check("couldn't prune contract", err) != nil { + return + } + } + + // record spending + if !spending.Total().IsZero() { + b.ms.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ + { + ContractSpending: spending, + ContractID: fcid, + RevisionNumber: rev.RevisionNumber, + Size: rev.Filesize, + + MissedHostPayout: rev.MissedHostPayout(), + ValidRenterPayout: rev.ValidRenterPayout(), + }, + }) + } + + // return response + res := api.ContractPruneResponse{ + ContractSize: rev.Filesize, + Pruned: pruned, + Remaining: remaining, + } + if err != nil { + res.Error = err.Error() + } + jc.Encode(res) +} + func (b *Bus) contractsPrunableDataHandlerGET(jc jape.Context) { sizes, err := b.ms.ContractSizes(jc.Request.Context()) if jc.Check("failed to fetch contract sizes", err) != nil { diff --git a/internal/gouging/gouging.go b/internal/gouging/gouging.go index 8e729247d..aadfdd57f 100644 --- a/internal/gouging/gouging.go +++ b/internal/gouging/gouging.go @@ -32,7 +32,7 @@ const ( ) var ( - errHostSettingsGouging = errors.New("host settings gouging detected") + ErrHostSettingsGouging = errors.New("host settings gouging detected") ErrPriceTableGouging = errors.New("price table gouging detected") ) @@ -243,7 +243,7 @@ func checkContractGougingRHPv2(period, renewWindow *uint64, hs *rhpv2.HostSettin err = checkContractGouging(*period, *renewWindow, hs.MaxDuration, hs.WindowSize) if err != nil { - err = fmt.Errorf("%w: %v", errHostSettingsGouging, err) + err = fmt.Errorf("%w: %v", ErrHostSettingsGouging, err) } return } @@ -290,14 +290,14 @@ func checkPruneGougingRHPv2(gs api.GougingSettings, hs *rhpv2.HostSettings) erro hs.UploadBandwidthPrice, ) if overflow { - return fmt.Errorf("%w: overflow detected when computing sector download price", errHostSettingsGouging) + return fmt.Errorf("%w: overflow detected when computing sector download price", ErrHostSettingsGouging) } dpptb, overflow := sectorDownloadPrice.Mul64WithOverflow(uint64(bytesPerTB) / rhpv2.SectorSize) // sectors per TB if overflow { - return fmt.Errorf("%w: overflow detected when computing download price per TiB", errHostSettingsGouging) + return fmt.Errorf("%w: overflow detected when computing download price per TiB", ErrHostSettingsGouging) } if !gs.MaxDownloadPrice.IsZero() && dpptb.Cmp(gs.MaxDownloadPrice) > 0 { - return fmt.Errorf("%w: cost per TiB exceeds max dl price: %v > %v", errHostSettingsGouging, dpptb, gs.MaxDownloadPrice) + return fmt.Errorf("%w: cost per TiB exceeds max dl price: %v > %v", ErrHostSettingsGouging, dpptb, gs.MaxDownloadPrice) } return nil } diff --git a/internal/rhp/v2/rhp.go b/internal/rhp/v2/rhp.go index c2454c13d..14c2b48c7 100644 --- a/internal/rhp/v2/rhp.go +++ b/internal/rhp/v2/rhp.go @@ -13,6 +13,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/gouging" "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" @@ -67,6 +68,10 @@ var ( ErrNoSectorsToPrune = errors.New("no sectors to prune") ) +type ( + PrunableRootsFn = func(fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) +) + type ( Dialer interface { Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) @@ -163,50 +168,34 @@ func (c *Client) FormContract(ctx context.Context, hostKey types.PublicKey, host return } -func (c *Client) PruneContract(ctx context.Context, renterKey types.PrivateKey, gougingChecker gouging.Checker, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, toKeep []types.Hash256) (revision *types.FileContractRevision, deleted, remaining uint64, cost types.Currency, err error) { +func (c *Client) PruneContract(ctx context.Context, renterKey types.PrivateKey, gougingChecker gouging.Checker, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, diffRootsFn PrunableRootsFn) (revision *types.FileContractRevision, spending api.ContractSpending, deleted, remaining uint64, err error) { + log := c.logger.Named("performContractPruning") err = c.withTransport(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error { return c.withRevisionV2(renterKey, gougingChecker, t, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) { - // fetch roots - got, fetchCost, err := c.fetchContractRoots(t, renterKey, &rev, settings) - if err != nil { - return err - } - - // update cost and revision - cost = cost.Add(fetchCost) + // reference the revision revision = &rev.Revision - keep := make(map[types.Hash256]struct{}) - for _, root := range toKeep { - keep[root] = struct{}{} - } - - // collect indices for roots we want to prune + // fetch roots to delete var indices []uint64 - for i, root := range got { - if _, wanted := keep[root]; wanted { - delete(keep, root) // prevent duplicates - continue - } - indices = append(indices, uint64(i)) - } - if len(indices) == 0 { - return fmt.Errorf("%w: database holds %d, contract contains %d", ErrNoSectorsToPrune, len(toKeep), len(got)) + indices, spending.SectorRoots, err = c.prunableContractRoots(t, renterKey, &rev, settings, func(fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + startt := time.Now() + defer func() { + log.Debugf("batch diff roots took %v", time.Since(startt)) + }() + return diffRootsFn(fcid, roots) + }) + if err != nil { + return err + } else if len(indices) == 0 { + return ErrNoSectorsToPrune } // delete the roots from the contract - var deleteCost types.Currency - deleted, deleteCost, err = c.deleteContractRoots(t, renterKey, &rev, settings, indices) + deleted, spending.Deletions, err = c.deleteContractRoots(t, renterKey, &rev, settings, indices) if deleted < uint64(len(indices)) { remaining = uint64(len(indices)) - deleted } - // update cost and revision - if deleted > 0 { - cost = cost.Add(deleteCost) - revision = &rev.Revision - } - // return sizes instead of number of roots deleted *= rhpv2.SectorSize remaining *= rhpv2.SectorSize @@ -403,99 +392,144 @@ func (c *Client) deleteContractRoots(t *rhpv2.Transport, renterKey types.Private return } -func (c *Client) fetchContractRoots(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings) (roots []types.Hash256, cost types.Currency, _ error) { - // download the full set of SectorRoots +func (c *Client) prunableContractRoots(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings, prunableRootsFn PrunableRootsFn) (indices []uint64, cost types.Currency, _ error) { numsectors := rev.NumSectors() for offset := uint64(0); offset < numsectors; { + // calculate the batch size n := batchSizeFetchSectors if offset+n > numsectors { n = numsectors - offset } - // calculate the cost - batchCost, _ := settings.RPCSectorRootsCost(offset, n).Total() - - // TODO: remove once host network is updated - if utils.VersionCmp(settings.Version, "1.6.0") < 0 { - // calculate the response size - proofSize := rhpv2.RangeProofSize(numsectors, offset, offset+n) - responseSize := (proofSize + n) * 32 - if responseSize < minMessageSize { - responseSize = minMessageSize - } - batchCost = settings.BaseRPCPrice.Add(settings.DownloadBandwidthPrice.Mul64(responseSize)) - batchCost = batchCost.Mul64(2) // generous leeway - } - - // check funds - if rev.RenterFunds().Cmp(batchCost) < 0 { - return nil, types.ZeroCurrency, ErrInsufficientFunds - } - - // update the revision number - if rev.Revision.RevisionNumber == math.MaxUint64 { - return nil, types.ZeroCurrency, ErrContractFinalized + // fetch the batch + batch, batchCost, err := c.fetchContractRootsBatch(t, renterKey, rev, settings, offset, n) + if err != nil { + return nil, types.ZeroCurrency, err } - rev.Revision.RevisionNumber++ - // update the revision outputs - newRevision, err := updatedRevision(rev.Revision, batchCost, types.ZeroCurrency) + // fetch prunable roots for this batch + prunable, err := prunableRootsFn(rev.ID(), batch) if err != nil { return nil, types.ZeroCurrency, err } - // build the sector roots request - revisionHash := hashRevision(newRevision) - req := &rhpv2.RPCSectorRootsRequest{ - RootOffset: uint64(offset), - NumRoots: uint64(n), - - RevisionNumber: rev.Revision.RevisionNumber, - ValidProofValues: []types.Currency{ - newRevision.ValidProofOutputs[0].Value, - newRevision.ValidProofOutputs[1].Value, - }, - MissedProofValues: []types.Currency{ - newRevision.MissedProofOutputs[0].Value, - newRevision.MissedProofOutputs[1].Value, - newRevision.MissedProofOutputs[2].Value, - }, - Signature: renterKey.SignHash(revisionHash), + // append the roots, make sure to take the offset into account + for _, index := range prunable { + indices = append(indices, index+offset) } + offset += n - // execute the sector roots RPC - var rootsResp rhpv2.RPCSectorRootsResponse - if err := t.WriteRequest(rhpv2.RPCSectorRootsID, req); err != nil { - return nil, types.ZeroCurrency, err - } else if err := t.ReadResponse(&rootsResp, maxMerkleProofResponseSize); err != nil { - return nil, types.ZeroCurrency, fmt.Errorf("couldn't read sector roots response: %w", err) - } + // update the cost + cost = cost.Add(batchCost) + } + return +} - // verify the host signature - if !rev.HostKey().VerifyHash(revisionHash, rootsResp.Signature) { - return nil, types.ZeroCurrency, errors.New("host's signature is invalid") +func (c *Client) fetchContractRoots(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings) (roots []types.Hash256, cost types.Currency, _ error) { + numsectors := rev.NumSectors() + for offset := uint64(0); offset < numsectors; { + // calculate the batch size + n := batchSizeFetchSectors + if offset+n > numsectors { + n = numsectors - offset } - rev.Signatures[0].Signature = req.Signature[:] - rev.Signatures[1].Signature = rootsResp.Signature[:] - - // verify the proof - if uint64(len(rootsResp.SectorRoots)) != n { - return nil, types.ZeroCurrency, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v, err: number of roots does not match range %d != %d (num sectors: %d rev size: %d offset: %d)", rev.HostKey(), settings.Version, len(rootsResp.SectorRoots), n, numsectors, rev.Revision.Filesize, offset) - } else if !rhpv2.VerifySectorRangeProof(rootsResp.MerkleProof, rootsResp.SectorRoots, offset, offset+n, numsectors, rev.Revision.FileMerkleRoot) { - return nil, types.ZeroCurrency, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v; %w", rev.HostKey(), settings.Version, ErrInvalidMerkleProof) + + // fetch the batch + batch, batchCost, err := c.fetchContractRootsBatch(t, renterKey, rev, settings, offset, n) + if err != nil { + return nil, types.ZeroCurrency, err } - // append roots - roots = append(roots, rootsResp.SectorRoots...) + // append the roots + roots = append(roots, batch...) offset += n - // update revision - rev.Revision = newRevision + // update the cost cost = cost.Add(batchCost) } return } +func (c *Client) fetchContractRootsBatch(t *rhpv2.Transport, renterKey types.PrivateKey, rev *rhpv2.ContractRevision, settings rhpv2.HostSettings, offset, limit uint64) ([]types.Hash256, types.Currency, error) { + // calculate the cost + cost, _ := settings.RPCSectorRootsCost(offset, limit).Total() + + // TODO: remove once host network is updated + if utils.VersionCmp(settings.Version, "1.6.0") < 0 { + // calculate the response size + proofSize := rhpv2.RangeProofSize(rev.NumSectors(), offset, offset+limit) + responseSize := (proofSize + limit) * 32 + if responseSize < minMessageSize { + responseSize = minMessageSize + } + cost = settings.BaseRPCPrice.Add(settings.DownloadBandwidthPrice.Mul64(responseSize)) + cost = cost.Mul64(2) // generous leeway + } + + // check funds + if rev.RenterFunds().Cmp(cost) < 0 { + return nil, types.ZeroCurrency, ErrInsufficientFunds + } + + // update the revision number + if rev.Revision.RevisionNumber == math.MaxUint64 { + return nil, types.ZeroCurrency, ErrContractFinalized + } + rev.Revision.RevisionNumber++ + + // update the revision outputs + newRevision, err := updatedRevision(rev.Revision, cost, types.ZeroCurrency) + if err != nil { + return nil, types.ZeroCurrency, err + } + + // build the sector roots request + revisionHash := hashRevision(newRevision) + req := &rhpv2.RPCSectorRootsRequest{ + RootOffset: offset, + NumRoots: limit, + + RevisionNumber: rev.Revision.RevisionNumber, + ValidProofValues: []types.Currency{ + newRevision.ValidProofOutputs[0].Value, + newRevision.ValidProofOutputs[1].Value, + }, + MissedProofValues: []types.Currency{ + newRevision.MissedProofOutputs[0].Value, + newRevision.MissedProofOutputs[1].Value, + newRevision.MissedProofOutputs[2].Value, + }, + Signature: renterKey.SignHash(revisionHash), + } + + // execute the sector roots RPC + var rootsResp rhpv2.RPCSectorRootsResponse + if err := t.WriteRequest(rhpv2.RPCSectorRootsID, req); err != nil { + return nil, types.ZeroCurrency, err + } else if err := t.ReadResponse(&rootsResp, maxMerkleProofResponseSize); err != nil { + return nil, types.ZeroCurrency, fmt.Errorf("couldn't read sector roots response: %w", err) + } + + // verify the host signature + if !rev.HostKey().VerifyHash(revisionHash, rootsResp.Signature) { + return nil, cost, errors.New("host's signature is invalid") + } + rev.Signatures[0].Signature = req.Signature[:] + rev.Signatures[1].Signature = rootsResp.Signature[:] + + // verify the proof + if uint64(len(rootsResp.SectorRoots)) != limit { + return nil, cost, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v, err: number of roots does not match range %d != %d (num sectors: %d rev size: %d offset: %d)", rev.HostKey(), settings.Version, len(rootsResp.SectorRoots), limit, rev.NumSectors(), rev.Revision.Filesize, offset) + } else if !rhpv2.VerifySectorRangeProof(rootsResp.MerkleProof, rootsResp.SectorRoots, offset, offset+limit, rev.NumSectors(), rev.Revision.FileMerkleRoot) { + return nil, cost, fmt.Errorf("couldn't verify contract roots proof, host %v, version %v; %w", rev.HostKey(), settings.Version, ErrInvalidMerkleProof) + } + + // update revision + rev.Revision = newRevision + + return rootsResp.SectorRoots, cost, nil +} + func (w *Client) withRevisionV2(renterKey types.PrivateKey, gougingChecker gouging.Checker, t *rhpv2.Transport, fcid types.FileContractID, lastKnownRevisionNumber uint64, fn func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) error) error { // execute lock RPC var lockResp rhpv2.RPCLockResponse @@ -549,7 +583,7 @@ func (w *Client) withRevisionV2(renterKey types.PrivateKey, gougingChecker gougi // perform gouging checks on settings if breakdown := gougingChecker.CheckSettings(settings); breakdown.Gouging() { - return fmt.Errorf("failed to prune contract: %v", breakdown) + return fmt.Errorf("%w: %v", gouging.ErrHostSettingsGouging, breakdown) } return fn(t, rev, settings) diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 3b09b672e..3aaf6456e 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -100,6 +100,32 @@ func (tc *TestCluster) Accounts() []api.Account { return accounts } +func (tc *TestCluster) ContractRoots(ctx context.Context, fcid types.FileContractID) ([]types.Hash256, error) { + tc.tt.Helper() + + c, err := tc.Bus.Contract(ctx, fcid) + if err != nil { + return nil, err + } + + var h *Host + for _, host := range tc.hosts { + if host.PublicKey() == c.HostKey { + h = host + break + } + } + if h == nil { + return nil, fmt.Errorf("no host found for contract %v", c) + } + + roots, err := h.store.SectorRoots() + if err != nil { + return nil, err + } + return roots[c.ID], nil +} + func (tc *TestCluster) ShutdownAutopilot(ctx context.Context) { tc.tt.Helper() for _, fn := range tc.autopilotShutdownFns { diff --git a/internal/test/e2e/contracts_test.go b/internal/test/e2e/contracts_test.go index 82a8f043b..0304e0909 100644 --- a/internal/test/e2e/contracts_test.go +++ b/internal/test/e2e/contracts_test.go @@ -2,6 +2,7 @@ package e2e import ( "context" + "errors" "fmt" "testing" "time" @@ -45,9 +46,21 @@ func TestFormContract(t *testing.T) { _, err = b.Contract(context.Background(), contract.ID) tt.OK(err) + // fetch autopilot config + old, err := b.Autopilot(context.Background(), api.DefaultAutopilotID) + tt.OK(err) + // mine to the renew window cluster.MineToRenewWindow() + // wait until autopilot updated the current period + tt.Retry(100, 100*time.Millisecond, func() error { + if curr, _ := b.Autopilot(context.Background(), api.DefaultAutopilotID); curr.CurrentPeriod == old.CurrentPeriod { + return errors.New("autopilot didn't update the current period") + } + return nil + }) + // update autopilot config to allow for 1 contract, this won't form a // contract but will ensure we don't skip contract maintenance, which should // renew the contract we formed diff --git a/internal/test/e2e/pruning_test.go b/internal/test/e2e/pruning_test.go index 8492bf9f1..84cce4b21 100644 --- a/internal/test/e2e/pruning_test.go +++ b/internal/test/e2e/pruning_test.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" "context" + "errors" "fmt" "math" "strings" @@ -12,6 +13,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/test" + "go.uber.org/zap" ) func TestHostPruning(t *testing.T) { @@ -98,7 +100,9 @@ func TestSectorPruning(t *testing.T) { } // create a cluster - cluster := newTestCluster(t, clusterOptsDefault) + opts := clusterOptsDefault + opts.logger = zap.NewNop() + cluster := newTestCluster(t, opts) defer cluster.Shutdown() // add a helper to check whether a root is in a given slice @@ -121,13 +125,13 @@ func TestSectorPruning(t *testing.T) { numObjects := 10 // add hosts - hosts := cluster.AddHostsBlocking(int(cfg.Contracts.Amount)) + hosts := cluster.AddHostsBlocking(rs.TotalShards) // wait until we have accounts cluster.WaitForAccounts() // wait until we have a contract set - cluster.WaitForContractSetContracts(cfg.Contracts.Set, int(cfg.Contracts.Amount)) + cluster.WaitForContractSetContracts(cfg.Contracts.Set, rs.TotalShards) // add several objects for i := 0; i < numObjects; i++ { @@ -147,7 +151,8 @@ func TestSectorPruning(t *testing.T) { for _, c := range contracts { dbRoots, _, err := b.ContractRoots(context.Background(), c.ID) tt.OK(err) - cRoots, err := w.RHPContractRoots(context.Background(), c.ID) + + cRoots, err := cluster.ContractRoots(context.Background(), c.ID) tt.OK(err) if len(dbRoots) != len(cRoots) { t.Fatal("unexpected number of roots", dbRoots, cRoots) @@ -163,7 +168,7 @@ func TestSectorPruning(t *testing.T) { t.Fatal("unexpected number of roots", n) } - // sleep for a bit to ensure spending records get flushed + // sleep to ensure spending records get flushed time.Sleep(3 * testBusFlushInterval) // assert prunable data is 0 @@ -180,7 +185,7 @@ func TestSectorPruning(t *testing.T) { } // assert amount of prunable data - tt.Retry(100, 100*time.Millisecond, func() error { + tt.Retry(300, 100*time.Millisecond, func() error { res, err = b.PrunableData(context.Background()) tt.OK(err) if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*rs.SlabSize() { @@ -191,18 +196,21 @@ func TestSectorPruning(t *testing.T) { // prune all contracts for _, c := range contracts { - tt.OKAll(w.RHPPruneContract(context.Background(), c.ID, 0)) - } - - // assert spending records were updated and prunable data is 0 - tt.Retry(10, testBusFlushInterval, func() error { - res, err := b.PrunableData(context.Background()) + res, err := b.PruneContract(context.Background(), c.ID, 0) tt.OK(err) - if res.TotalPrunable != 0 { - return fmt.Errorf("unexpected prunable data: %d", n) + if res.Pruned == 0 { + t.Fatal("expected pruned to be non-zero") + } else if res.Remaining != 0 { + t.Fatal("expected remaining to be zero") } - return nil - }) + } + + // assert prunable data is 0 + res, err = b.PrunableData(context.Background()) + tt.OK(err) + if res.TotalPrunable != 0 { + t.Fatalf("unexpected prunable data: %d", n) + } // assert spending was updated for _, c := range contracts { @@ -222,15 +230,23 @@ func TestSectorPruning(t *testing.T) { tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, filename, api.DeleteObjectOptions{})) } - // sleep for a bit to ensure spending records get flushed - time.Sleep(3 * testBusFlushInterval) - // assert amount of prunable data - res, err = b.PrunableData(context.Background()) - tt.OK(err) - if res.TotalPrunable == 0 { - t.Fatal("expected prunable data") - } + tt.Retry(300, 100*time.Millisecond, func() error { + res, err = b.PrunableData(context.Background()) + tt.OK(err) + + if len(res.Contracts) != len(contracts) { + return fmt.Errorf("expected %d contracts, got %d", len(contracts), len(res.Contracts)) + } else if res.TotalPrunable == 0 { + var sizes []string + for _, c := range res.Contracts { + res, _ := b.ContractSize(context.Background(), c.ID) + sizes = append(sizes, fmt.Sprintf("c: %v size: %v prunable: %v", c.ID, res.Size, res.Prunable)) + } + return errors.New("expected prunable data, contract sizes:\n" + strings.Join(sizes, "\n")) + } + return nil + }) // update the host settings so it's gouging host := hosts[0] @@ -249,7 +265,7 @@ func TestSectorPruning(t *testing.T) { } // prune the contract and assert it threw a gouging error - _, _, err = w.RHPPruneContract(context.Background(), c.ID, 0) + _, err = b.PruneContract(context.Background(), c.ID, 0) if err == nil || !strings.Contains(err.Error(), "gouging") { t.Fatal("expected gouging error", err) } diff --git a/internal/test/e2e/uploads_test.go b/internal/test/e2e/uploads_test.go index 3f83fd7e4..c601becd6 100644 --- a/internal/test/e2e/uploads_test.go +++ b/internal/test/e2e/uploads_test.go @@ -135,7 +135,7 @@ func TestUploadingSectorsCache(t *testing.T) { } } - cr, err := w.RHPContractRoots(context.Background(), id) + cr, err := cluster.ContractRoots(context.Background(), id) tt.OK(err) expected := make(map[types.Hash256]struct{}) for _, root := range cr { diff --git a/stores/bench_test.go b/stores/bench_test.go new file mode 100644 index 000000000..60f75b52f --- /dev/null +++ b/stores/bench_test.go @@ -0,0 +1,167 @@ +package stores + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "testing" + "time" + + "go.sia.tech/core/types" + isql "go.sia.tech/renterd/internal/sql" + "go.sia.tech/renterd/object" + "go.sia.tech/renterd/stores/sql" + "go.sia.tech/renterd/stores/sql/sqlite" + "go.uber.org/zap" + "lukechampine.com/frand" +) + +// BenchmarkPrunableContractRoots benchmarks diffing the roots of a contract +// with a given set of roots to determine which roots are prunable. +// +// 15.32 MB/s | M1 Max | cd32fad7 (diff ~2TiB of contract data per second) +func BenchmarkPrunableContractRoots(b *testing.B) { + // define parameters + batchSize := int64(25600) // 100GiB of contract data + contractSize := 1 << 40 // 1TiB contract + sectorSize := 4 << 20 // 4MiB sector + numSectors := contractSize / sectorSize + + // create database + db, err := newTestDB(context.Background(), b.TempDir()) + if err != nil { + b.Fatal(err) + } + + // prepare database + fcid := types.FileContractID{1} + roots, err := prepareDB(db.DB(), fcid, numSectors) + if err != nil { + b.Fatal(err) + } + + // prepare batch + frand.Shuffle(len(roots), func(i, j int) { + roots[i], roots[j] = roots[j], roots[i] + }) + batch := roots[:batchSize] + + // start benchmark + b.ResetTimer() + b.SetBytes(batchSize * 32) + for i := 0; i < b.N; i++ { + if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error { + indices, err := tx.PrunableContractRoots(context.Background(), fcid, batch) + if err != nil { + return err + } else if len(indices) != 0 { + return errors.New("expected no prunable roots") + } + return nil + }); err != nil { + b.Fatal(err) + } + } +} + +func prepareDB(db *isql.DB, fcid types.FileContractID, n int) (roots []types.Hash256, _ error) { + // insert host + hk := types.PublicKey{1} + res, err := db.Exec(context.Background(), ` +INSERT INTO hosts (public_key) VALUES (?)`, sql.PublicKey(hk)) + if err != nil { + return nil, err + } + hostID, err := res.LastInsertId() + if err != nil { + return nil, err + } + + // insert contract + res, err = db.Exec(context.Background(), ` +INSERT INTO contracts (host_id, fcid,start_height) VALUES (?, ?, ?)`, hostID, sql.FileContractID(fcid), 0) + if err != nil { + return nil, err + } + contractID, err := res.LastInsertId() + if err != nil { + return nil, err + } + + // insert slab + key := object.GenerateEncryptionKey() + res, err = db.Exec(context.Background(), ` +INSERT INTO slabs (created_at, `+"`key`"+`) VALUES (?, ?)`, time.Now(), sql.EncryptionKey(key)) + if err != nil { + return nil, err + } + slabID, err := res.LastInsertId() + if err != nil { + return nil, err + } + + // insert sectors + insertSectorStmt, err := db.Prepare(context.Background(), ` +INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?, ?) RETURNING id`) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement to insert sector: %w", err) + } + defer insertSectorStmt.Close() + var sectorIDs []int64 + for i := 0; i < n; i++ { + var sectorID int64 + roots = append(roots, frand.Entropy256()) + err := insertSectorStmt.QueryRow(context.Background(), slabID, i, sql.PublicKey(hk), sql.Hash256(roots[i])).Scan(§orID) + if err != nil { + return nil, fmt.Errorf("failed to insert sector: %w", err) + } + sectorIDs = append(sectorIDs, sectorID) + } + + // insert contract sectors + insertLinkStmt, err := db.Prepare(context.Background(), ` +INSERT INTO contract_sectors (db_contract_id, db_sector_id) VALUES (?, ?)`) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement to insert contract sectors: %w", err) + } + defer insertLinkStmt.Close() + for _, sectorID := range sectorIDs { + if _, err := insertLinkStmt.Exec(context.Background(), contractID, sectorID); err != nil { + return nil, fmt.Errorf("failed to insert contract sector: %w", err) + } + } + + // sanity check + var cnt int + err = db.QueryRow(context.Background(), ` +SELECT COUNT(s.root) +FROM contracts c +INNER JOIN contract_sectors cs ON cs.db_contract_id = c.id +INNER JOIN sectors s ON cs.db_sector_id = s.id +WHERE c.fcid = ?`, sql.FileContractID(fcid)).Scan(&cnt) + if cnt != n { + return nil, fmt.Errorf("expected %v sectors, got %v", n, cnt) + } + + return +} + +func newTestDB(ctx context.Context, dir string) (*sqlite.MainDatabase, error) { + db, err := sqlite.Open(filepath.Join(dir, "db.sqlite")) + if err != nil { + return nil, err + } + + dbMain, err := sqlite.NewMainDatabase(db, zap.NewNop(), 100*time.Millisecond, 100*time.Millisecond) + if err != nil { + return nil, err + } + + err = dbMain.Migrate(ctx) + if err != nil { + return nil, err + } + + return dbMain, nil +} diff --git a/stores/metadata.go b/stores/metadata.go index fe26bc29e..1f4755bd7 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -669,6 +669,14 @@ func (s *SQLStore) ObjectsBySlabKey(ctx context.Context, bucket string, slabKey return } +func (s *SQLStore) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { + indices, err = tx.PrunableContractRoots(ctx, fcid, roots) + return err + }) + return +} + // MarkPackedSlabsUploaded marks the given slabs as uploaded and deletes them // from the buffer. func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error { diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 6d0639e5d..b3529eea6 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -126,6 +126,82 @@ SET health = ( return err } +func TestPrunableContractRoots(t *testing.T) { + // create a SQL store + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // add a contract + hks, err := ss.addTestHosts(1) + if err != nil { + t.Fatal(err) + } + fcids, _, err := ss.addTestContracts(hks) + if err != nil { + t.Fatal(err) + } + + // add 4 objects + for i := 1; i <= 4; i++ { + if _, err := ss.addTestObject(fmt.Sprintf("%s_%d", t.Name(), i), object.Object{ + Key: object.GenerateEncryptionKey(), + Slabs: []object.SlabSlice{ + { + Slab: object.Slab{ + Key: object.GenerateEncryptionKey(), + MinShards: 1, + Shards: newTestShards(hks[0], fcids[0], types.Hash256{byte(i)}), + }, + }, + }, + }); err != nil { + t.Fatal(err) + } + } + + // assert there's 4 roots in the database + roots, err := ss.ContractRoots(context.Background(), fcids[0]) + if err != nil { + t.Fatal(err) + } else if len(roots) != 4 { + t.Fatal("unexpected number of roots", len(roots)) + } + + // diff the roots - should be empty + indices, err := ss.PrunableContractRoots(context.Background(), fcids[0], roots) + if err != nil { + t.Fatal(err) + } else if len(indices) != 0 { + t.Fatal("unexpected number of indices", len(indices)) + } + + // delete every other object + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, fmt.Sprintf("%s_1", t.Name())); err != nil { + t.Fatal(err) + } + if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, fmt.Sprintf("%s_3", t.Name())); err != nil { + t.Fatal(err) + } + + // assert there's 2 roots left + updated, err := ss.ContractRoots(context.Background(), fcids[0]) + if err != nil { + t.Fatal(err) + } else if len(updated) != 2 { + t.Fatal("unexpected number of roots", len(updated)) + } + + // diff the roots again, should return indices 0 and 2 + indices, err = ss.PrunableContractRoots(context.Background(), fcids[0], roots) + if err != nil { + t.Fatal(err) + } else if len(indices) != 2 { + t.Fatal("unexpected number of indices", len(indices)) + } else if indices[0] != 0 || indices[1] != 2 { + t.Fatal("unexpected indices", indices) + } +} + // TestObjectBasic tests the hydration of raw objects works when we fetch // objects from the metadata store. func TestObjectBasic(t *testing.T) { diff --git a/stores/sql/database.go b/stores/sql/database.go index 3e1917502..bc08865da 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -243,6 +243,10 @@ type ( // ProcessChainUpdate applies the given chain update to the database. ProcessChainUpdate(ctx context.Context, applyFn func(ChainUpdateTx) error) error + // PrunableContractRoots returns the indices of roots that are not in + // the contract. + PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) + // PruneEmptydirs prunes any directories that are empty. PruneEmptydirs(ctx context.Context) error diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index de8b97bfa..ef671baef 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -26,6 +26,10 @@ import ( "go.uber.org/zap" ) +const ( + batchSizeInsertSectors = 500 +) + type ( MainDatabase struct { db *sql.DB @@ -566,6 +570,62 @@ func (tx *MainDatabaseTx) ProcessChainUpdate(ctx context.Context, fn func(ssql.C }) } +func (tx *MainDatabaseTx) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + // build tmp table name + tmpTable := strings.ReplaceAll(fmt.Sprintf("tmp_host_roots_%s", fcid.String()[:8]), ":", "_") + + // create temporary table + _, err = tx.Exec(ctx, fmt.Sprintf(` +DROP TABLE IF EXISTS %s; +CREATE TEMPORARY TABLE %s (idx INT, root varbinary(32)) ENGINE=MEMORY; +CREATE INDEX %s_idx ON %s (root(32));`, tmpTable, tmpTable, tmpTable, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to create temporary table: %w", err) + } + + // defer removal + defer func() { + if _, err := tx.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, tmpTable)); err != nil { + tx.log.Warnw("failed to drop temporary table", zap.Error(err)) + } + }() + + // insert roots in batches + for i := 0; i < len(roots); i += batchSizeInsertSectors { + end := i + batchSizeInsertSectors + if end > len(roots) { + end = len(roots) + } + + var params []interface{} + for i, r := range roots[i:end] { + params = append(params, uint64(i), ssql.Hash256(r)) + } + + _, err = tx.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (idx, root) VALUES %s`, tmpTable, strings.TrimSuffix(strings.Repeat("(?, ?), ", end-i), ", ")), params...) + if err != nil { + return nil, fmt.Errorf("failed to insert into roots into temporary table: %w", err) + } + } + + // execute query + rows, err := tx.Query(ctx, fmt.Sprintf(`SELECT idx FROM %s tmp LEFT JOIN sectors s ON s.root = tmp.root WHERE s.root IS NULL`, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract roots: %w", err) + } + defer rows.Close() + + // fetch indices + for rows.Next() { + var idx uint64 + if err := rows.Scan(&idx); err != nil { + return nil, fmt.Errorf("failed to scan root index: %w", err) + } + indices = append(indices, idx) + } + return +} + func (tx *MainDatabaseTx) PruneEmptydirs(ctx context.Context) error { stmt, err := tx.Prepare(ctx, ` DELETE diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 739fb47f4..744617565 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -25,6 +25,10 @@ import ( "go.uber.org/zap" ) +const ( + batchSizeInsertSectors = 500 +) + type ( MainDatabase struct { db *sql.DB @@ -447,7 +451,6 @@ func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids ) `, strings.Repeat("?, ", len(fcids)-1)+"?"), args...) if err != nil { - fmt.Println(strings.Repeat("?, ", len(fcids)-1) + "?") return 0, err } return res.RowsAffected() @@ -563,6 +566,76 @@ func (tx *MainDatabaseTx) ProcessChainUpdate(ctx context.Context, fn func(ssql.C }) } +func (tx *MainDatabaseTx) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { + // build tmp table name + tmpTable := strings.ReplaceAll(fmt.Sprintf("tmp_host_roots_%s", fcid.String()[:8]), ":", "_") + + // create temporary table + _, err = tx.Exec(ctx, fmt.Sprintf(` +DROP TABLE IF EXISTS %s; +CREATE TEMPORARY TABLE %s (idx INT, root blob); +CREATE INDEX %s_idx ON %s (root);`, tmpTable, tmpTable, tmpTable, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to create temporary table: %w", err) + } + + // defer removal + defer func() { + if _, err := tx.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, tmpTable)); err != nil { + tx.log.Warnw("failed to drop temporary table", zap.Error(err)) + } + }() + + // prepare insert statement + insertStmt, err := tx.Prepare(ctx, fmt.Sprintf(`INSERT INTO %s (idx, root) VALUES %s`, tmpTable, strings.TrimSuffix(strings.Repeat("(?, ?), ", batchSizeInsertSectors), ", "))) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement to insert contract roots: %w", err) + } + defer insertStmt.Close() + + // insert roots in batches + for i := 0; i < len(roots); i += batchSizeInsertSectors { + end := i + batchSizeInsertSectors + if end > len(roots) { + end = len(roots) + } + + var params []interface{} + for i, r := range roots[i:end] { + params = append(params, uint64(i), ssql.Hash256(r)) + } + + if len(params) == batchSizeInsertSectors { + _, err := insertStmt.Exec(ctx, params...) + if err != nil { + return nil, fmt.Errorf("failed to insert into roots into temporary table: %w", err) + } + } else { + _, err = tx.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (idx, root) VALUES %s`, tmpTable, strings.TrimSuffix(strings.Repeat("(?, ?), ", end-i), ", ")), params...) + if err != nil { + return nil, fmt.Errorf("failed to insert into roots into temporary table: %w", err) + } + } + } + + // execute query + rows, err := tx.Query(ctx, fmt.Sprintf(`SELECT idx FROM %s tmp LEFT JOIN sectors s ON s.root = tmp.root WHERE s.root IS NULL`, tmpTable)) + if err != nil { + return nil, fmt.Errorf("failed to fetch contract roots: %w", err) + } + defer rows.Close() + + // fetch indices + for rows.Next() { + var idx uint64 + if err := rows.Scan(&idx); err != nil { + return nil, fmt.Errorf("failed to scan root index: %w", err) + } + indices = append(indices, idx) + } + return +} + func (tx *MainDatabaseTx) PruneEmptydirs(ctx context.Context) error { stmt, err := tx.Prepare(ctx, ` DELETE diff --git a/worker/client/rhp.go b/worker/client/rhp.go index 8352f54ad..bb923b705 100644 --- a/worker/client/rhp.go +++ b/worker/client/rhp.go @@ -2,7 +2,6 @@ package client import ( "context" - "errors" "fmt" "time" @@ -16,12 +15,6 @@ func (c *Client) RHPBroadcast(ctx context.Context, contractID types.FileContract return } -// RHPContractRoots fetches the roots of the contract with given id. -func (c *Client) RHPContractRoots(ctx context.Context, contractID types.FileContractID) (roots []types.Hash256, err error) { - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/rhp/contract/%s/roots", contractID), &roots) - return -} - // RHPPriceTable fetches a price table for a host. func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (pt api.HostPriceTable, err error) { req := api.RHPPriceTableRequest{ @@ -33,22 +26,6 @@ func (c *Client) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, sia return } -// RHPPruneContract prunes deleted sectors from the contract with given id. -func (c *Client) RHPPruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error) { - var res api.RHPPruneContractResponse - if err = c.c.WithContext(ctx).POST(fmt.Sprintf("/rhp/contract/%s/prune", contractID), api.RHPPruneContractRequest{ - Timeout: api.DurationMS(timeout), - }, &res); err != nil { - return - } else if res.Error != "" { - err = errors.New(res.Error) - } - - pruned = res.Pruned - remaining = res.Remaining - return -} - // RHPScan scans a host, returning its current settings. func (c *Client) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (resp api.RHPScanResponse, err error) { err = c.c.WithContext(ctx).POST("/rhp/scan", api.RHPScanRequest{ diff --git a/worker/host.go b/worker/host.go index 40695f8b0..2ecd95233 100644 --- a/worker/host.go +++ b/worker/host.go @@ -110,6 +110,7 @@ func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, secto }); err != nil { return err } + // upload cost, err := h.client.AppendSector(ctx, sectorRoot, sector, &rev, h.hk, h.siamuxAddr, h.acc.ID(), pt, h.renterKey) if err != nil { diff --git a/worker/worker.go b/worker/worker.go index be27f2cc6..76a8a8e31 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -416,119 +416,6 @@ func (w *Worker) rhpBroadcastHandler(jc jape.Context) { } } -func (w *Worker) rhpPruneContractHandlerPOST(jc jape.Context) { - ctx := jc.Request.Context() - - // decode fcid - var fcid types.FileContractID - if jc.DecodeParam("id", &fcid) != nil { - return - } - - // decode timeout - var pcr api.RHPPruneContractRequest - if jc.Decode(&pcr) != nil { - return - } - - // apply timeout - if pcr.Timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(pcr.Timeout)) - defer cancel() - } - - // fetch the contract from the bus - contract, err := w.bus.Contract(ctx, fcid) - if errors.Is(err, api.ErrContractNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't fetch contract", err) != nil { - return - } - - // return early if there's no data to prune - size, err := w.bus.ContractSize(ctx, fcid) - if jc.Check("couldn't fetch contract size", err) != nil { - return - } else if size.Prunable == 0 { - jc.Encode(api.RHPPruneContractResponse{}) - return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("could not fetch gouging parameters", err) != nil { - return - } - gc := newGougingChecker(gp.GougingSettings, gp.ConsensusState, gp.TransactionFee, false) - - // prune the contract - var pruned, remaining uint64 - var rev *types.FileContractRevision - var cost types.Currency - err = w.withContractLock(ctx, contract.ID, lockingPriorityPruning, func() error { - stored, pending, err := w.bus.ContractRoots(ctx, contract.ID) - if err != nil { - return fmt.Errorf("failed to fetch contract roots; %w", err) - } - rev, pruned, remaining, cost, err = w.rhp2Client.PruneContract(ctx, w.deriveRenterKey(contract.HostKey), gc, contract.HostIP, contract.HostKey, fcid, contract.RevisionNumber, append(stored, pending...)) - return err - }) - if rev != nil { - w.contractSpendingRecorder.Record(*rev, api.ContractSpending{Deletions: cost}) - } - if err != nil && !errors.Is(err, rhp2.ErrNoSectorsToPrune) && pruned == 0 { - err = fmt.Errorf("failed to prune contract %v; %w", fcid, err) - jc.Error(err, http.StatusInternalServerError) - return - } - - res := api.RHPPruneContractResponse{ - Pruned: pruned, - Remaining: remaining, - } - if err != nil { - res.Error = err.Error() - } - jc.Encode(res) -} - -func (w *Worker) rhpContractRootsHandlerGET(jc jape.Context) { - ctx := jc.Request.Context() - - // decode fcid - var id types.FileContractID - if jc.DecodeParam("id", &id) != nil { - return - } - - // fetch the contract from the bus - c, err := w.bus.Contract(ctx, id) - if errors.Is(err, api.ErrContractNotFound) { - jc.Error(err, http.StatusNotFound) - return - } else if jc.Check("couldn't fetch contract", err) != nil { - return - } - - // fetch gouging params - gp, err := w.bus.GougingParams(ctx) - if jc.Check("couldn't fetch gouging parameters from bus", err) != nil { - return - } - gc := newGougingChecker(gp.GougingSettings, gp.ConsensusState, gp.TransactionFee, false) - - // fetch the roots from the host - roots, rev, cost, err := w.rhp2Client.ContractRoots(ctx, w.deriveRenterKey(c.HostKey), gc, c.HostIP, c.HostKey, id, c.RevisionNumber) - if jc.Check("couldn't fetch contract roots from host", err) != nil { - return - } else if rev != nil { - w.contractSpendingRecorder.Record(*rev, api.ContractSpending{SectorRoots: cost}) - } - jc.Encode(roots) -} - func (w *Worker) slabMigrateHandler(jc jape.Context) { ctx := jc.Request.Context() @@ -1163,8 +1050,6 @@ func (w *Worker) Handler() http.Handler { "GET /rhp/contracts": w.rhpContractsHandlerGET, "POST /rhp/contract/:id/broadcast": w.rhpBroadcastHandler, - "POST /rhp/contract/:id/prune": w.rhpPruneContractHandlerPOST, - "GET /rhp/contract/:id/roots": w.rhpContractRootsHandlerGET, "POST /rhp/scan": w.rhpScanHandler, "POST /rhp/pricetable": w.rhpPriceTableHandler,