diff --git a/api/worker.go b/api/worker.go index ca2ae30f3..894fd0c60 100644 --- a/api/worker.go +++ b/api/worker.go @@ -298,3 +298,10 @@ func ParseDownloadRange(req *http.Request) (DownloadRange, error) { } return dr, nil } + +func (r RHPScanResponse) Error() error { + if r.ScanError != "" { + return errors.New(r.ScanError) + } + return nil +} diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 997c645b2..d16e0e8aa 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -17,6 +17,7 @@ import ( "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/autopilot/contractor" + "go.sia.tech/renterd/autopilot/scanner" "go.sia.tech/renterd/build" "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" @@ -101,7 +102,7 @@ type Autopilot struct { a *accounts c *contractor.Contractor m *migrator - s *scanner + s scanner.Scanner tickerDuration time.Duration wg sync.WaitGroup @@ -122,9 +123,8 @@ type Autopilot struct { } // New initializes an Autopilot. -func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) { +func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (_ *Autopilot, err error) { shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background()) - ap := &Autopilot{ alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)), id: id, @@ -139,19 +139,12 @@ func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat tim pruningAlertIDs: make(map[types.FileContractID]types.Hash256), } - scanner, err := newScanner( - ap, - scannerBatchSize, - scannerNumThreads, - scannerScanInterval, - scannerTimeoutInterval, - scannerTimeoutMinTimeout, - ) + + ap.s, err = scanner.New(ap.bus, scannerBatchSize, scannerNumThreads, scannerScanInterval, ap.logger) if err != nil { - return nil, err + return } - ap.s = scanner ap.c = contractor.New(bus, bus, ap.logger, revisionSubmissionBuffer, revisionBroadcastInterval) ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker) ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval, revisionSubmissionBuffer) @@ -253,9 +246,10 @@ func (ap *Autopilot) Run() error { defer ap.logger.Info("autopilot iteration ended") // initiate a host scan - no need to be synced or configured for scanning - ap.s.tryUpdateTimeout() - ap.s.tryPerformHostScan(ap.shutdownCtx, w, forceScan) - forceScan = false // reset forceScan + ap.s.Scan(ap.shutdownCtx, w, forceScan) + + // reset forceScans + forceScan = false // block until consensus is synced if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced { @@ -267,7 +261,7 @@ func (ap *Autopilot) Run() error { return } else if blocked { if scanning, _ := ap.s.Status(); !scanning { - ap.s.tryPerformHostScan(ap.shutdownCtx, w, true) + ap.s.Scan(ap.shutdownCtx, w, true) } } @@ -288,8 +282,8 @@ func (ap *Autopilot) Run() error { return } - // prune hosts that have been offline for too long - 
ap.s.PruneHosts(ap.shutdownCtx, autopilot.Config.Hosts) + // update the scanner with the hosts config + ap.s.UpdateHostsConfig(autopilot.Config.Hosts) // Log worker id chosen for this maintenance iteration. workerID, err := w.ID(ap.shutdownCtx) @@ -360,7 +354,7 @@ func (ap *Autopilot) Run() error { } // Shutdown shuts down the autopilot. -func (ap *Autopilot) Shutdown(_ context.Context) error { +func (ap *Autopilot) Shutdown(ctx context.Context) error { ap.startStopMu.Lock() defer ap.startStopMu.Unlock() @@ -369,6 +363,7 @@ func (ap *Autopilot) Shutdown(_ context.Context) error { ap.shutdownCtxCancel() close(ap.triggerChan) ap.wg.Wait() + ap.s.Shutdown(ctx) ap.startTime = time.Time{} } return nil @@ -695,8 +690,16 @@ func (ap *Autopilot) configHandlerPUT(jc jape.Context) { autopilot.Config = cfg } - // update the autopilot and interrupt migrations if necessary - if err := jc.Check("failed to update autopilot config", ap.bus.UpdateAutopilot(jc.Request.Context(), autopilot)); err == nil && contractSetChanged { + // update the autopilot + if jc.Check("failed to update autopilot config", ap.bus.UpdateAutopilot(jc.Request.Context(), autopilot)) != nil { + return + } + + // update the scanner with the hosts config + ap.s.UpdateHostsConfig(cfg.Hosts) + + // interrupt migrations if necessary + if contractSetChanged { ap.m.SignalMaintenanceFinished() } } diff --git a/autopilot/percentile.go b/autopilot/percentile.go deleted file mode 100644 index eb3526b9a..000000000 --- a/autopilot/percentile.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2014-2020 Montana Flynn (https://montanaflynn.com) -package autopilot - -import ( - "errors" - "math" - "sort" -) - -var ( - errEmptyInput = errors.New("input must not be empty") - errOutOfBounds = errors.New("input is outside of range") -) - -func percentile(input []float64, percent float64) (float64, error) { - // validate input - if len(input) == 0 { - return math.NaN(), errEmptyInput - } - if percent <= 0 || percent > 100 { - return math.NaN(), errOutOfBounds - } - - // return early if we only have one - if len(input) == 1 { - return input[0], nil - } - - // deep copy the input and sort - input = append([]float64{}, input...) 
- sort.Float64s(input) - - // multiply percent by length of input - index := (percent / 100) * float64(len(input)) - - // check if the index is a whole number, if so return that input - if index == float64(int64(index)) { - i := int(index) - return input[i-1], nil - } - - // if the index is greater than one, return the average of the index and the value prior - if index > 1 { - i := int(index) - avg := (input[i-1] + input[i]) / 2 - return avg, nil - } - - return math.NaN(), errOutOfBounds -} diff --git a/autopilot/scanner.go b/autopilot/scanner.go deleted file mode 100644 index d6450d7e9..000000000 --- a/autopilot/scanner.go +++ /dev/null @@ -1,339 +0,0 @@ -package autopilot - -import ( - "context" - "errors" - "strings" - "sync" - "sync/atomic" - "time" - - rhpv2 "go.sia.tech/core/rhp/v2" - "go.sia.tech/core/types" - "go.sia.tech/renterd/api" - "go.sia.tech/renterd/internal/utils" - "go.uber.org/zap" -) - -const ( - scannerTimeoutInterval = 10 * time.Minute - scannerTimeoutMinTimeout = 10 * time.Second - - trackerMinDataPoints = 25 - trackerNumDataPoints = 1000 - trackerTimeoutPercentile = 99 -) - -type ( - scanner struct { - // TODO: use the actual bus and worker interfaces when they've consolidated - // a bit, we currently use inline interfaces to avoid having to update the - // scanner tests with every interface change - bus interface { - SearchHosts(ctx context.Context, opts api.SearchHostOptions) ([]api.Host, error) - HostsForScanning(ctx context.Context, opts api.HostsForScanningOptions) ([]api.HostAddress, error) - RemoveOfflineHosts(ctx context.Context, minRecentScanFailures uint64, maxDowntime time.Duration) (uint64, error) - } - - tracker *tracker - logger *zap.SugaredLogger - ap *Autopilot - wg sync.WaitGroup - - scanBatchSize uint64 - scanThreads uint64 - scanMinInterval time.Duration - - timeoutMinInterval time.Duration - timeoutMinTimeout time.Duration - - mu sync.Mutex - scanning bool - scanningLastStart time.Time - timeout time.Duration - timeoutLastUpdate time.Time - interruptScanChan chan struct{} - } - scanWorker interface { - RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) - } - - scanReq struct { - hostKey types.PublicKey - hostIP string - } - - scanResp struct { - hostKey types.PublicKey - settings rhpv2.HostSettings - err error - } - - tracker struct { - threshold uint64 - percentile float64 - - mu sync.Mutex - count uint64 - timings []float64 - } -) - -func newTracker(threshold, total uint64, percentile float64) *tracker { - return &tracker{ - threshold: threshold, - percentile: percentile, - timings: make([]float64, total), - } -} - -func (t *tracker) addDataPoint(duration time.Duration) { - if duration == 0 { - return - } - - t.mu.Lock() - defer t.mu.Unlock() - - t.timings[t.count%uint64(len(t.timings))] = float64(duration.Milliseconds()) - - // NOTE: we silently overflow and disregard the threshold being reapplied - // when we overflow entirely, since we only ever increment the count with 1 - // it will never happen - t.count += 1 -} - -func (t *tracker) timeout() time.Duration { - t.mu.Lock() - defer t.mu.Unlock() - if t.count < uint64(t.threshold) { - return 0 - } - - percentile, err := percentile(t.timings, t.percentile) - if err != nil { - return 0 - } - - return time.Duration(percentile) * time.Millisecond -} - -func newScanner(ap *Autopilot, scanBatchSize, scanThreads uint64, scanMinInterval, timeoutMinInterval, timeoutMinTimeout time.Duration) (*scanner, error) { - if scanBatchSize 
== 0 { - return nil, errors.New("scanner batch size has to be greater than zero") - } - if scanThreads == 0 { - return nil, errors.New("scanner threads has to be greater than zero") - } - - return &scanner{ - bus: ap.bus, - tracker: newTracker( - trackerMinDataPoints, - trackerNumDataPoints, - trackerTimeoutPercentile, - ), - logger: ap.logger.Named("scanner"), - ap: ap, - - interruptScanChan: make(chan struct{}), - - scanBatchSize: scanBatchSize, - scanThreads: scanThreads, - scanMinInterval: scanMinInterval, - - timeoutMinInterval: timeoutMinInterval, - timeoutMinTimeout: timeoutMinTimeout, - }, nil -} - -func (s *scanner) Status() (bool, time.Time) { - s.mu.Lock() - defer s.mu.Unlock() - return s.scanning, s.scanningLastStart -} - -func (s *scanner) isInterrupted() bool { - select { - case <-s.interruptScanChan: - return true - default: - return false - } -} - -func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bool) { - if s.ap.isStopped() { - return - } - - scanType := "host scan" - if force { - scanType = "forced scan" - } - - s.mu.Lock() - if force { - close(s.interruptScanChan) - s.mu.Unlock() - - s.logger.Infof("waiting for ongoing scan to complete") - s.wg.Wait() - - s.mu.Lock() - s.interruptScanChan = make(chan struct{}) - } else if s.scanning || !s.isScanRequired() { - s.mu.Unlock() - return - } - s.scanningLastStart = time.Now() - s.scanning = true - s.mu.Unlock() - - s.logger.Infof("%s started", scanType) - - s.wg.Add(1) - s.ap.wg.Add(1) - go func(st string) { - defer s.wg.Done() - defer s.ap.wg.Done() - - for resp := range s.launchScanWorkers(ctx, w, s.launchHostScans()) { - if s.isInterrupted() || s.ap.isStopped() { - break - } - if resp.err != nil && !strings.Contains(resp.err.Error(), "connection refused") { - s.logger.Error(resp.err) - } - } - s.mu.Lock() - s.scanning = false - s.logger.Infof("%s finished after %v", st, time.Since(s.scanningLastStart)) - s.mu.Unlock() - }(scanType) -} - -func (s *scanner) PruneHosts(ctx context.Context, cfg api.HostsConfig) { - maxDowntime := time.Duration(cfg.MaxDowntimeHours) * time.Hour - minRecentScanFailures := cfg.MinRecentScanFailures - if maxDowntime > 0 { - s.logger.Debugf("removing hosts that have been offline for more than %v and have failed at least %d scans", maxDowntime, minRecentScanFailures) - removed, err := s.bus.RemoveOfflineHosts(ctx, minRecentScanFailures, maxDowntime) - if err != nil { - s.logger.Errorf("error occurred while removing offline hosts, err: %v", err) - } else if removed > 0 { - s.logger.Infof("removed %v offline hosts", removed) - } - } -} - -func (s *scanner) tryUpdateTimeout() { - s.mu.Lock() - defer s.mu.Unlock() - if !s.isTimeoutUpdateRequired() { - return - } - - updated := s.tracker.timeout() - if updated < s.timeoutMinTimeout { - s.logger.Infof("updated timeout is lower than min timeout, %v<%v", updated, s.timeoutMinTimeout) - updated = s.timeoutMinTimeout - } - - if s.timeout != updated { - s.logger.Infof("updated timeout %v->%v", s.timeout, updated) - s.timeout = updated - } - s.timeoutLastUpdate = time.Now() -} - -func (s *scanner) launchHostScans() chan scanReq { - reqChan := make(chan scanReq, s.scanBatchSize) - go func() { - defer close(reqChan) - - var offset int - var exhausted bool - cutoff := time.Now().Add(-s.scanMinInterval) - for !s.ap.isStopped() && !exhausted { - // fetch next batch - hosts, err := s.bus.HostsForScanning(s.ap.shutdownCtx, api.HostsForScanningOptions{ - MaxLastScan: api.TimeRFC3339(cutoff), - Offset: offset, - Limit: int(s.scanBatchSize), 
- }) - if err != nil { - s.logger.Errorf("could not get hosts for scanning, err: %v", err) - break - } - if len(hosts) == 0 { - s.logger.Debug("no hosts to scan") - break - } - if len(hosts) < int(s.scanBatchSize) { - exhausted = true - } - - s.logger.Infof("scanning %d hosts in range %d-%d", len(hosts), offset, offset+int(s.scanBatchSize)) - offset += int(s.scanBatchSize) - - // add batch to scan queue - for _, h := range hosts { - select { - case <-s.ap.shutdownCtx.Done(): - return - case reqChan <- scanReq{ - hostKey: h.PublicKey, - hostIP: h.NetAddress, - }: - } - } - } - }() - - return reqChan -} - -func (s *scanner) launchScanWorkers(ctx context.Context, w scanWorker, reqs chan scanReq) chan scanResp { - respChan := make(chan scanResp, s.scanThreads) - liveThreads := s.scanThreads - - for i := uint64(0); i < s.scanThreads; i++ { - go func() { - for req := range reqs { - if s.ap.isStopped() { - break // shutdown - } - - scan, err := w.RHPScan(ctx, req.hostKey, req.hostIP, s.currentTimeout()) - if err != nil { - break // abort - } else if !utils.IsErr(errors.New(scan.ScanError), utils.ErrIOTimeout) && scan.Ping > 0 { - s.tracker.addDataPoint(time.Duration(scan.Ping)) - } - - respChan <- scanResp{req.hostKey, scan.Settings, err} - } - - if atomic.AddUint64(&liveThreads, ^uint64(0)) == 0 { - close(respChan) - } - }() - } - - return respChan -} - -func (s *scanner) isScanRequired() bool { - return s.scanningLastStart.IsZero() || time.Since(s.scanningLastStart) > s.scanMinInterval/20 // check 20 times per minInterval, so every 30 minutes -} - -func (s *scanner) isTimeoutUpdateRequired() bool { - return s.timeoutLastUpdate.IsZero() || time.Since(s.timeoutLastUpdate) > s.timeoutMinInterval -} - -func (s *scanner) currentTimeout() time.Duration { - s.mu.Lock() - defer s.mu.Unlock() - return s.timeout -} diff --git a/autopilot/scanner/scanner.go b/autopilot/scanner/scanner.go new file mode 100644 index 000000000..15e82a035 --- /dev/null +++ b/autopilot/scanner/scanner.go @@ -0,0 +1,306 @@ +package scanner + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/stats" + "go.uber.org/zap" +) + +const ( + DefaultScanTimeout = 10 * time.Second +) + +type ( + HostScanner interface { + RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) + } + + HostStore interface { + HostsForScanning(ctx context.Context, opts api.HostsForScanningOptions) ([]api.HostAddress, error) + RemoveOfflineHosts(ctx context.Context, minRecentScanFailures uint64, maxDowntime time.Duration) (uint64, error) + } + + Scanner interface { + Scan(ctx context.Context, w HostScanner, force bool) + Shutdown(ctx context.Context) error + Status() (bool, time.Time) + UpdateHostsConfig(cfg api.HostsConfig) + } +) + +type ( + scanner struct { + hs HostStore + + scanBatchSize int + scanThreads int + scanInterval time.Duration + + statsHostPingMS *stats.DataPoints + + shutdownChan chan struct{} + wg sync.WaitGroup + + logger *zap.SugaredLogger + + mu sync.Mutex + hostsCfg *api.HostsConfig + + scanning bool + scanningLastStart time.Time + + interruptChan chan struct{} + } + + scanJob struct { + hostKey types.PublicKey + hostIP string + } +) + +func New(hs HostStore, scanBatchSize, scanThreads uint64, scanMinInterval time.Duration, logger *zap.SugaredLogger) (Scanner, error) { + if scanBatchSize == 0 { + return nil, errors.New("scanner batch size has to be greater than zero") + } + if 
scanThreads == 0 { + return nil, errors.New("scanner threads has to be greater than zero") + } + return &scanner{ + hs: hs, + + scanBatchSize: int(scanBatchSize), + scanThreads: int(scanThreads), + scanInterval: scanMinInterval, + + statsHostPingMS: stats.NoDecay(), + logger: logger.Named("scanner"), + + interruptChan: make(chan struct{}), + shutdownChan: make(chan struct{}), + }, nil +} + +func (s *scanner) Scan(ctx context.Context, w HostScanner, force bool) { + if s.canSkipScan(force) { + s.logger.Debug("host scan skipped") + return + } + + cutoff := time.Now() + if !force { + cutoff = cutoff.Add(-s.scanInterval) + } + + s.logger.Infow("scan started", + "batch", s.scanBatchSize, + "force", force, + "threads", s.scanThreads, + "cutoff", cutoff, + ) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + hosts := s.fetchHosts(ctx, cutoff) + scanned := s.scanHosts(ctx, w, hosts) + removed := s.removeOfflineHosts(ctx) + + s.mu.Lock() + defer s.mu.Unlock() + s.scanning = false + s.logger.Infow("scan finished", + "force", force, + "duration", time.Since(s.scanningLastStart), + "pingMSAvg", s.statsHostPingMS.Average(), + "pingMSP90", s.statsHostPingMS.P90(), + "removed", removed, + "scanned", scanned) + }() +} + +func (s *scanner) Shutdown(ctx context.Context) error { + defer close(s.shutdownChan) + + waitChan := make(chan struct{}) + go func() { + s.wg.Wait() + close(waitChan) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitChan: + } + + return nil +} + +func (s *scanner) Status() (bool, time.Time) { + s.mu.Lock() + defer s.mu.Unlock() + return s.scanning, s.scanningLastStart +} + +func (s *scanner) UpdateHostsConfig(cfg api.HostsConfig) { + s.mu.Lock() + defer s.mu.Unlock() + s.hostsCfg = &cfg +} + +func (s *scanner) fetchHosts(ctx context.Context, cutoff time.Time) chan scanJob { + jobsChan := make(chan scanJob, s.scanBatchSize) + go func() { + defer close(jobsChan) + + var exhausted bool + for offset := 0; !exhausted; offset += s.scanBatchSize { + hosts, err := s.hs.HostsForScanning(ctx, api.HostsForScanningOptions{ + MaxLastScan: api.TimeRFC3339(cutoff), + Offset: offset, + Limit: s.scanBatchSize, + }) + if err != nil { + s.logger.Errorf("could not get hosts for scanning, err: %v", err) + return + } else if len(hosts) < s.scanBatchSize { + exhausted = true + } + + s.logger.Debugf("fetched %d hosts for scanning", len(hosts)) + for _, h := range hosts { + select { + case <-s.interruptChan: + return + case <-s.shutdownChan: + return + case jobsChan <- scanJob{ + hostKey: h.PublicKey, + hostIP: h.NetAddress, + }: + } + } + } + }() + + return jobsChan +} + +func (s *scanner) scanHosts(ctx context.Context, w HostScanner, hosts chan scanJob) (scanned uint64) { + // define worker + worker := func() { + for h := range hosts { + if s.isShutdown() || s.isInterrupted() { + break // shutdown + } + + scan, err := w.RHPScan(ctx, h.hostKey, h.hostIP, DefaultScanTimeout) + if err != nil { + s.logger.Errorw("worker stopped", zap.Error(err), "hk", h.hostKey) + break // abort + } else if err := scan.Error(); err != nil { + s.logger.Debugw("host scan failed", zap.Error(err), "hk", h.hostKey, "ip", h.hostIP) + } else { + s.statsHostPingMS.Track(float64(time.Duration(scan.Ping).Milliseconds())) + atomic.AddUint64(&scanned, 1) + } + } + } + + // launch all workers + var wg sync.WaitGroup + for t := 0; t < s.scanThreads; t++ { + wg.Add(1) + go func() { + worker() + wg.Done() + }() + } + + // wait until they're done + wg.Wait() + + s.statsHostPingMS.Recompute() + return +} + +func (s 
*scanner) isInterrupted() bool { + select { + case <-s.interruptChan: + return true + default: + } + return false +} + +func (s *scanner) isShutdown() bool { + select { + case <-s.shutdownChan: + return true + default: + } + return false +} + +func (s *scanner) removeOfflineHosts(ctx context.Context) (removed uint64) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.hostsCfg == nil { + s.logger.Info("no hosts config set, skipping removal of offline hosts") + return + } + + maxDowntime := time.Duration(s.hostsCfg.MaxDowntimeHours) * time.Hour + if maxDowntime == 0 { + s.logger.Info("hosts config has no max downtime set, skipping removal of offline hosts") + return + } + + s.logger.Infow("removing offline hosts", + "maxDowntime", maxDowntime, + "minRecentScanFailures", s.hostsCfg.MinRecentScanFailures) + + var err error + removed, err = s.hs.RemoveOfflineHosts(ctx, s.hostsCfg.MinRecentScanFailures, maxDowntime) + if err != nil { + s.logger.Errorw("removing offline hosts failed", zap.Error(err), "maxDowntime", maxDowntime, "minRecentScanFailures", s.hostsCfg.MinRecentScanFailures) + return + } + + return +} + +func (s *scanner) canSkipScan(force bool) bool { + if s.isShutdown() { + return true + } + + s.mu.Lock() + if force { + close(s.interruptChan) + s.mu.Unlock() + + s.logger.Infof("host scan interrupted, waiting for ongoing scan to complete") + s.wg.Wait() + + s.mu.Lock() + s.interruptChan = make(chan struct{}) + } else if s.scanning || time.Since(s.scanningLastStart) < s.scanInterval { + s.mu.Unlock() + return true + } + s.scanningLastStart = time.Now() + s.scanning = true + s.mu.Unlock() + + return false +} diff --git a/autopilot/scanner/scanner_test.go b/autopilot/scanner/scanner_test.go new file mode 100644 index 000000000..c9997162f --- /dev/null +++ b/autopilot/scanner/scanner_test.go @@ -0,0 +1,162 @@ +package scanner + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/test" + "go.uber.org/zap" +) + +const ( + testBatchSize = 40 + testNumThreads = 3 +) + +type mockHostStore struct { + hosts []api.Host + + mu sync.Mutex + scans []string + removals []string +} + +func (hs *mockHostStore) HostsForScanning(ctx context.Context, opts api.HostsForScanningOptions) ([]api.HostAddress, error) { + hs.mu.Lock() + defer hs.mu.Unlock() + hs.scans = append(hs.scans, fmt.Sprintf("%d-%d", opts.Offset, opts.Offset+opts.Limit)) + + start := opts.Offset + if start > len(hs.hosts) { + return nil, nil + } + + end := opts.Offset + opts.Limit + if end > len(hs.hosts) { + end = len(hs.hosts) + } + + var hostAddresses []api.HostAddress + for _, h := range hs.hosts[start:end] { + hostAddresses = append(hostAddresses, api.HostAddress{ + NetAddress: h.NetAddress, + PublicKey: h.PublicKey, + }) + } + return hostAddresses, nil +} + +func (hs *mockHostStore) RemoveOfflineHosts(ctx context.Context, minRecentScanFailures uint64, maxDowntime time.Duration) (uint64, error) { + hs.mu.Lock() + defer hs.mu.Unlock() + hs.removals = append(hs.removals, fmt.Sprintf("%d-%d", minRecentScanFailures, maxDowntime)) + return 0, nil +} + +func (hs *mockHostStore) state() ([]string, []string) { + hs.mu.Lock() + defer hs.mu.Unlock() + return hs.scans, hs.removals +} + +type mockHostScanner struct { + blockChan chan struct{} + + mu sync.Mutex + scanCount int +} + +func (w *mockHostScanner) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, _ time.Duration) (api.RHPScanResponse, error) { + if w.blockChan != nil { + 
<-w.blockChan + } + + w.mu.Lock() + defer w.mu.Unlock() + w.scanCount++ + + return api.RHPScanResponse{}, nil +} + +func TestScanner(t *testing.T) { + // create mock store + hs := &mockHostStore{hosts: test.NewHosts(100)} + + // create test scanner + s, err := New(hs, testBatchSize, testNumThreads, time.Minute, zap.NewNop().Sugar()) + if err != nil { + t.Fatal(err) + } + defer s.Shutdown(context.Background()) + + // assert it's not scanning + scanning, _ := s.Status() + if scanning { + t.Fatal("unexpected") + } + + // initiate a host scan using a worker that blocks + w := &mockHostScanner{blockChan: make(chan struct{})} + s.Scan(context.Background(), w, false) + + // assert it's scanning + scanning, _ = s.Status() + if !scanning { + t.Fatal("unexpected") + } + + // unblock the worker and sleep + close(w.blockChan) + time.Sleep(time.Second) + + // assert the scan is done + scanning, _ = s.Status() + if scanning { + t.Fatal("unexpected") + } + + // assert we did not remove offline hosts + if _, removals := hs.state(); len(removals) != 0 { + t.Fatalf("unexpected removals, %v != 0", len(removals)) + } + + // assert the scanner made 3 batch reqs + if scans, _ := hs.state(); len(scans) != 3 { + t.Fatalf("unexpected number of requests, %v != 3", len(scans)) + } else if scans[0] != "0-40" || scans[1] != "40-80" || scans[2] != "80-120" { + t.Fatalf("unexpected requests, %v", scans) + } + + // assert we scanned 100 hosts + if w.scanCount != 100 { + t.Fatalf("unexpected number of scans, %v != 100", w.scanCount) + } + + // assert we prevent starting a host scan immediately after a scan was done + s.Scan(context.Background(), w, false) + scanning, _ = s.Status() + if scanning { + t.Fatal("unexpected") + } + + // update the hosts config + s.UpdateHostsConfig(api.HostsConfig{ + MinRecentScanFailures: 10, + MaxDowntimeHours: 1, + }) + + s.Scan(context.Background(), w, true) + time.Sleep(time.Second) + + // assert we removed offline hosts + if _, removals := hs.state(); len(removals) != 1 { + t.Fatalf("unexpected removals, %v != 1", len(removals)) + } else if removals[0] != "10-3600000000000" { + t.Fatalf("unexpected removals, %v", removals) + } +} diff --git a/autopilot/scanner_test.go b/autopilot/scanner_test.go deleted file mode 100644 index 2dc113df2..000000000 --- a/autopilot/scanner_test.go +++ /dev/null @@ -1,161 +0,0 @@ -package autopilot - -import ( - "context" - "fmt" - "sync" - "testing" - "time" - - "go.sia.tech/core/types" - "go.sia.tech/renterd/api" - "go.sia.tech/renterd/internal/test" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type mockBus struct { - hosts []api.Host - reqs []string -} - -func (b *mockBus) SearchHosts(ctx context.Context, opts api.SearchHostOptions) ([]api.Host, error) { - b.reqs = append(b.reqs, fmt.Sprintf("%d-%d", opts.Offset, opts.Offset+opts.Limit)) - - start := opts.Offset - if start > len(b.hosts) { - return nil, nil - } - - end := opts.Offset + opts.Limit - if end > len(b.hosts) { - end = len(b.hosts) - } - - return b.hosts[start:end], nil -} - -func (b *mockBus) HostsForScanning(ctx context.Context, opts api.HostsForScanningOptions) ([]api.HostAddress, error) { - hosts, err := b.SearchHosts(ctx, api.SearchHostOptions{ - Offset: opts.Offset, - Limit: opts.Limit, - }) - if err != nil { - return nil, err - } - var hostAddresses []api.HostAddress - for _, h := range hosts { - hostAddresses = append(hostAddresses, api.HostAddress{ - NetAddress: h.NetAddress, - PublicKey: h.PublicKey, - }) - } - return hostAddresses, nil -} - -func (b *mockBus) 
RemoveOfflineHosts(ctx context.Context, minRecentScanFailures uint64, maxDowntime time.Duration) (uint64, error) { - return 0, nil -} - -type mockWorker struct { - blockChan chan struct{} - - mu sync.Mutex - scanCount int -} - -func (w *mockWorker) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, _ time.Duration) (api.RHPScanResponse, error) { - if w.blockChan != nil { - <-w.blockChan - } - - w.mu.Lock() - defer w.mu.Unlock() - w.scanCount++ - - return api.RHPScanResponse{}, nil -} - -func (w *mockWorker) RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string) (api.HostPriceTable, error) { - return api.HostPriceTable{}, nil -} - -func TestScanner(t *testing.T) { - // prepare 100 hosts - hosts := test.NewHosts(100) - - // init new scanner - b := &mockBus{hosts: hosts} - w := &mockWorker{blockChan: make(chan struct{})} - s := newTestScanner(b) - - // assert it started a host scan - s.tryPerformHostScan(context.Background(), w, false) - if !s.isScanning() { - t.Fatal("unexpected") - } - - // unblock the worker and sleep - close(w.blockChan) - time.Sleep(time.Second) - - // assert the scan is done - if s.isScanning() { - t.Fatal("unexpected") - } - - // assert the scanner made 3 batch reqs - if len(b.reqs) != 3 { - t.Fatalf("unexpected number of requests, %v != 3", len(b.reqs)) - } - if b.reqs[0] != "0-40" || b.reqs[1] != "40-80" || b.reqs[2] != "80-120" { - t.Fatalf("unexpected requests, %v", b.reqs) - } - - // assert we scanned 100 hosts - if w.scanCount != 100 { - t.Fatalf("unexpected number of scans, %v != 100", w.scanCount) - } - - // assert we prevent starting a host scan immediately after a scan was done - s.tryPerformHostScan(context.Background(), w, false) - if s.isScanning() { - t.Fatal("unexpected") - } - - // reset the scanner - s.scanningLastStart = time.Time{} - - // assert it started a host scan - s.tryPerformHostScan(context.Background(), w, false) - if !s.isScanning() { - t.Fatal("unexpected") - } -} - -func (s *scanner) isScanning() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.scanning -} - -func newTestScanner(b *mockBus) *scanner { - ap := &Autopilot{} - ap.shutdownCtx, ap.shutdownCtxCancel = context.WithCancel(context.Background()) - return &scanner{ - ap: ap, - bus: b, - logger: zap.New(zapcore.NewNopCore()).Sugar(), - tracker: newTracker( - trackerMinDataPoints, - trackerNumDataPoints, - trackerTimeoutPercentile, - ), - - interruptScanChan: make(chan struct{}), - - scanBatchSize: 40, - scanThreads: 3, - scanMinInterval: time.Minute, - } -} diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 632bccf9c..cd4099091 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -137,9 +137,9 @@ var ( Heartbeat: 30 * time.Minute, MigrationHealthCutoff: 0.75, RevisionBroadcastInterval: 7 * 24 * time.Hour, - ScannerBatchSize: 1000, - ScannerInterval: 24 * time.Hour, - ScannerNumThreads: 100, + ScannerBatchSize: 100, + ScannerInterval: 4 * time.Hour, + ScannerNumThreads: 10, MigratorParallelSlabsPerWorker: 1, }, S3: config.S3{ diff --git a/internal/utils/net.go b/internal/utils/net.go index a23bd98ab..0b4706609 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -49,7 +49,7 @@ func ResolveHostIP(ctx context.Context, hostIP string) (subnets []string, privat // filter out hosts associated with more than two addresses or two of the same type if len(addrs) > 2 || (len(addrs) == 2) && (len(addrs[0].IP) == len(addrs[1].IP)) { - return nil, false, ErrHostTooManyAddresses + return nil, false, fmt.Errorf("%w: %+v", 
ErrHostTooManyAddresses, addrs) } // parse out subnets
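
For reference, a minimal sketch of how the new autopilot/scanner package introduced in this diff is driven, using only the interfaces and signatures shown above (scanner.New, Scan, UpdateHostsConfig, Status, Shutdown). The bus and worker parameters are stand-ins: in renterd the autopilot's bus client satisfies scanner.HostStore and the worker client satisfies scanner.HostScanner, but the concrete values here (batch size, thread count, interval, config numbers) are illustrative assumptions, not part of the patch.

package example

import (
	"context"
	"time"

	"go.sia.tech/renterd/api"
	"go.sia.tech/renterd/autopilot/scanner"
	"go.uber.org/zap"
)

// runScanner wires up and drives a scanner instance; bus and worker are
// assumed to implement scanner.HostStore and scanner.HostScanner.
func runScanner(ctx context.Context, bus scanner.HostStore, worker scanner.HostScanner) error {
	// construct the scanner; batch size and thread count must be non-zero
	s, err := scanner.New(bus, 100, 10, 4*time.Hour, zap.NewNop().Sugar())
	if err != nil {
		return err
	}

	// give the scanner the hosts config so removeOfflineHosts has something
	// to work with; without it, offline host removal is skipped
	s.UpdateHostsConfig(api.HostsConfig{
		MaxDowntimeHours:      24 * 7,
		MinRecentScanFailures: 10,
	})

	// kick off a non-forced scan; Scan returns immediately and the batching,
	// scanning and pruning happen on background goroutines
	s.Scan(ctx, worker, false)

	// Status reports whether a scan is in progress and when the last one started
	scanning, lastStart := s.Status()
	_, _ = scanning, lastStart

	// Shutdown waits for any ongoing scan to finish or the context to expire
	return s.Shutdown(ctx)
}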