diff --git a/autopilot/accounts.go b/autopilot/accounts.go
index 573e75fdf..457ea7881 100644
--- a/autopilot/accounts.go
+++ b/autopilot/accounts.go
@@ -128,6 +128,8 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
 		span.SetStatus(codes.Error, "failed to fetch contracts")
 		a.l.Errorw(fmt.Sprintf("failed to fetch contracts for refill: %v", err))
 		return
+	} else if len(contracts) == 0 {
+		return
 	}
 
 	// fetch all contract set contracts
diff --git a/autopilot/alerts.go b/autopilot/alerts.go
index 8561b7b30..61e89a557 100644
--- a/autopilot/alerts.go
+++ b/autopilot/alerts.go
@@ -72,7 +72,7 @@ func newAccountLowBalanceAlert(address types.Address, balance, allowance types.C
 
 func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err refillError) alerts.Alert {
 	data := map[string]interface{}{
-		"error":      err,
+		"error":      err.Error(),
 		"accountID":  id.String(),
 		"contractID": contract.ID.String(),
 		"hostKey":    contract.HostKey.String(),
@@ -101,7 +101,7 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo
 		Severity: severity,
 		Message:  "Contract renewal failed",
 		Data: map[string]interface{}{
-			"error":               err,
+			"error":               err.Error(),
 			"renewalsInterrupted": interrupted,
 			"contractID":          contract.ID.String(),
 			"hostKey":             contract.HostKey.String(),
@@ -111,6 +111,11 @@ func newContractRenewalFailedAlert(contract api.ContractMetadata, interrupted bo
 }
 
 func newContractSetChangeAlert(name string, added, removed int, removedReasons map[string]string) alerts.Alert {
+	var hint string
+	if removed > 0 {
+		hint = "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set."
+	}
+
 	return alerts.Alert{
 		ID:       randomAlertID(),
 		Severity: alerts.SeverityInfo,
@@ -120,7 +125,7 @@ func newContractSetChangeAlert(name string, added, removed int, removedReasons m
 			"added":    added,
 			"removed":  removed,
 			"removals": removedReasons,
-			"hint":     "A high churn rate can lead to a lot of unnecessary migrations, it might be necessary to tweak your configuration depending on the reason hosts are being discarded from the set.",
+			"hint":     hint,
 		},
 		Timestamp: time.Now(),
 	}
@@ -136,9 +141,11 @@ func newOngoingMigrationsAlert(n int) alerts.Alert {
 }
 
 func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert {
-	severity := alerts.SeverityWarning
-	if health < 0.5 {
+	severity := alerts.SeverityError
+	if health < 0.25 {
 		severity = alerts.SeverityCritical
+	} else if health < 0.5 {
+		severity = alerts.SeverityWarning
 	}
 
 	return alerts.Alert{
@@ -146,7 +153,7 @@ func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) al
 		Severity: severity,
 		Message:  "Slab migration failed",
 		Data: map[string]interface{}{
-			"error":   err,
+			"error":   err.Error(),
 			"health":  health,
 			"slabKey": slab.Key.String(),
 			"hint":    "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.",
@@ -162,7 +169,7 @@ func newRefreshHealthFailedAlert(err error) alerts.Alert {
 		Message:  "Health refresh failed",
 		Data: map[string]interface{}{
 			"migrationsInterrupted": true,
-			"error":                 err,
+			"error":                 err.Error(),
 		},
 		Timestamp: time.Now(),
 	}
diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go
index bf8791471..b522c3acd 100644
--- a/autopilot/autopilot.go
+++ b/autopilot/autopilot.go
@@ -70,6 +70,9 @@ type Bus interface {
 	// consensus
 	ConsensusState(ctx context.Context) (api.ConsensusState, error)
 
+	// syncer
+	SyncerPeers(ctx context.Context) (resp []string, err error)
+
 	// objects
 	ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error)
 	RefreshHealth(ctx context.Context) error
@@ -185,13 +188,19 @@ func (ap *Autopilot) Run() error {
 	}
 	ap.startTime = time.Now()
 	ap.stopChan = make(chan struct{})
-	ap.triggerChan = make(chan bool)
+	ap.triggerChan = make(chan bool, 1)
 	ap.ticker = time.NewTicker(ap.tickerDuration)
 
 	ap.wg.Add(1)
 	defer ap.wg.Done()
 	ap.startStopMu.Unlock()
 
+	// block until the autopilot is online
+	if online := ap.blockUntilOnline(); !online {
+		ap.logger.Error("autopilot stopped before it was able to come online")
+		return nil
+	}
+
 	var forceScan bool
 	var launchAccountRefillsOnce sync.Once
 	for {
@@ -209,23 +218,27 @@ func (ap *Autopilot) Run() error {
 			// reset forceScan
 			forceScan = false
 
-			// block until the autopilot is configured
-			if configured, interrupted := ap.blockUntilConfigured(ap.ticker.C); !configured {
+			// block until consensus is synced
+			if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
 				if interrupted {
 					close(tickerFired)
 					return
 				}
-				ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
+				ap.logger.Error("autopilot stopped before consensus was synced")
 				return
+			} else if blocked {
+				if scanning, _ := ap.s.Status(); !scanning {
+					ap.s.tryPerformHostScan(ctx, w, true)
+				}
 			}
 
-			// block until consensus is synced
-			if synced, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
+			// block until the autopilot is configured
+			if configured, interrupted := ap.blockUntilConfigured(ap.ticker.C); !configured {
 				if interrupted {
 					close(tickerFired)
 					return
 				}
-				ap.logger.Error("autopilot stopped before consensus was synced")
+				ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
 				return
 			}
 
@@ -375,31 +388,69 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure
 	}
 }
 
-func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, interrupted bool) {
+func (ap *Autopilot) blockUntilOnline() (online bool) {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	var once sync.Once
+
+	for {
+		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		peers, err := ap.bus.SyncerPeers(ctx)
+		online = len(peers) > 0
+		cancel()
+
+		if err != nil {
+			ap.logger.Errorf("failed to get peers, err: %v", err)
+		} else if !online {
+			once.Do(func() { ap.logger.Info("autopilot is waiting to come online...") })
+		}
+
+		if err != nil || !online {
+			select {
+			case <-ap.stopChan:
+				return
+			case <-ticker.C:
+				continue
+			}
+		}
+		return
+	}
+}
+
+func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, blocked, interrupted bool) {
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
 
+	var once sync.Once
+
 	for {
 		// try and fetch consensus
 		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 		cs, err := ap.bus.ConsensusState(ctx)
+		synced = cs.Synced
 		cancel()
 
 		// if an error occurred, or if we're not synced, we continue
 		if err != nil {
 			ap.logger.Errorf("failed to get consensus state, err: %v", err)
+		} else if !synced {
+			once.Do(func() { ap.logger.Info("autopilot is waiting for consensus to sync...") })
 		}
-		if err != nil || !cs.Synced {
+
+		if err != nil || !synced {
+			blocked = true
 			select {
 			case <-ap.stopChan:
-				return false, false
+				return
 			case <-interrupt:
-				return false, true
+				interrupted = true
+				return
 			case <-ticker.C:
 				continue
 			}
 		}
-		return true, false
+		return
 	}
 }
 
diff --git a/autopilot/contractor.go b/autopilot/contractor.go
index d30addadb..c1ec2e2c8 100644
--- a/autopilot/contractor.go
+++ b/autopilot/contractor.go
@@ -347,13 +347,22 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) (
 	// check if we need to form contracts and add them to the contract set
 	var formed []types.FileContractID
 	if uint64(len(updatedSet)) < threshold {
-		formed, err = c.runContractFormations(ctx, w, candidates, usedHosts, unusableHosts, state.cfg.Contracts.Amount-uint64(len(updatedSet)), &remaining)
+		// no need to try and form contracts if wallet is completely empty
+		wallet, err := c.ap.bus.Wallet(ctx)
 		if err != nil {
-			c.logger.Errorf("failed to form contracts, err: %v", err) // continue
+			c.logger.Errorf("failed to fetch wallet, err: %v", err)
+			return false, err
+		} else if wallet.Confirmed.IsZero() {
+			c.logger.Warn("contract formations skipped, wallet is empty")
 		} else {
-			for _, fc := range formed {
-				updatedSet = append(updatedSet, fc)
-				contractData[fc] = 0
+			formed, err = c.runContractFormations(ctx, w, candidates, usedHosts, unusableHosts, state.cfg.Contracts.Amount-uint64(len(updatedSet)), &remaining)
+			if err != nil {
+				c.logger.Errorf("failed to form contracts, err: %v", err) // continue
+			} else {
+				for _, fc := range formed {
+					updatedSet = append(updatedSet, fc)
+					contractData[fc] = 0
+				}
 			}
 		}
 	}
diff --git a/autopilot/hostinfo.go b/autopilot/hostinfo.go
index 459b82ac4..82efa1d61 100644
--- a/autopilot/hostinfo.go
+++ b/autopilot/hostinfo.go
@@ -6,6 +6,7 @@ import (
 
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
+	"go.sia.tech/renterd/hostdb"
 	"go.sia.tech/renterd/worker"
 )
 
@@ -66,16 +67,51 @@ func (c *contractor) HostInfo(ctx context.Context, hostKey types.PublicKey) (api
 	}, nil
 }
 
+func (c *contractor) hostInfoFromCache(ctx context.Context, host hostdb.Host) (hi hostInfo, found bool) {
+	// grab host details from cache
+	c.mu.Lock()
+	hi, found = c.cachedHostInfo[host.PublicKey]
+	storedData := c.cachedDataStored[host.PublicKey]
+	minScore := c.cachedMinScore
+	c.mu.Unlock()
+
+	// return early if the host info is not cached
+	if !found {
+		return
+	}
+
+	// try and refresh the host info if it got scanned in the meantime, this
+	// inconsistency would resolve itself but trying to update it here improves
+	// first time user experience
+	if host.Scanned && hi.UnusableResult.notcompletingscan > 0 {
+		cs, err := c.ap.bus.ConsensusState(ctx)
+		if err != nil {
+			c.logger.Error("failed to fetch consensus state from bus: %v", err)
+		} else {
+			state := c.ap.State()
+			gc := worker.NewGougingChecker(state.gs, cs, state.fee, state.cfg.Contracts.Period, state.cfg.Contracts.RenewWindow)
+			isUsable, unusableResult := isUsableHost(state.cfg, state.rs, gc, host, minScore, storedData)
+			hi = hostInfo{
+				Usable:         isUsable,
+				UnusableResult: unusableResult,
+			}
+
+			// update cache
+			c.mu.Lock()
+			c.cachedHostInfo[host.PublicKey] = hi
+			c.mu.Unlock()
+		}
+	}
+
+	return
+}
+
 func (c *contractor) HostInfos(ctx context.Context, filterMode, usabilityMode, addressContains string, keyIn []types.PublicKey, offset, limit int) ([]api.HostHandlerResponse, error) {
 	// declare helper to decide whether to keep a host.
 	if !isValidUsabilityFilterMode(usabilityMode) {
 		return nil, fmt.Errorf("invalid usability mode: '%v', options are 'usable', 'unusable' or an empty string for no filter", usabilityMode)
 	}
 
-	c.mu.Lock()
-	hostInfo := c.cachedHostInfo
-	c.mu.Unlock()
-
 	keep := func(usable bool) bool {
 		switch usabilityMode {
 		case api.UsabilityFilterModeUsable:
@@ -115,7 +151,7 @@ func (c *contractor) HostInfos(ctx context.Context, filterMode, usabilityMode, a
 	// decide how many of the returned hosts to keep.
 	var keptHosts int
 	for _, host := range hosts {
-		hi, cached := hostInfo[host.PublicKey]
+		hi, cached := c.hostInfoFromCache(ctx, host)
 		if !cached {
 			// when the filterMode is "all" we include uncached hosts and
 			// set IsChecked = false.
diff --git a/autopilot/scanner.go b/autopilot/scanner.go
index 8b258ab3c..7f04fe116 100644
--- a/autopilot/scanner.go
+++ b/autopilot/scanner.go
@@ -16,11 +16,9 @@ import (
 )
 
 const (
-	// TODO: make these configurable
 	scannerTimeoutInterval   = 10 * time.Minute
 	scannerTimeoutMinTimeout = time.Second * 5
 
-	// TODO: make these configurable
 	trackerMinDataPoints     = 25
 	trackerNumDataPoints     = 1000
 	trackerTimeoutPercentile = 99
@@ -40,6 +38,7 @@ type (
 		tracker *tracker
 		logger  *zap.SugaredLogger
 		ap      *Autopilot
+		wg      sync.WaitGroup
 
 		scanBatchSize uint64
 		scanThreads   uint64
@@ -54,6 +53,7 @@ type (
 		scanningLastStart time.Time
 		timeout           time.Duration
 		timeoutLastUpdate time.Time
+		interruptScanChan chan struct{}
 	}
 	scanWorker interface {
 		RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
 	}
@@ -137,6 +137,8 @@ func newScanner(ap *Autopilot, scanBatchSize, scanMinRecentFailures, scanThreads
 		logger:  ap.logger.Named("scanner"),
 		ap:      ap,
 
+		interruptScanChan: make(chan struct{}),
+
 		scanBatchSize:   scanBatchSize,
 		scanThreads:     scanThreads,
 		scanMinInterval: scanMinInterval,
@@ -153,27 +155,54 @@ func (s *scanner) Status() (bool, time.Time) {
 	return s.scanning, s.scanningLastStart
 }
 
+func (s *scanner) isInterrupted() bool {
+	select {
+	case <-s.interruptScanChan:
+		return true
+	default:
+		return false
+	}
+}
+
 func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bool) bool {
 	if s.ap.isStopped() {
 		return false
 	}
 
+	scanType := "host scan"
+	if force {
+		scanType = "forced scan"
+	}
+
 	s.mu.Lock()
-	if !force && (s.scanning || !s.isScanRequired()) {
+	if force {
+		close(s.interruptScanChan)
+		s.mu.Unlock()
+
+		s.logger.Infof("waiting for ongoing scan to complete")
+		s.wg.Wait()
+
+		s.mu.Lock()
+		s.interruptScanChan = make(chan struct{})
+	} else if s.scanning || !s.isScanRequired() {
 		s.mu.Unlock()
 		return false
 	}
-
-	s.logger.Info("host scan started")
 	s.scanningLastStart = time.Now()
 	s.scanning = true
 	s.mu.Unlock()
 
-	maxDowntimeHours := s.ap.State().cfg.Hosts.MaxDowntimeHours
+	s.logger.Infof("%s started", scanType)
+	maxDowntime := time.Duration(s.ap.State().cfg.Hosts.MaxDowntimeHours) * time.Hour
 
-	go func() {
+	s.wg.Add(1)
+	go func(st string) {
+		defer s.wg.Done()
+
+		var interrupted bool
 		for resp := range s.launchScanWorkers(ctx, w, s.launchHostScans()) {
-			if s.ap.isStopped() {
+			if s.isInterrupted() || s.ap.isStopped() {
+				interrupted = true
 				break
 			}
 			if resp.err != nil && !strings.Contains(resp.err.Error(), "connection refused") {
@@ -181,23 +210,21 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo
 			}
 		}
 
-		if !s.ap.isStopped() && maxDowntimeHours > 0 {
-			s.logger.Debugf("removing hosts that have been offline for more than %v hours", maxDowntimeHours)
-			maxDowntime := time.Hour * time.Duration(maxDowntimeHours)
+		if !interrupted && maxDowntime > 0 {
+			s.logger.Debugf("removing hosts that have been offline for more than %v", maxDowntime)
 			removed, err := s.bus.RemoveOfflineHosts(ctx, s.scanMinRecentFailures, maxDowntime)
-			if removed > 0 {
-				s.logger.Infof("removed %v offline hosts", removed)
-			}
 			if err != nil {
 				s.logger.Errorf("error occurred while removing offline hosts, err: %v", err)
+			} else if removed > 0 {
+				s.logger.Infof("removed %v offline hosts", removed)
 			}
 		}
 
 		s.mu.Lock()
 		s.scanning = false
-		s.logger.Debugf("host scan finished after %v", time.Since(s.scanningLastStart))
+		s.logger.Debugf("%s finished after %v", st, time.Since(s.scanningLastStart))
 		s.mu.Unlock()
-	}()
+	}(scanType)
 
 	return true
 }
@@ -284,9 +311,11 @@ func (s *scanner) launchScanWorkers(ctx context.Context, w scanWorker, reqs chan
 				scan, err := w.RHPScan(ctx, req.hostKey, req.hostIP, s.currentTimeout())
 				if err != nil {
 					break // abort
+				} else if !isErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 {
+					s.tracker.addDataPoint(time.Duration(scan.Ping))
 				}
+
 				respChan <- scanResp{req.hostKey, scan.Settings, err}
-				s.tracker.addDataPoint(time.Duration(scan.Ping))
 			}
 
 			if atomic.AddUint64(&liveThreads, ^uint64(0)) == 0 {
diff --git a/autopilot/scanner_test.go b/autopilot/scanner_test.go
index 31f52f6e8..b30aea014 100644
--- a/autopilot/scanner_test.go
+++ b/autopilot/scanner_test.go
@@ -153,6 +153,8 @@ func newTestScanner(b *mockBus, w *mockWorker) *scanner {
 			trackerTimeoutPercentile,
 		),
 
+		interruptScanChan: make(chan struct{}),
+
 		scanBatchSize:   40,
 		scanThreads:     3,
 		scanMinInterval: time.Minute,
diff --git a/internal/testing/pruning_test.go b/internal/testing/pruning_test.go
index 0a4c9e096..7903c7dcc 100644
--- a/internal/testing/pruning_test.go
+++ b/internal/testing/pruning_test.go
@@ -21,18 +21,8 @@ func TestHostPruning(t *testing.T) {
 		t.SkipNow()
 	}
 
-	ctx := context.Background()
-
-	// update the min scan interval to ensure the scanner scans all hosts on
-	// every iteration of the autopilot loop, this ensures we try and remove
-	// offline hosts in every autopilot loop
-	apCfg := testApCfg()
-	apCfg.ScannerInterval = 0
-
 	// create a new test cluster
-	cluster := newTestCluster(t, testClusterOptions{
-		autopilotCfg: &apCfg,
-	})
+	cluster := newTestCluster(t, clusterOptsDefault)
 	defer cluster.Shutdown()
 	b := cluster.Bus
 	w := cluster.Worker
@@ -55,23 +45,6 @@ func TestHostPruning(t *testing.T) {
 		tt.OK(b.RecordHostScans(context.Background(), his))
 	}
 
-	// create a helper function that waits for an autopilot loop to finish
-	waitForAutopilotLoop := func() {
-		t.Helper()
-		var nTriggered int
-		tt.Retry(10, 500*time.Millisecond, func() error {
-			triggered, err := a.Trigger(true)
-			tt.OK(err)
-			if triggered {
-				nTriggered++
-				if nTriggered > 1 {
-					return nil
-				}
-			}
-			return errors.New("autopilot loop has not finished")
-		})
-	}
-
 	// add a host
 	hosts := cluster.AddHosts(1)
 	h1 := hosts[0]
@@ -84,20 +57,32 @@ func TestHostPruning(t *testing.T) {
 	tt.OKAll(w.RHPScan(context.Background(), h1.PublicKey(), h.NetAddress, 0))
 
 	// block the host
-	tt.OK(b.UpdateHostBlocklist(ctx, []string{h1.PublicKey().String()}, nil, false))
+	tt.OK(b.UpdateHostBlocklist(context.Background(), []string{h1.PublicKey().String()}, nil, false))
 
 	// remove it from the cluster manually
-	cluster.hosts = cluster.hosts[1:]
-	tt.OK(hosts[0].Close())
+	cluster.RemoveHost(h1)
 
 	// shut down the worker manually, this will flush any interactions
-	tt.OK(cluster.workerShutdownFns[1](context.Background()))
-	cluster.workerShutdownFns = cluster.workerShutdownFns[:1]
+	cluster.ShutdownWorker(context.Background())
 
 	// record 9 failed interactions, right before the pruning threshold, and
 	// wait for the autopilot loop to finish at least once
 	recordFailedInteractions(9, h1.PublicKey())
-	waitForAutopilotLoop()
+
+	// assert the autopilot loop ran at least once by successfully triggering it twice
+	remaining := 2
+	tt.Retry(100, 50*time.Millisecond, func() error {
+		triggered, err := a.Trigger(true)
+		tt.OK(err)
+
+		if triggered {
+			remaining--
+		}
+		if remaining > 0 {
+			return errors.New("failed to trigger the autopilot loop")
+		}
+		return nil
+	})
 
 	// assert the host was not pruned
 	hostss, err := b.Hosts(context.Background(), api.GetHostsOptions{})
@@ -109,14 +94,16 @@ func TestHostPruning(t *testing.T) {
 	// record one more failed interaction, this should push the host over the
 	// pruning threshold
 	recordFailedInteractions(1, h1.PublicKey())
-	waitForAutopilotLoop()
 
-	// assert the host was not pruned
-	hostss, err = b.Hosts(context.Background(), api.GetHostsOptions{})
-	tt.OK(err)
-	if len(hostss) != 0 {
-		t.Fatalf("host was not pruned, %+v", hostss[0].Interactions)
-	}
+	// assert the host was pruned
+	tt.Retry(10, time.Second, func() error {
+		hostss, err = b.Hosts(context.Background(), api.GetHostsOptions{})
+		tt.OK(err)
+		if len(hostss) != 0 {
+			return fmt.Errorf("host was not pruned, %+v", hostss[0].Interactions)
+		}
+		return nil
+	})
 
 	// assert validation on MaxDowntimeHours
 	ap, err := b.Autopilot(context.Background(), api.DefaultAutopilotID)
diff --git a/stores/wallet.go b/stores/wallet.go
index 866d3e701..b4aa8a48c 100644
--- a/stores/wallet.go
+++ b/stores/wallet.go
@@ -67,7 +67,7 @@ func (s *SQLStore) UnspentSiacoinElements(matured bool) ([]wallet.SiacoinElement
 	tx := s.db
 	var elems []dbSiacoinElement
 	if matured {
-		tx = tx.Where("maturity_height < ?", height)
+		tx = tx.Where("maturity_height <= ?", height)
 	}
 	if err := tx.Find(&elems).Error; err != nil {
 		return nil, err