Contractor Refactor (#656)
* contractor: fix logging

* contractor: fix nil map and copy

* contractor: add comment

* contractor: have candidateHosts adjust the lowest score if there are no candidates

* contractor: refactor it more thoroughly

* contractor: pass used hosts

* autopilot: implement Error for refillError

* autopilot: implement MR remarks

* testing: cleanup PR

* contractor: avoid parsing

---------

Co-authored-by: Chris Schinnerl <[email protected]>
peterjan and ChrisSchinnerl authored Oct 16, 2023
1 parent 20c3d81 commit c92e343
Showing 6 changed files with 214 additions and 141 deletions.
7 changes: 7 additions & 0 deletions autopilot/accounts.go
@@ -206,6 +206,13 @@ type refillError struct {
keysAndValues []interface{}
}

func (err *refillError) Error() string {
if err.err == nil {
return ""
}
return err.err.Error()
}

func (err *refillError) Is(target error) bool {
return errors.Is(err.err, target)
}
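The new Error method completes Go's error interface for refillError, so the autopilot can return and log it like any other error, while the existing Is method keeps errors.Is checks working against the wrapped error. A minimal standalone sketch of the same pattern, under illustrative names (wrappedErr and errLowBalance are not renterd identifiers):

```go
package main

import (
	"errors"
	"fmt"
)

var errLowBalance = errors.New("balance too low")

// wrappedErr mirrors the refillError pattern above: it wraps an inner error
// and forwards Error and Is calls to it.
type wrappedErr struct {
	err error
}

func (e *wrappedErr) Error() string {
	if e.err == nil {
		return ""
	}
	return e.err.Error()
}

func (e *wrappedErr) Is(target error) bool {
	return errors.Is(e.err, target)
}

func main() {
	var err error = &wrappedErr{err: errLowBalance}
	fmt.Println(err)                           // prints "balance too low" via Error()
	fmt.Println(errors.Is(err, errLowBalance)) // true, via the custom Is method
}
```

Returning an empty string when the inner error is nil, as the new method does, also avoids a nil-pointer dereference if a zero-value refillError ever gets formatted.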
172 changes: 79 additions & 93 deletions autopilot/contractor.go
@@ -110,6 +110,11 @@ type (
UnusableResult unusableHostResult
}

scoredHost struct {
host hostdb.Host
score float64
}

contractInfo struct {
contract api.Contract
settings rhpv2.HostSettings
@@ -221,10 +226,16 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (
return false, err
}

// min score to pass checks.
// fetch candidate hosts
candidates, unusableHosts, err := c.candidateHosts(ctx, hosts, usedHosts, hostData, math.SmallestNonzeroFloat64) // avoid 0 score hosts
if err != nil {
return false, err
}

// min score to pass checks
var minScore float64
if len(hosts) > 0 {
minScore, err = c.managedFindMinAllowedHostScores(ctx, w, hosts, hostData, state.cfg.Contracts.Amount)
minScore, err = c.calculateMinScore(ctx, candidates, state.cfg.Contracts.Amount)
if err != nil {
return false, fmt.Errorf("failed to determine min score for contract check: %w", err)
}
@@ -343,7 +354,7 @@
// check if we need to form contracts and add them to the contract set
var formed []types.FileContractID
if uint64(len(updatedSet)) < threshold {
formed, err = c.runContractFormations(ctx, w, hosts, usedHosts, state.cfg.Contracts.Amount-uint64(len(updatedSet)), &remaining, minScore)
formed, err = c.runContractFormations(ctx, w, candidates, usedHosts, unusableHosts, state.cfg.Contracts.Amount-uint64(len(updatedSet)), &remaining)
if err != nil {
c.logger.Errorf("failed to form contracts, err: %v", err) // continue
} else {
@@ -770,7 +781,7 @@ func (c *contractor) runContractChecks(ctx context.Context, w Worker, contracts
return toKeep, toArchive, toStopUsing, toRefresh, toRenew, nil
}

func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts []hostdb.Host, usedHosts map[types.PublicKey]struct{}, missing uint64, budget *types.Currency, minScore float64) ([]types.FileContractID, error) {
func (c *contractor) runContractFormations(ctx context.Context, w Worker, candidates scoredHosts, usedHosts map[types.PublicKey]struct{}, unusableHosts unusableHostResult, missing uint64, budget *types.Currency) ([]types.FileContractID, error) {
ctx, span := tracing.Tracer.Start(ctx, "runContractFormations")
defer span.End()

@@ -798,11 +809,22 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts
)
}()

// fetch candidate hosts
// select candidates
wanted := int(addLeeway(missing, leewayPctCandidateHosts))
candidates, _, err := c.candidateHosts(ctx, w, hosts, usedHosts, make(map[types.PublicKey]uint64), wanted, minScore)
if err != nil {
return nil, err
selected := candidates.randSelectByScore(wanted)

// print a warning if we couldn't find enough candidate hosts
c.logger.Debugf("looking for %d candidate hosts", wanted)
if len(selected) < wanted {
msg := "no candidate hosts found"
if len(selected) > 0 {
msg = fmt.Sprintf("only found %d candidate host(s) out of the %d we wanted", len(selected), wanted)
}
if len(candidates) >= wanted {
c.logger.Warnw(msg, unusableHosts.keysAndValues()...)
} else {
c.logger.Debugw(msg, unusableHosts.keysAndValues()...)
}
}

// fetch consensus state
@@ -818,18 +840,18 @@ func (c *contractor) runContractFormations(ctx context.Context, w Worker, hosts
// prepare an IP filter that contains all used hosts
ipFilter := c.newIPFilter()
if shouldFilter {
for _, h := range hosts {
if _, used := usedHosts[h.PublicKey]; used {
_ = ipFilter.IsRedundantIP(h.NetAddress, h.PublicKey)
for _, h := range candidates {
if _, used := usedHosts[h.host.PublicKey]; used {
_ = ipFilter.IsRedundantIP(h.host.NetAddress, h.host.PublicKey)
}
}
}

// calculate min/max contract funds
minInitialContractFunds, maxInitialContractFunds := initialContractFundingMinMax(state.cfg)

for h := 0; missing > 0 && h < len(candidates); h++ {
host := candidates[h]
for h := 0; missing > 0 && h < len(selected); h++ {
host := selected[h].host

// break if the autopilot is stopped
if c.ap.isStopped() {
@@ -1179,65 +1201,53 @@ func (c *contractor) renewFundingEstimate(ctx context.Context, ci contractInfo,
return cappedEstimatedCost, nil
}

func (c *contractor) managedFindMinAllowedHostScores(ctx context.Context, w Worker, hosts []hostdb.Host, storedData map[types.PublicKey]uint64, numContracts uint64) (float64, error) {
// Pull a new set of hosts from the hostdb that could be used as a new set
// to match the allowance. The lowest scoring host of these new hosts will
// be used as a baseline for determining whether our existing contracts are
// worthwhile.
var lowestScores []float64
buffer := 50
for i := 0; i < 5; i++ {
candidates, scores, err := c.candidateHosts(ctx, w, hosts, make(map[types.PublicKey]struct{}), storedData, int(numContracts)+int(buffer), math.SmallestNonzeroFloat64) // avoid 0 score hosts
if err != nil {
return 0, err
}
if len(candidates) == 0 {
c.logger.Warn("min host score is set to the smallest non-zero float because there are no candidate hosts")
return math.SmallestNonzeroFloat64, nil
}
func (c *contractor) calculateMinScore(ctx context.Context, candidates []scoredHost, numContracts uint64) (float64, error) {
// return early if there's no hosts
if len(candidates) == 0 {
c.logger.Warn("min host score is set to the smallest non-zero float because there are no candidate hosts")
return math.SmallestNonzeroFloat64, nil
}

// Find the minimum score that a host is allowed to have to be considered
// good for upload.
// do multiple rounds to select the lowest score
var lowestScores []float64
for r := 0; r < 5; r++ {
lowestScore := math.MaxFloat64
for _, score := range scores {
if score < lowestScore {
lowestScore = score
for _, host := range scoredHosts(candidates).randSelectByScore(int(numContracts) + 50) { // buffer
if host.score < lowestScore {
lowestScore = host.score
}
}
lowestScores = append(lowestScores, lowestScore)
}

// compute the min score
lowestScore, err := stats.Float64Data(lowestScores).Median()
if err != nil {
return 0, err
}
minScore := lowestScore / minAllowedScoreLeeway

c.logger.Infow("finished computing minScore",
"minScore", minScore,
"lowestScore", lowestScore)
return minScore, nil
}

func (c *contractor) candidateHosts(ctx context.Context, w Worker, hosts []hostdb.Host, usedHosts map[types.PublicKey]struct{}, storedData map[types.PublicKey]uint64, wanted int, minScore float64) ([]hostdb.Host, []float64, error) {
c.logger.Debugf("looking for %d candidate hosts", wanted)

// nothing to do
if wanted == 0 {
return nil, nil, nil
}

state := c.ap.State()
func (c *contractor) candidateHosts(ctx context.Context, hosts []hostdb.Host, usedHosts map[types.PublicKey]struct{}, storedData map[types.PublicKey]uint64, minScore float64) ([]scoredHost, unusableHostResult, error) {
start := time.Now()

// fetch consensus state
cs, err := c.ap.bus.ConsensusState(ctx)
if err != nil {
return nil, nil, err
return nil, unusableHostResult{}, err
}

// create a gouging checker
state := c.ap.State()
gc := worker.NewGougingChecker(state.gs, cs, state.fee, state.cfg.Contracts.Period, state.cfg.Contracts.RenewWindow)

// create list of candidate hosts
var candidates []hostdb.Host
// select unused hosts that passed a scan
var unused []hostdb.Host
var excluded, notcompletedscan int
for _, h := range hosts {
// filter out used hosts
@@ -1250,20 +1260,19 @@ func (c *contractor) candidateHosts(ctx context.Context, w Worker, hosts []hostd
notcompletedscan++
continue
}
candidates = append(candidates, h)
unused = append(unused, h)
}

c.logger.Debugw(fmt.Sprintf("selected %d candidate hosts out of %d", len(candidates), len(hosts)),
c.logger.Debugw(fmt.Sprintf("selected %d (potentially) usable hosts for scoring out of %d", len(unused), len(hosts)),
"excluded", excluded,
"notcompletedscan", notcompletedscan)
"notcompletedscan", notcompletedscan,
"used", len(usedHosts))

// score all candidate hosts
start := time.Now()
var results unusableHostResult
scores := make([]float64, 0, len(candidates))
scored := make([]hostdb.Host, 0, len(candidates))
// score all unused hosts
var unusableHostResult unusableHostResult
var unusable, zeros int
for _, h := range candidates {
var candidates []scoredHost
for _, h := range unused {
// NOTE: use the price table stored on the host for gouging checks when
// looking for candidate hosts, fetching the price table on the fly here
// slows contract maintenance down way too much, we re-evaluate the host
@@ -1273,49 +1282,26 @@ func (c *contractor) candidateHosts(ctx context.Context, w Worker, hosts []hostd
// NOTE: ignore the pricetable's HostBlockHeight by setting it to our
// own blockheight
h.PriceTable.HostBlockHeight = cs.BlockHeight
if usable, result := isUsableHost(state.cfg, state.rs, gc, h, minScore, storedData[h.PublicKey]); usable {
scored = append(scored, h)
scores = append(scores, result.scoreBreakdown.Score())
} else {
results.merge(result)
if result.scoreBreakdown.Score() == 0 {
zeros++
}
unusable++
usable, result := isUsableHost(state.cfg, state.rs, gc, h, minScore, storedData[h.PublicKey])
if usable {
candidates = append(candidates, scoredHost{h, result.scoreBreakdown.Score()})
continue
}
}

c.logger.Debugw(fmt.Sprintf("scored %d candidate hosts out of %v, took %v", len(scored), len(candidates), time.Since(start)),
"zeroscore", zeros,
"unusable", unusable)

// select hosts
var selectedHosts []hostdb.Host
var selectedScores []float64
for len(selectedHosts) < wanted && len(scored) > 0 {
i := randSelectByWeight(scores)
selectedHosts = append(selectedHosts, scored[i])
selectedScores = append(selectedScores, scores[i])

// remove selected host
scored[i], scored = scored[len(scored)-1], scored[:len(scored)-1]
scores[i], scores = scores[len(scores)-1], scores[:len(scores)-1]
}

// print warning if no candidate hosts were found
if len(selectedHosts) < wanted {
msg := "no candidate hosts found"
if len(selectedHosts) > 0 {
msg = fmt.Sprintf("only found %d candidate host(s) out of the %d we wanted", len(selectedHosts), wanted)
}
if len(candidates) >= wanted {
c.logger.Warnw(msg, results.keysAndValues()...)
} else {
c.logger.Debugw(msg, results.keysAndValues()...)
// keep track of unusable host results
unusableHostResult.merge(result)
if result.scoreBreakdown.Score() == 0 {
zeros++
}
unusable++
}

return selectedHosts, selectedScores, nil
c.logger.Debugw(fmt.Sprintf("scored %d unused hosts out of %v, took %v", len(candidates), len(unused), time.Since(start)),
"zeroscore", zeros,
"unusable", unusable,
"used", len(usedHosts))

return candidates, unusableHostResult, nil
}

func (c *contractor) renewContract(ctx context.Context, w Worker, ci contractInfo, budget *types.Currency) (cm api.ContractMetadata, proceed bool, err error) {
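calculateMinScore above replaces managedFindMinAllowedHostScores: hosts are now scored once up front by candidateHosts, and the minimum allowed score is derived by running five rounds of randSelectByScore over numContracts plus a buffer of 50, keeping the lowest score from each round, and dividing the median of those lows by minAllowedScoreLeeway. A standalone sketch of that final arithmetic step with made-up scores and an illustrative leeway value (the real code computes the median with stats.Float64Data(...).Median() and uses the package's own constant):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical lowest score observed in each of the five selection rounds.
	lowestScores := []float64{0.8, 0.6, 0.9, 0.7, 0.75}

	// Median of the per-round lows; odd-length slice, so it's the middle element.
	sort.Float64s(lowestScores)
	median := lowestScores[len(lowestScores)/2] // 0.75

	// Divide by the leeway to get the minimum score a host must reach.
	const minAllowedScoreLeeway = 10.0 // illustrative value only, not the repo's constant
	minScore := median / minAllowedScoreLeeway

	fmt.Println(minScore) // 0.075
}
```

Taking a median across several random draws makes the threshold less sensitive to a single unlucky round, and dividing by the leeway lowers the bar relative to the median low, so hosts do not have to match the very best candidates to remain usable.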
57 changes: 57 additions & 0 deletions autopilot/hosts.go
@@ -0,0 +1,57 @@
package autopilot

import "lukechampine.com/frand"

type (
scoredHosts []scoredHost
)

func (hosts scoredHosts) randSelectByScore(n int) (selected []scoredHost) {
if len(hosts) < n {
n = len(hosts)
} else if n < 0 {
return nil
}

used := make(map[int]struct{})
for len(selected) < n {
// imap maps an index in the filtered candidates slice back to its index in hosts
imap := make(map[int]int)

// copy the hosts that haven't been selected yet
var candidates []scoredHost
for i, h := range hosts {
if _, used := used[i]; !used {
candidates = append(candidates, h)
imap[len(candidates)-1] = i
}
}

// normalize
var total float64
for _, h := range candidates {
total += h.score
}
for i := range candidates {
candidates[i].score = candidates[i].score / total
}

// select
sI := len(candidates) - 1
r := frand.Float64()
var sum float64
for i, host := range candidates {
sum += host.score
if r < sum {
sI = i
break
}
}

// update
used[imap[sI]] = struct{}{}
selected = append(selected, hosts[imap[sI]])
}

return
}
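randSelectByScore above implements weighted sampling without replacement: each iteration renormalizes the scores of the hosts that have not been picked yet and draws one with probability proportional to its score, defaulting to the last candidate if rounding leaves the draw unmatched. A simplified, self-contained sketch of the same idea (plain named items instead of the package's scoredHost/hostdb types, and math/rand instead of frand):

```go
package main

import (
	"fmt"
	"math/rand"
)

type scoredItem struct {
	name  string
	score float64
}

// randSelectByScore picks n items without replacement; each draw is weighted
// by the scores of the items that remain. A simplified stand-in for the
// autopilot helper above, not the repository's code.
func randSelectByScore(items []scoredItem, n int) (selected []scoredItem) {
	remaining := append([]scoredItem(nil), items...) // copy so the caller's slice is untouched
	if n > len(remaining) {
		n = len(remaining)
	}

	for len(selected) < n {
		// total score of what's left, used to normalize the draw
		var total float64
		for _, it := range remaining {
			total += it.score
		}

		// walk the cumulative distribution; fall back to the last item
		idx := len(remaining) - 1
		r := rand.Float64()
		var sum float64
		for i, it := range remaining {
			sum += it.score / total
			if r < sum {
				idx = i
				break
			}
		}

		selected = append(selected, remaining[idx])
		remaining = append(remaining[:idx], remaining[idx+1:]...)
	}
	return
}

func main() {
	hosts := []scoredItem{{"hostA", 0.9}, {"hostB", 0.5}, {"hostC", 0.1}}
	for _, h := range randSelectByScore(hosts, 2) {
		fmt.Println(h.name, h.score)
	}
}
```

Renormalizing on every round keeps each draw proportional among only the remaining hosts, so high-scoring hosts tend to be picked first while low scorers still have a nonzero chance, which matches how runContractFormations walks the selected slice in order when forming new contracts.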
