From f07554ae6a949f0c7e1e24f8fd7c88a9a852349d Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 29 Jul 2024 15:19:43 +0200 Subject: [PATCH] contractor: address comments --- api/autopilot.go | 13 +- autopilot/contractor/alerts.go | 2 +- autopilot/contractor/contract_spending.go | 11 +- autopilot/contractor/contractor.go | 167 ++++++++---------- autopilot/contractor/hostfilter.go | 2 +- .../contractor/{ipfilter.go => hostset.go} | 14 +- autopilot/contractor/state.go | 4 +- 7 files changed, 94 insertions(+), 119 deletions(-) rename autopilot/contractor/{ipfilter.go => hostset.go} (59%) diff --git a/api/autopilot.go b/api/autopilot.go index c6f78cac0..e81328d88 100644 --- a/api/autopilot.go +++ b/api/autopilot.go @@ -138,19 +138,10 @@ func (c AutopilotConfig) Validate() error { return nil } -func (c ContractsConfig) IsContractInSet(contract Contract) bool { - for _, set := range contract.ContractSets { - if set == c.Set { - return true - } - } - return false -} - func (c ContractsConfig) SortContractsForMaintenance(contracts []Contract) { sort.SliceStable(contracts, func(i, j int) bool { - iInSet := c.IsContractInSet(contracts[i]) - jInSet := c.IsContractInSet(contracts[j]) + iInSet := contracts[i].InSet(c.Set) + jInSet := contracts[j].InSet(c.Set) if iInSet != jInSet { return iInSet } diff --git a/autopilot/contractor/alerts.go b/autopilot/contractor/alerts.go index 4d83a122a..9185c88dd 100644 --- a/autopilot/contractor/alerts.go +++ b/autopilot/contractor/alerts.go @@ -33,7 +33,7 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, ourFault bool, Message: "Contract renewal failed", Data: map[string]interface{}{ "error": err.Error(), - "ourFault": ourFault, + "hostError": !ourFault, "contractID": contract.ID.String(), "hostKey": contract.HostKey.String(), }, diff --git a/autopilot/contractor/contract_spending.go b/autopilot/contractor/contract_spending.go index 4f8f846c7..54985a130 100644 --- a/autopilot/contractor/contract_spending.go +++ b/autopilot/contractor/contract_spending.go @@ -1,6 +1,8 @@ package contractor import ( + "context" + "go.sia.tech/core/types" "go.sia.tech/renterd/api" ) @@ -27,7 +29,12 @@ func currentPeriodSpending(contracts []api.ContractMetadata, currentPeriod uint6 return totalAllocated } -func remainingFunds(contracts []api.ContractMetadata, state *MaintenanceState) types.Currency { +func remainingAllowance(ctx context.Context, bus Bus, state *MaintenanceState) (types.Currency, error) { + contracts, err := bus.Contracts(ctx, api.ContractsOpts{}) + if err != nil { + return types.Currency{}, err + } + // find out how much we spent in the current period spent := currentPeriodSpending(contracts, state.Period()) @@ -36,5 +43,5 @@ func remainingFunds(contracts []api.ContractMetadata, state *MaintenanceState) t if state.Allowance().Cmp(spent) > 0 { remaining = state.Allowance().Sub(spent) } - return remaining + return remaining, nil } diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go index b16d2a05e..837b9d20a 100644 --- a/autopilot/contractor/contractor.go +++ b/autopilot/contractor/contractor.go @@ -106,19 +106,19 @@ type Worker interface { } type contractChecker interface { - isUsableContract(cfg api.AutopilotConfig, s rhpv2.HostSettings, pt rhpv3.HostPriceTable, rs api.RedundancySettings, contract api.Contract, inSet bool, bh uint64, f *ipFilter) (usable, refresh, renew bool, reasons []string) + isUsableContract(cfg api.AutopilotConfig, s rhpv2.HostSettings, pt rhpv3.HostPriceTable, rs api.RedundancySettings, contract api.Contract, inSet bool, bh uint64, f *hostSet) (usable, refresh, renew bool, reasons []string) pruneContractRefreshFailures(contracts []api.ContractMetadata) shouldArchive(c api.Contract, bh uint64) error } type contractReviser interface { - formContract(ctx *mCtx, w Worker, host api.Host, minInitialContractFunds, maxInitialContractFunds types.Currency, budget *types.Currency) (cm api.ContractMetadata, ourFault bool, err error) - renewContract(ctx *mCtx, w Worker, c api.Contract, h api.Host, budget *types.Currency) (cm api.ContractMetadata, ourFault bool, err error) - refreshContract(ctx *mCtx, w Worker, c api.Contract, h api.Host, budget *types.Currency) (cm api.ContractMetadata, ourFault bool, err error) + formContract(ctx *mCtx, w Worker, host api.Host, minInitialContractFunds, maxInitialContractFunds types.Currency, budget *types.Currency, logger *zap.SugaredLogger) (cm api.ContractMetadata, ourFault bool, err error) + renewContract(ctx *mCtx, w Worker, c api.Contract, h api.Host, budget *types.Currency, logger *zap.SugaredLogger) (cm api.ContractMetadata, ourFault bool, err error) + refreshContract(ctx *mCtx, w Worker, c api.Contract, h api.Host, budget *types.Currency, logger *zap.SugaredLogger) (cm api.ContractMetadata, ourFault bool, err error) } type revisionBroadcaster interface { - runRevisionBroadcast(ctx context.Context, w Worker, contracts []api.ContractMetadata) + broadcastRevisions(ctx context.Context, w Worker, contracts []api.ContractMetadata, logger *zap.SugaredLogger) } type ( @@ -192,17 +192,11 @@ func (c *Contractor) Close() error { } func (c *Contractor) PerformContractMaintenance(ctx context.Context, w Worker, state *MaintenanceState) (bool, error) { - // figure out remaining budget for this period - contracts, err := c.bus.Contracts(ctx, api.ContractsOpts{}) - if err != nil { - return false, fmt.Errorf("failed to fetch contracts: %w", err) - } - remainingFunds := remainingFunds(contracts, state) - return performContractMaintenance(newMaintenanceCtx(ctx, state), c.alerter, c.bus, c.churn, w, c, c, c, remainingFunds, c.logger) + return performContractMaintenance(newMaintenanceCtx(ctx, state), c.alerter, c.bus, c.churn, w, c, c, c, c.logger) } -func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitialContractFunds, maxInitialContractFunds types.Currency, budget *types.Currency) (cm api.ContractMetadata, proceed bool, err error) { - log := c.logger.With("hk", host.PublicKey, "hostVersion", host.Settings.Version, "hostRelease", host.Settings.Release) +func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitialContractFunds, maxInitialContractFunds types.Currency, budget *types.Currency, logger *zap.SugaredLogger) (cm api.ContractMetadata, proceed bool, err error) { + logger = logger.With("hk", host.PublicKey, "hostVersion", host.Settings.Version, "hostRelease", host.Settings.Release) // convenience variables hk := host.PublicKey @@ -210,7 +204,7 @@ func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitial // fetch host settings scan, err := w.RHPScan(ctx, hk, host.NetAddress, 0) if err != nil { - log.Infow(err.Error(), "hk", hk) + logger.Infow(err.Error(), "hk", hk) return api.ContractMetadata{}, true, err } @@ -224,7 +218,7 @@ func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitial txnFee := ctx.state.Fee.Mul64(estimatedFileContractTransactionSetSize) renterFunds := initialContractFunding(scan.Settings, txnFee, minInitialContractFunds, maxInitialContractFunds) if budget.Cmp(renterFunds) < 0 { - log.Infow("insufficient budget", "budget", budget, "needed", renterFunds) + logger.Infow("insufficient budget", "budget", budget, "needed", renterFunds) return api.ContractMetadata{}, false, errors.New("insufficient budget") } @@ -237,7 +231,7 @@ func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitial contract, _, err := w.RHPForm(ctx, endHeight, hk, host.NetAddress, ctx.state.Address, renterFunds, hostCollateral) if err != nil { // TODO: keep track of consecutive failures and break at some point - log.Errorw(fmt.Sprintf("contract formation failed, err: %v", err), "hk", hk) + logger.Errorw(fmt.Sprintf("contract formation failed, err: %v", err), "hk", hk) if utils.IsErr(err, wallet.ErrNotEnoughFunds) { return api.ContractMetadata{}, false, err } @@ -251,11 +245,11 @@ func (c *Contractor) formContract(ctx *mCtx, w Worker, host api.Host, minInitial contractPrice := contract.Revision.MissedHostPayout().Sub(hostCollateral) formedContract, err := c.bus.AddContract(ctx, contract, contractPrice, renterFunds, cs.BlockHeight, api.ContractStatePending) if err != nil { - log.Errorw(fmt.Sprintf("contract formation failed, err: %v", err), "hk", hk) + logger.Errorw(fmt.Sprintf("contract formation failed, err: %v", err), "hk", hk) return api.ContractMetadata{}, true, err } - log.Infow("formation succeeded", + logger.Infow("formation succeeded", "fcid", formedContract.ID, "renterFunds", renterFunds.String(), "collateral", hostCollateral.String(), @@ -290,11 +284,11 @@ func (c *Contractor) pruneContractRefreshFailures(contracts []api.ContractMetada } } -func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, host api.Host, budget *types.Currency) (cm api.ContractMetadata, proceed bool, err error) { +func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, host api.Host, budget *types.Currency, logger *zap.SugaredLogger) (cm api.ContractMetadata, proceed bool, err error) { if contract.Revision == nil { return api.ContractMetadata{}, true, errors.New("can't refresh contract without a revision") } - log := c.logger.With("to_renew", contract.ID, "hk", contract.HostKey, "hostVersion", host.Settings.Version, "hostRelease", host.Settings.Release) + logger = logger.With("to_renew", contract.ID, "hk", contract.HostKey, "hostVersion", host.Settings.Version, "hostRelease", host.Settings.Release) // convenience variables settings := host.Settings @@ -312,14 +306,14 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, // calculate the renter funds var renterFunds types.Currency if isOutOfFunds(ctx.AutopilotConfig(), pt, contract) { - renterFunds = c.refreshFundingEstimate(ctx.AutopilotConfig(), contract, host, ctx.state.Fee) + renterFunds = c.refreshFundingEstimate(ctx.AutopilotConfig(), contract, host, ctx.state.Fee, logger) } else { renterFunds = rev.ValidRenterPayout() // don't increase funds } // check our budget if budget.Cmp(renterFunds) < 0 { - log.Warnw("insufficient budget for refresh", "hk", hk, "fcid", fcid, "budget", budget, "needed", renterFunds) + logger.Warnw("insufficient budget for refresh", "hk", hk, "fcid", fcid, "budget", budget, "needed", renterFunds) return api.ContractMetadata{}, false, fmt.Errorf("insufficient budget: %s < %s", budget.String(), renterFunds.String()) } @@ -337,7 +331,7 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, resp, err := w.RHPRenew(ctx, contract.ID, contract.EndHeight(), hk, contract.SiamuxAddr, settings.Address, ctx.state.Address, renterFunds, minNewCollateral, maxFundAmount, expectedNewStorage, settings.WindowSize) if err != nil { if strings.Contains(err.Error(), "new collateral is too low") { - log.Infow("refresh failed: contract wouldn't have enough collateral after refresh", + logger.Infow("refresh failed: contract wouldn't have enough collateral after refresh", "hk", hk, "fcid", fcid, "unallocatedCollateral", unallocatedCollateral.String(), @@ -345,7 +339,7 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, ) return api.ContractMetadata{}, true, err } - log.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid) + logger.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid) if utils.IsErr(err, wallet.ErrNotEnoughFunds) && !worker.IsErrHost(err) { return api.ContractMetadata{}, false, err } @@ -358,13 +352,13 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, // persist the contract refreshedContract, err := c.bus.AddRenewedContract(ctx, resp.Contract, resp.ContractPrice, renterFunds, cs.BlockHeight, contract.ID, api.ContractStatePending) if err != nil { - log.Errorw("adding refreshed contract failed", zap.Error(err), "hk", hk, "fcid", fcid) + logger.Errorw("adding refreshed contract failed", zap.Error(err), "hk", hk, "fcid", fcid) return api.ContractMetadata{}, false, err } // add to renewed set newCollateral := resp.Contract.Revision.MissedHostPayout().Sub(resp.ContractPrice) - log.Infow("refresh succeeded", + logger.Infow("refresh succeeded", "fcid", refreshedContract.ID, "renewedFrom", contract.ID, "renterFunds", renterFunds.String(), @@ -374,11 +368,11 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, contract api.Contract, return refreshedContract, true, nil } -func (c *Contractor) renewContract(ctx *mCtx, w Worker, contract api.Contract, host api.Host, budget *types.Currency) (cm api.ContractMetadata, proceed bool, err error) { +func (c *Contractor) renewContract(ctx *mCtx, w Worker, contract api.Contract, host api.Host, budget *types.Currency, logger *zap.SugaredLogger) (cm api.ContractMetadata, proceed bool, err error) { if contract.Revision == nil { return api.ContractMetadata{}, true, errors.New("can't renew contract without a revision") } - log := c.logger.With("to_renew", contract.ID, "hk", contract.HostKey, "hostVersion", host.Settings.Version, "hostRelease", host.Settings.Release) + logger = logger.With("to_renew", contract.ID, "hk", contract.HostKey, "hostVersion", host.Settings.Version, "hostRelease", host.Settings.Release) // convenience variables settings := host.Settings @@ -396,18 +390,18 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, contract api.Contract, h // calculate the renter funds for the renewal a.k.a. the funds the renter will // be able to spend minRenterFunds, _ := initialContractFundingMinMax(ctx.AutopilotConfig()) - renterFunds := renewFundingEstimate(minRenterFunds, contract.TotalCost, contract.RenterFunds(), log) + renterFunds := renewFundingEstimate(minRenterFunds, contract.TotalCost, contract.RenterFunds(), logger) // check our budget if budget.Cmp(renterFunds) < 0 { - log.Infow("insufficient budget", "budget", budget, "needed", renterFunds) + logger.Infow("insufficient budget", "budget", budget, "needed", renterFunds) return api.ContractMetadata{}, false, errors.New("insufficient budget") } // sanity check the endheight is not the same on renewals endHeight := ctx.EndHeight() if endHeight <= rev.EndHeight() { - log.Infow("invalid renewal endheight", "oldEndheight", rev.EndHeight(), "newEndHeight", endHeight, "period", ctx.state.Period, "bh", cs.BlockHeight) + logger.Infow("invalid renewal endheight", "oldEndheight", rev.EndHeight(), "newEndHeight", endHeight, "period", ctx.state.Period, "bh", cs.BlockHeight) return api.ContractMetadata{}, false, fmt.Errorf("renewal endheight should surpass the current contract endheight, %v <= %v", endHeight, rev.EndHeight()) } @@ -417,7 +411,7 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, contract api.Contract, h // renew the contract resp, err := w.RHPRenew(ctx, fcid, endHeight, hk, contract.SiamuxAddr, settings.Address, ctx.state.Address, renterFunds, types.ZeroCurrency, *budget, expectedNewStorage, settings.WindowSize) if err != nil { - log.Errorw( + logger.Errorw( "renewal failed", zap.Error(err), "endHeight", endHeight, @@ -436,12 +430,12 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, contract api.Contract, h // persist the contract renewedContract, err := c.bus.AddRenewedContract(ctx, resp.Contract, resp.ContractPrice, renterFunds, cs.BlockHeight, fcid, api.ContractStatePending) if err != nil { - log.Errorw(fmt.Sprintf("renewal failed to persist, err: %v", err)) + logger.Errorw(fmt.Sprintf("renewal failed to persist, err: %v", err)) return api.ContractMetadata{}, false, err } newCollateral := resp.Contract.Revision.MissedHostPayout().Sub(resp.ContractPrice) - log.Infow( + logger.Infow( "renewal succeeded", "fcid", renewedContract.ID, "renewedFrom", fcid, @@ -451,18 +445,18 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, contract api.Contract, h return renewedContract, true, nil } -// runRevisionBroadcast broadcasts contract revisions from the current set of +// broadcastRevisions broadcasts contract revisions from the current set of // contracts. Since we are migrating away from all contracts not in the set and // are not uploading to those contracts anyway, we only worry about contracts in // the set. -func (c *Contractor) runRevisionBroadcast(ctx context.Context, w Worker, contracts []api.ContractMetadata) { +func (c *Contractor) broadcastRevisions(ctx context.Context, w Worker, contracts []api.ContractMetadata, logger *zap.SugaredLogger) { if c.revisionBroadcastInterval == 0 { return // not enabled } cs, err := c.bus.ConsensusState(ctx) if err != nil { - c.logger.Warnf("revision broadcast failed to fetch blockHeight: %v", err) + logger.Warnf("revision broadcast failed to fetch blockHeight: %v", err) return } bh := cs.BlockHeight @@ -486,7 +480,7 @@ func (c *Contractor) runRevisionBroadcast(ctx context.Context, w Worker, contrac if utils.IsErr(err, errors.New("transaction has a file contract with an outdated revision number")) { continue // don't log - revision was already broadcasted } else if err != nil { - c.logger.Warnw(fmt.Sprintf("failed to broadcast contract revision: %v", err), + logger.Warnw(fmt.Sprintf("failed to broadcast contract revision: %v", err), "hk", contract.HostKey, "fcid", contract.ID) failed++ @@ -495,7 +489,7 @@ func (c *Contractor) runRevisionBroadcast(ctx context.Context, w Worker, contrac } successful++ } - c.logger.Infow("revision broadcast completed", + logger.Infow("revision broadcast completed", "successful", successful, "failed", failed) @@ -511,7 +505,7 @@ func (c *Contractor) runRevisionBroadcast(ctx context.Context, w Worker, contrac } } -func (c *Contractor) refreshFundingEstimate(cfg api.AutopilotConfig, contract api.Contract, host api.Host, fee types.Currency) types.Currency { +func (c *Contractor) refreshFundingEstimate(cfg api.AutopilotConfig, contract api.Contract, host api.Host, fee types.Currency, logger *zap.SugaredLogger) types.Currency { // refresh with 1.2x the funds refreshAmount := contract.TotalCost.Mul64(6).Div64(5) @@ -526,7 +520,7 @@ func (c *Contractor) refreshFundingEstimate(cfg api.AutopilotConfig, contract ap if refreshAmountCapped.Cmp(minimum) < 0 { refreshAmountCapped = minimum } - c.logger.Infow("refresh estimate", + logger.Infow("refresh estimate", "fcid", contract.ID, "refreshAmount", refreshAmount, "refreshAmountCapped", refreshAmountCapped) @@ -895,19 +889,20 @@ func renterFundsToExpectedStorage(renterFunds types.Currency, duration uint64, p // keep contracts with and the 'dropOutReasons' map is updated with the reasons // for dropping out of the set. If a contract is refreshed or renewed, the // 'remainingFunds' are adjusted. -func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, cc contractChecker, cr contractReviser, ipFilter *ipFilter, dropOutReasons map[types.FileContractID]string, logger *zap.SugaredLogger, remainingFunds *types.Currency) ([]api.ContractMetadata, error) { +func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, cc contractChecker, cr contractReviser, ipFilter *hostSet, logger *zap.SugaredLogger, remainingFunds *types.Currency) ([]api.ContractMetadata, map[types.FileContractID]string, error) { var filteredContracts []api.ContractMetadata keepContract := func(c api.ContractMetadata, h api.Host) { filteredContracts = append(filteredContracts, c) ipFilter.Add(h) } + churnReasons := make(map[types.FileContractID]string) // fetch all contracts we already have logger.Info("fetching existing contracts") start := time.Now() resp, err := w.Contracts(ctx, timeoutHostRevision) if err != nil { - return nil, err + return nil, nil, err } contracts := resp.Contracts logger.With("elapsed", time.Since(start)).Info("done fetching existing contracts") @@ -933,7 +928,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, logger.With("contracts", len(contracts)).Info("checking existing contracts") var renewed, refreshed int for _, c := range contracts { - inSet := ctx.IsContractInSet(c) + inSet := c.InSet(ctx.Set()) logger := logger.With("contractID", c.ID). With("inSet", inSet). @@ -950,22 +945,15 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, // abort if we have enough contracts if uint64(len(filteredContracts)) >= ctx.WantedContracts() { - dropOutReasons[c.ID] = "truncated" + churnReasons[c.ID] = "truncated" logger.Debug("ignoring contract since we have enough contracts") continue } - // check for interruption - select { - case <-ctx.Done(): - return nil, context.Cause(ctx) - default: - } - // fetch recent consensus state cs, err := bus.ConsensusState(ctx) if err != nil { - return nil, fmt.Errorf("failed to fetch consensus state: %w", err) + return nil, nil, fmt.Errorf("failed to fetch consensus state: %w", err) } bh := cs.BlockHeight logger = logger.With("blockHeight", bh) @@ -979,7 +967,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, } else { logger.Debug("successfully archived contract") } - dropOutReasons[c.ID] = reason.Error() + churnReasons[c.ID] = reason.Error() continue } @@ -987,7 +975,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, host, err := bus.Host(ctx, c.HostKey) if err != nil { logger.With(zap.Error(err)).Warn("missing host") - dropOutReasons[c.ID] = api.ErrUsabilityHostNotFound.Error() + churnReasons[c.ID] = api.ErrUsabilityHostNotFound.Error() continue } @@ -998,14 +986,14 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, // check if host is blocked if host.Blocked { logger.Info("host is blocked") - dropOutReasons[c.ID] = api.ErrUsabilityHostBlocked.Error() + churnReasons[c.ID] = api.ErrUsabilityHostBlocked.Error() continue } // check if host has a redundant ip if ctx.ShouldFilterRedundantIPs() && ipFilter.HasRedundantIP(host) { logger.Info("host has redundant IP") - dropOutReasons[c.ID] = api.ErrUsabilityHostRedundantIP.Error() + churnReasons[c.ID] = api.ErrUsabilityHostRedundantIP.Error() continue } @@ -1013,7 +1001,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, check, ok := host.Checks[ctx.ApID()] if !ok { logger.Warn("missing host check") - dropOutReasons[c.ID] = api.ErrUsabilityHostNotFound.Error() + churnReasons[c.ID] = api.ErrUsabilityHostNotFound.Error() continue } @@ -1021,7 +1009,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, if !check.Usability.IsUsable() { reasons := strings.Join(check.Usability.UnusableReasons(), ",") logger.With("reasons", reasons).Info("unusable host") - dropOutReasons[c.ID] = reasons + churnReasons[c.ID] = reasons continue } @@ -1033,7 +1021,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, remainingLeeway-- } else { logger.Debug("ignoring contract without revision") - dropOutReasons[c.ID] = errContractNoRevision.Error() + churnReasons[c.ID] = errContractNoRevision.Error() } continue // no more checks without revision } @@ -1049,7 +1037,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, // remember reason for potential drop of contract if len(reasons) > 0 { - dropOutReasons[c.ID] = strings.Join(reasons, ",") + churnReasons[c.ID] = strings.Join(reasons, ",") } contract := c.ContractMetadata @@ -1058,7 +1046,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, var ourFault bool if needsRenew { var renewedContract api.ContractMetadata - renewedContract, ourFault, err = cr.renewContract(ctx, w, c, host, remainingFunds) + renewedContract, ourFault, err = cr.renewContract(ctx, w, c, host, remainingFunds, logger) if err != nil { logger = logger.With(zap.Error(err)).With("ourFault", ourFault) @@ -1077,7 +1065,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, } } else if needsRefresh { var refreshedContract api.ContractMetadata - refreshedContract, ourFault, err = cr.refreshContract(ctx, w, c, host, remainingFunds) + refreshedContract, ourFault, err = cr.refreshContract(ctx, w, c, host, remainingFunds, logger) if err != nil { logger = logger.With(zap.Error(err)).With("ourFault", ourFault) @@ -1121,12 +1109,12 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, With("renewed", renewed). With("filteredContracts", len(filteredContracts)). Info("checking existing contracts done") - return filteredContracts, nil + return filteredContracts, churnReasons, nil } // performContracdtFormations forms up to 'wanted' new contracts with hosts. The // 'ipFilter' and 'remainingFunds' are updated with every new contract. -func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, ipFilter *ipFilter, logger *zap.SugaredLogger, remainingFunds *types.Currency, wanted int) ([]api.ContractMetadata, error) { +func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, ipFilter *hostSet, logger *zap.SugaredLogger, remainingFunds *types.Currency, wanted int) ([]api.ContractMetadata, error) { var formedContracts []api.ContractMetadata addContract := func(c api.ContractMetadata, h api.Host) { formedContracts = append(formedContracts, c) @@ -1231,7 +1219,7 @@ func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, continue } - formedContract, proceed, err := cr.formContract(ctx, w, candidate.host, minInitialContractFunds, maxInitialContractFunds, remainingFunds) + formedContract, proceed, err := cr.formContract(ctx, w, candidate.host, minInitialContractFunds, maxInitialContractFunds, remainingFunds, logger) if err != nil { logger.With(zap.Error(err)).Error("failed to form contract") continue @@ -1244,7 +1232,7 @@ func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, // add new contract and host addContract(formedContract, candidate.host) } - logger.With("formedContractx", len(formedContracts)).Info("done forming contracts") + logger.With("formedContracts", len(formedContracts)).Info("done forming contracts") return formedContracts, nil } @@ -1260,10 +1248,6 @@ func performHostChecks(ctx *mCtx, bus Bus, logger *zap.SugaredLogger) error { var scoredHosts []scoredHost for _, host := range hosts { - // filter out hosts that have never been scanned - if !host.Scanned { - continue - } // score host sb, err := ctx.HostScore(host) if err != nil { @@ -1300,7 +1284,7 @@ func performHostChecks(ctx *mCtx, bus Bus, logger *zap.SugaredLogger) error { return nil } -func performPostMaintenanceTasks(ctx *mCtx, bus Bus, w Worker, alerter alerts.Alerter, cc contractChecker, rb revisionBroadcaster) error { +func performPostMaintenanceTasks(ctx *mCtx, bus Bus, w Worker, alerter alerts.Alerter, cc contractChecker, rb revisionBroadcaster, logger *zap.SugaredLogger) error { // fetch some contract and host info allContracts, err := bus.Contracts(ctx, api.ContractsOpts{}) if err != nil { @@ -1324,7 +1308,7 @@ func performPostMaintenanceTasks(ctx *mCtx, bus Bus, w Worker, alerter alerts.Al } // run revision broadcast on contracts in the new set - rb.runRevisionBroadcast(ctx, w, setContracts) + rb.broadcastRevisions(ctx, w, setContracts, logger) // register alerts for used hosts with lost sectors var toDismiss []types.Hash256 @@ -1346,7 +1330,7 @@ func performPostMaintenanceTasks(ctx *mCtx, bus Bus, w Worker, alerter alerts.Al return nil } -func performContractMaintenance(ctx *mCtx, alerter alerts.Alerter, bus Bus, churn *accumulatedChurn, w Worker, cc contractChecker, cr contractReviser, rb revisionBroadcaster, remaining types.Currency, logger *zap.SugaredLogger) (bool, error) { +func performContractMaintenance(ctx *mCtx, alerter alerts.Alerter, bus Bus, churn *accumulatedChurn, w Worker, cc contractChecker, cr contractReviser, rb revisionBroadcaster, logger *zap.SugaredLogger) (bool, error) { logger = logger.Named("performContractMaintenance"). Named(hex.EncodeToString(frand.Bytes(16))). // uuid for this iteration With("contractSet", ctx.ContractSet()) @@ -1356,38 +1340,31 @@ func performContractMaintenance(ctx *mCtx, alerter alerts.Alerter, bus Bus, chur logger.With("reason", reason).Info("skipping contract maintenance") return false, nil } + + // compute the remaining budget for this period + remaining, err := remainingAllowance(ctx, bus, ctx.state) + if err != nil { + return false, fmt.Errorf("failed to compute remaining allowance: %w", err) + } + logger = logger.With("remainingAllowance", remaining) + logger.Infow("performing contract maintenance") - // STEP 1: perform host maintenance + // STEP 1: perform host checks if err := performHostChecks(ctx, bus, logger); err != nil { return false, err } - // check for interruption - select { - case <-ctx.Done(): - return false, context.Cause(ctx) - default: - } - // STEP 2: perform contract maintenance - ipFilter := &ipFilter{ + ipFilter := &hostSet{ logger: logger.Named("ipFilter"), subnetToHostKey: make(map[string]string), } - dropOutReasons := make(map[types.FileContractID]string) - keptContracts, err := performContractChecks(ctx, alerter, bus, w, cc, cr, ipFilter, dropOutReasons, logger, &remaining) + keptContracts, churnReasons, err := performContractChecks(ctx, alerter, bus, w, cc, cr, ipFilter, logger, &remaining) if err != nil { return false, err } - // check for interruption - select { - case <-ctx.Done(): - return false, context.Cause(ctx) - default: - } - // STEP 3: perform contract formation formedContracts, err := performContractFormations(ctx, bus, w, cr, ipFilter, logger, &remaining, int(ctx.WantedContracts())-len(keptContracts)) if err != nil { @@ -1414,10 +1391,10 @@ func performContractMaintenance(ctx *mCtx, alerter alerts.Alerter, bus Bus, chur // STEP 5: perform minor maintenance such as cleanups and broadcasting // revisions - if err := performPostMaintenanceTasks(ctx, bus, w, alerter, cc, rb); err != nil { + if err := performPostMaintenanceTasks(ctx, bus, w, alerter, cc, rb, logger); err != nil { return false, err } // STEP 6: log changes and register alerts - return computeContractSetChanged(ctx, alerter, bus, churn, logger, oldSet, newSet, dropOutReasons) + return computeContractSetChanged(ctx, alerter, bus, churn, logger, oldSet, newSet, churnReasons) } diff --git a/autopilot/contractor/hostfilter.go b/autopilot/contractor/hostfilter.go index c68f91686..0b358d60f 100644 --- a/autopilot/contractor/hostfilter.go +++ b/autopilot/contractor/hostfilter.go @@ -102,7 +102,7 @@ func (u *unusableHostsBreakdown) keysAndValues() []interface{} { // - recoverable -> can be usable in the contract set if it is refreshed/renewed // - refresh -> should be refreshed // - renew -> should be renewed -func (c *Contractor) isUsableContract(cfg api.AutopilotConfig, s rhpv2.HostSettings, pt rhpv3.HostPriceTable, rs api.RedundancySettings, contract api.Contract, inSet bool, bh uint64, f *ipFilter) (usable, refresh, renew bool, reasons []string) { +func (c *Contractor) isUsableContract(cfg api.AutopilotConfig, s rhpv2.HostSettings, pt rhpv3.HostPriceTable, rs api.RedundancySettings, contract api.Contract, inSet bool, bh uint64, f *hostSet) (usable, refresh, renew bool, reasons []string) { usable = true if bh > contract.EndHeight() { reasons = append(reasons, errContractExpired.Error()) diff --git a/autopilot/contractor/ipfilter.go b/autopilot/contractor/hostset.go similarity index 59% rename from autopilot/contractor/ipfilter.go rename to autopilot/contractor/hostset.go index 6ae60124e..bc3932767 100644 --- a/autopilot/contractor/ipfilter.go +++ b/autopilot/contractor/hostset.go @@ -12,27 +12,27 @@ var ( ) type ( - ipFilter struct { + hostSet struct { subnetToHostKey map[string]string logger *zap.SugaredLogger } ) -func (f *ipFilter) HasRedundantIP(host api.Host) bool { +func (hs *hostSet) HasRedundantIP(host api.Host) bool { // validate host subnets if len(host.Subnets) == 0 { - f.logger.Errorf("host %v has no subnet, treating its IP %v as redundant", host.PublicKey, host.NetAddress) + hs.logger.Errorf("host %v has no subnet, treating its IP %v as redundant", host.PublicKey, host.NetAddress) return true } else if len(host.Subnets) > 2 { - f.logger.Errorf("host %v has more than 2 subnets, treating its IP %v as redundant", host.PublicKey, errHostTooManySubnets) + hs.logger.Errorf("host %v has more than 2 subnets, treating its IP %v as redundant", host.PublicKey, errHostTooManySubnets) return true } // check if we know about this subnet var knownHost string for _, subnet := range host.Subnets { - if knownHost = f.subnetToHostKey[subnet]; knownHost != "" { + if knownHost = hs.subnetToHostKey[subnet]; knownHost != "" { break } } @@ -44,8 +44,8 @@ func (f *ipFilter) HasRedundantIP(host api.Host) bool { return false } -func (f *ipFilter) Add(host api.Host) { +func (hs *hostSet) Add(host api.Host) { for _, subnet := range host.Subnets { - f.subnetToHostKey[subnet] = host.PublicKey.String() + hs.subnetToHostKey[subnet] = host.PublicKey.String() } } diff --git a/autopilot/contractor/state.go b/autopilot/contractor/state.go index be80727eb..1a03697e7 100644 --- a/autopilot/contractor/state.go +++ b/autopilot/contractor/state.go @@ -107,8 +107,8 @@ func (ctx *mCtx) WantedContracts() uint64 { return ctx.state.AP.Config.Contracts.Amount } -func (ctx *mCtx) IsContractInSet(contract api.Contract) bool { - return ctx.state.ContractsConfig().IsContractInSet(contract) +func (ctx *mCtx) Set() string { + return ctx.state.ContractsConfig().Set } func (ctx *mCtx) SortContractsForMaintenance(contracts []api.Contract) {