Skip to content

Commit

Permalink
Merge pull request #692 from SiaFoundation/pj/ftue
Browse files Browse the repository at this point in the history
Initial Host Scan
  • Loading branch information
ChrisSchinnerl authored Nov 7, 2023
2 parents 281806e + 88ec1f5 commit 7a2b452
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 88 deletions.
2 changes: 2 additions & 0 deletions autopilot/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
span.SetStatus(codes.Error, "failed to fetch contracts")
a.l.Errorw(fmt.Sprintf("failed to fetch contracts for refill: %v", err))
return
} else if len(contracts) == 0 {
return
}

// fetch all contract set contracts
Expand Down
21 changes: 14 additions & 7 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newAccountLowBalanceAlert(address types.Address, balance, allowance types.C

func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err refillError) alerts.Alert {
data := map[string]interface{}{
"error": err,
"error": err.Error(),
"accountID": id.String(),
"contractID": contract.ID.String(),
"hostKey": contract.HostKey.String(),
Expand Down Expand Up @@ -101,7 +101,7 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo
Severity: severity,
Message: "Contract renewal failed",
Data: map[string]interface{}{
"error": err,
"error": err.Error(),
"renewalsInterrupted": interrupted,
"contractID": contract.ID.String(),
"hostKey": contract.HostKey.String(),
Expand All @@ -111,6 +111,11 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo
}

func newContractSetChangeAlert(name string, added, removed int, removedReasons map[string]string) alerts.Alert {
var hint string
if removed > 0 {
hint = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set."
}

return alerts.Alert{
ID: randomAlertID(),
Severity: alerts.SeverityInfo,
Expand All @@ -120,7 +125,7 @@ func newContractSetChangeAlert(name string, added, removed int, removedReasons m
"added": added,
"removed": removed,
"removals": removedReasons,
"hint": "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set.",
"hint": hint,
},
Timestamp: time.Now(),
}
Expand All @@ -136,17 +141,19 @@ func newOngoingMigrationsAlert(n int) alerts.Alert {
}

func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert {
severity := alerts.SeverityWarning
if health < 0.5 {
severity := alerts.SeverityError
if health < 0.25 {
severity = alerts.SeverityCritical
} else if health < 0.5 {
severity = alerts.SeverityWarning
}

return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slab),
Severity: severity,
Message: "Slab migration failed",
Data: map[string]interface{}{
"error": err,
"error": err.Error(),
"health": health,
"slabKey": slab.Key.String(),
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.",
Expand All @@ -162,7 +169,7 @@ func newRefreshHealthFailedAlert(err error) alerts.Alert {
Message: "Health refresh failed",
Data: map[string]interface{}{
"migrationsInterrupted": true,
"error": err,
"error": err.Error(),
},
Timestamp: time.Now(),
}
Expand Down
75 changes: 63 additions & 12 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Bus interface {
// consensus
ConsensusState(ctx context.Context) (api.ConsensusState, error)

// syncer
SyncerPeers(ctx context.Context) (resp []string, err error)

// objects
ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error)
RefreshHealth(ctx context.Context) error
Expand Down Expand Up @@ -185,13 +188,19 @@ func (ap *Autopilot) Run() error {
}
ap.startTime = time.Now()
ap.stopChan = make(chan struct{})
ap.triggerChan = make(chan bool)
ap.triggerChan = make(chan bool, 1)
ap.ticker = time.NewTicker(ap.tickerDuration)

ap.wg.Add(1)
defer ap.wg.Done()
ap.startStopMu.Unlock()

// block until the autopilot is online
if online := ap.blockUntilOnline(); !online {
ap.logger.Error("autopilot stopped before it was able to come online")
return nil
}

var forceScan bool
var launchAccountRefillsOnce sync.Once
for {
Expand All @@ -209,23 +218,27 @@ func (ap *Autopilot) Run() error {
// reset forceScan
forceScan = false

// block until the autopilot is configured
if configured, interrupted := ap.blockUntilConfigured(ap.ticker.C); !configured {
// block until consensus is synced
if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
if interrupted {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
ap.logger.Error("autopilot stopped before consensus was synced")
return
} else if blocked {
if scanning, _ := ap.s.Status(); !scanning {
ap.s.tryPerformHostScan(ctx, w, true)
}
}

// block until consensus is synced
if synced, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
// block until the autopilot is configured
if configured, interrupted := ap.blockUntilConfigured(ap.ticker.C); !configured {
if interrupted {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before consensus was synced")
ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
return
}

Expand Down Expand Up @@ -375,31 +388,69 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure
}
}

func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, interrupted bool) {
// blockUntilOnline polls the bus for syncer peers once per second and returns
// as soon as a request succeeds with at least one peer. It gives up when the
// autopilot's stop channel fires, in which case the result reflects the
// outcome of the most recent poll.
func (ap *Autopilot) blockUntilOnline() (online bool) {
	retry := time.NewTicker(time.Second)
	defer retry.Stop()

	// make sure the "waiting" message is only logged a single time
	var logWaiting sync.Once

	for {
		// every poll gets its own bounded context
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		peers, err := ap.bus.SyncerPeers(ctx)
		cancel()
		online = len(peers) > 0

		switch {
		case err != nil:
			ap.logger.Errorf("failed to get peers, err: %v", err)
		case online:
			// request succeeded and we have peers, we are done
			return online
		default:
			logWaiting.Do(func() { ap.logger.Info("autopilot is waiting to come online...") })
		}

		// wait for the next tick unless we are asked to stop
		select {
		case <-ap.stopChan:
			return online
		case <-retry.C:
		}
	}
}

func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, blocked, interrupted bool) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

var once sync.Once

for {
// try and fetch consensus
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
cs, err := ap.bus.ConsensusState(ctx)
synced = cs.Synced
cancel()

// if an error occurred, or if we're not synced, we continue
if err != nil {
ap.logger.Errorf("failed to get consensus state, err: %v", err)
} else if !synced {
once.Do(func() { ap.logger.Info("autopilot is waiting for consensus to sync...") })
}
if err != nil || !cs.Synced {

if err != nil || !synced {
blocked = true
select {
case <-ap.stopChan:
return false, false
return
case <-interrupt:
return false, true
interrupted = true
return
case <-ticker.C:
continue
}
}
return true, false
return
}
}

Expand Down
19 changes: 14 additions & 5 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,22 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (
// check if we need to form contracts and add them to the contract set
var formed []types.FileContractID
if uint64(len(updatedSet)) < threshold {
formed, err = c.runContractFormations(ctx, w, candidates, usedHosts, unusableHosts, state.cfg.Contracts.Amount-uint64(len(updatedSet)), &remaining)
// no need to try and form contracts if wallet is completely empty
wallet, err := c.ap.bus.Wallet(ctx)
if err != nil {
c.logger.Errorf("failed to form contracts, err: %v", err) // continue
c.logger.Errorf("failed to fetch wallet, err: %v", err)
return false, err
} else if wallet.Confirmed.IsZero() {
c.logger.Warn("contract formations skipped, wallet is empty")
} else {
for _, fc := range formed {
updatedSet = append(updatedSet, fc)
contractData[fc] = 0
formed, err = c.runContractFormations(ctx, w, candidates, usedHosts, unusableHosts, state.cfg.Contracts.Amount-uint64(len(updatedSet)), &remaining)
if err != nil {
c.logger.Errorf("failed to form contracts, err: %v", err) // continue
} else {
for _, fc := range formed {
updatedSet = append(updatedSet, fc)
contractData[fc] = 0
}
}
}
}
Expand Down
46 changes: 41 additions & 5 deletions autopilot/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/worker"
)

Expand Down Expand Up @@ -66,16 +67,51 @@ func (c *contractor) HostInfo(ctx context.Context, hostKey types.PublicKey) (api
}, nil
}

// hostInfoFromCache looks up the cached host info for the given host. The
// second return value reports whether the host was present in the cache.
//
// If the cached entry still counts the host as not having completed a scan
// while the passed host is marked as scanned, the usability of the host is
// re-evaluated against the current consensus state and the cache entry is
// replaced with the fresh result. If the consensus state can not be fetched the
// error is logged and the (stale) cached entry is returned as is.
func (c *contractor) hostInfoFromCache(ctx context.Context, host hostdb.Host) (hi hostInfo, found bool) {
	// grab host details from cache, the lock is released before any call to
	// the bus is made
	c.mu.Lock()
	hi, found = c.cachedHostInfo[host.PublicKey]
	storedData := c.cachedDataStored[host.PublicKey]
	minScore := c.cachedMinScore
	c.mu.Unlock()

	// return early if the host info is not cached
	if !found {
		return hi, false
	}

	// nothing to refresh unless the host got scanned after it was cached as
	// not having completed its scan
	if !(host.Scanned && hi.UnusableResult.notcompletingscan > 0) {
		return hi, true
	}

	// try and refresh the host info if it got scanned in the meantime, this
	// inconsistency would resolve itself but trying to update it here improves
	// first time user experience
	cs, err := c.ap.bus.ConsensusState(ctx)
	if err != nil {
		// use the formatting variant so the verb is expanded with the error,
		// consistent with the other formatted log calls
		c.logger.Errorf("failed to fetch consensus state from bus: %v", err)
		return hi, true
	}

	state := c.ap.State()
	gc := worker.NewGougingChecker(state.gs, cs, state.fee, state.cfg.Contracts.Period, state.cfg.Contracts.RenewWindow)
	isUsable, unusableResult := isUsableHost(state.cfg, state.rs, gc, host, minScore, storedData)
	hi = hostInfo{
		Usable:         isUsable,
		UnusableResult: unusableResult,
	}

	// update cache
	c.mu.Lock()
	c.cachedHostInfo[host.PublicKey] = hi
	c.mu.Unlock()

	return hi, true
}

func (c *contractor) HostInfos(ctx context.Context, filterMode, usabilityMode, addressContains string, keyIn []types.PublicKey, offset, limit int) ([]api.HostHandlerResponse, error) {
// declare helper to decide whether to keep a host.
if !isValidUsabilityFilterMode(usabilityMode) {
return nil, fmt.Errorf("invalid usability mode: '%v', options are 'usable', 'unusable' or an empty string for no filter", usabilityMode)
}

c.mu.Lock()
hostInfo := c.cachedHostInfo
c.mu.Unlock()

keep := func(usable bool) bool {
switch usabilityMode {
case api.UsabilityFilterModeUsable:
Expand Down Expand Up @@ -115,7 +151,7 @@ func (c *contractor) HostInfos(ctx context.Context, filterMode, usabilityMode, a
// decide how many of the returned hosts to keep.
var keptHosts int
for _, host := range hosts {
hi, cached := hostInfo[host.PublicKey]
hi, cached := c.hostInfoFromCache(ctx, host)
if !cached {
// when the filterMode is "all" we include uncached hosts and
// set IsChecked = false.
Expand Down
Loading

0 comments on commit 7a2b452

Please sign in to comment.