Skip to content

Commit

Permalink
autopilot: refactor scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored and ChrisSchinnerl committed Jul 31, 2024
1 parent 5815d9e commit 5abb25b
Show file tree
Hide file tree
Showing 9 changed files with 504 additions and 576 deletions.
7 changes: 7 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,10 @@ func ParseDownloadRange(req *http.Request) (DownloadRange, error) {
}
return dr, nil
}

func (r RHPScanResponse) Error() error {
if r.ScanError != "" {
return errors.New(r.ScanError)
}
return nil
}
47 changes: 25 additions & 22 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/autopilot/contractor"
"go.sia.tech/renterd/autopilot/scanner"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
Expand Down Expand Up @@ -101,7 +102,7 @@ type Autopilot struct {
a *accounts
c *contractor.Contractor
m *migrator
s *scanner
s scanner.Scanner

tickerDuration time.Duration
wg sync.WaitGroup
Expand All @@ -122,9 +123,8 @@ type Autopilot struct {
}

// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (_ *Autopilot, err error) {
shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())

ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
Expand All @@ -139,19 +139,12 @@ func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat tim

pruningAlertIDs: make(map[types.FileContractID]types.Hash256),
}
scanner, err := newScanner(
ap,
scannerBatchSize,
scannerNumThreads,
scannerScanInterval,
scannerTimeoutInterval,
scannerTimeoutMinTimeout,
)

ap.s, err = scanner.New(ap.bus, scannerBatchSize, scannerNumThreads, scannerScanInterval, ap.logger)
if err != nil {
return nil, err
return
}

ap.s = scanner
ap.c = contractor.New(bus, bus, ap.logger, revisionSubmissionBuffer, revisionBroadcastInterval)
ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval, revisionSubmissionBuffer)
Expand Down Expand Up @@ -253,9 +246,10 @@ func (ap *Autopilot) Run() error {
defer ap.logger.Info("autopilot iteration ended")

// initiate a host scan - no need to be synced or configured for scanning
ap.s.tryUpdateTimeout()
ap.s.tryPerformHostScan(ap.shutdownCtx, w, forceScan)
forceScan = false // reset forceScan
ap.s.Scan(ap.shutdownCtx, w, forceScan)

// reset forceScans
forceScan = false

// block until consensus is synced
if synced, blocked, interrupted := ap.blockUntilSynced(ap.ticker.C); !synced {
Expand All @@ -267,7 +261,7 @@ func (ap *Autopilot) Run() error {
return
} else if blocked {
if scanning, _ := ap.s.Status(); !scanning {
ap.s.tryPerformHostScan(ap.shutdownCtx, w, true)
ap.s.Scan(ap.shutdownCtx, w, true)
}
}

Expand All @@ -288,8 +282,8 @@ func (ap *Autopilot) Run() error {
return
}

// prune hosts that have been offline for too long
ap.s.PruneHosts(ap.shutdownCtx, autopilot.Config.Hosts)
// update the scanner with the hosts config
ap.s.UpdateHostsConfig(autopilot.Config.Hosts)

// Log worker id chosen for this maintenance iteration.
workerID, err := w.ID(ap.shutdownCtx)
Expand Down Expand Up @@ -360,7 +354,7 @@ func (ap *Autopilot) Run() error {
}

// Shutdown shuts down the autopilot.
func (ap *Autopilot) Shutdown(_ context.Context) error {
func (ap *Autopilot) Shutdown(ctx context.Context) error {
ap.startStopMu.Lock()
defer ap.startStopMu.Unlock()

Expand All @@ -369,6 +363,7 @@ func (ap *Autopilot) Shutdown(_ context.Context) error {
ap.shutdownCtxCancel()
close(ap.triggerChan)
ap.wg.Wait()
ap.s.Shutdown(ctx)
ap.startTime = time.Time{}
}
return nil
Expand Down Expand Up @@ -695,8 +690,16 @@ func (ap *Autopilot) configHandlerPUT(jc jape.Context) {
autopilot.Config = cfg
}

// update the autopilot and interrupt migrations if necessary
if err := jc.Check("failed to update autopilot config", ap.bus.UpdateAutopilot(jc.Request.Context(), autopilot)); err == nil && contractSetChanged {
// update the autopilot
if jc.Check("failed to update autopilot config", ap.bus.UpdateAutopilot(jc.Request.Context(), autopilot)) != nil {
return
}

// update the scanner with the hosts config
ap.s.UpdateHostsConfig(cfg.Hosts)

// interrupt migrations if necessary
if contractSetChanged {
ap.m.SignalMaintenanceFinished()
}
}
Expand Down
50 changes: 0 additions & 50 deletions autopilot/percentile.go

This file was deleted.

Loading

0 comments on commit 5abb25b

Please sign in to comment.