Skip to content

Commit

Permalink
Scanner Config (#677)
Browse files Browse the repository at this point in the history
* scanner: move minRecentScanFailures

* db: add migration 00023_defaultMinRecentScanFailures
  • Loading branch information
peterjan authored Nov 9, 2023
1 parent 84c2a6b commit 7126981
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 26 deletions.
7 changes: 4 additions & 3 deletions api/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ type (

// HostsConfig contains all hosts settings used in the autopilot.
HostsConfig struct {
AllowRedundantIPs bool `json:"allowRedundantIPs"`
MaxDowntimeHours uint64 `json:"maxDowntimeHours"`
ScoreOverrides map[types.PublicKey]float64 `json:"scoreOverrides"`
AllowRedundantIPs bool `json:"allowRedundantIPs"`
MaxDowntimeHours uint64 `json:"maxDowntimeHours"`
MinRecentScanFailures uint64 `json:"minRecentScanFailures"`
ScoreOverrides map[types.PublicKey]float64 `json:"scoreOverrides"`
}

// WalletConfig contains all wallet settings used in the autopilot.
Expand Down
3 changes: 1 addition & 2 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) {
}

// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerMinRecentFailures, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
Expand All @@ -603,7 +603,6 @@ func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat tim
scanner, err := newScanner(
ap,
scannerBatchSize,
scannerMinRecentFailures,
scannerNumThreads,
scannerScanInterval,
scannerTimeoutInterval,
Expand Down
3 changes: 2 additions & 1 deletion autopilot/hostscore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ var cfg = api.AutopilotConfig{
Set: api.DefaultAutopilotID,
},
Hosts: api.HostsConfig{
MaxDowntimeHours: 24 * 7 * 2,
MaxDowntimeHours: 24 * 7 * 2,
MinRecentScanFailures: 10,
},
Wallet: api.WalletConfig{
DefragThreshold: 1000,
Expand Down
22 changes: 11 additions & 11 deletions autopilot/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ type (
ap *Autopilot
wg sync.WaitGroup

scanBatchSize uint64
scanThreads uint64
scanMinInterval time.Duration
scanMinRecentFailures uint64
scanBatchSize uint64
scanThreads uint64
scanMinInterval time.Duration

timeoutMinInterval time.Duration
timeoutMinTimeout time.Duration
Expand Down Expand Up @@ -119,7 +118,7 @@ func (t *tracker) timeout() time.Duration {
return time.Duration(percentile) * time.Millisecond
}

func newScanner(ap *Autopilot, scanBatchSize, scanMinRecentFailures, scanThreads uint64, scanMinInterval, timeoutMinInterval, timeoutMinTimeout time.Duration) (*scanner, error) {
func newScanner(ap *Autopilot, scanBatchSize, scanThreads uint64, scanMinInterval, timeoutMinInterval, timeoutMinTimeout time.Duration) (*scanner, error) {
if scanBatchSize == 0 {
return nil, errors.New("scanner batch size has to be greater than zero")
}
Expand All @@ -139,10 +138,9 @@ func newScanner(ap *Autopilot, scanBatchSize, scanMinRecentFailures, scanThreads

interruptScanChan: make(chan struct{}),

scanBatchSize: scanBatchSize,
scanThreads: scanThreads,
scanMinInterval: scanMinInterval,
scanMinRecentFailures: scanMinRecentFailures,
scanBatchSize: scanBatchSize,
scanThreads: scanThreads,
scanMinInterval: scanMinInterval,

timeoutMinInterval: timeoutMinInterval,
timeoutMinTimeout: timeoutMinTimeout,
Expand Down Expand Up @@ -193,7 +191,9 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo
s.mu.Unlock()

s.logger.Infof("%s started", scanType)
maxDowntime := time.Duration(s.ap.State().cfg.Hosts.MaxDowntimeHours) * time.Hour
hostCfg := s.ap.State().cfg.Hosts
maxDowntime := time.Duration(hostCfg.MaxDowntimeHours) * time.Hour
minRecentScanFailures := hostCfg.MinRecentScanFailures

s.wg.Add(1)
go func(st string) {
Expand All @@ -212,7 +212,7 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo

if !interrupted && maxDowntime > 0 {
s.logger.Debugf("removing hosts that have been offline for more than %v", maxDowntime)
removed, err := s.bus.RemoveOfflineHosts(ctx, s.scanMinRecentFailures, maxDowntime)
removed, err := s.bus.RemoveOfflineHosts(ctx, minRecentScanFailures, maxDowntime)
if err != nil {
s.logger.Errorf("error occurred while removing offline hosts, err: %v", err)
} else if removed > 0 {
Expand Down
4 changes: 4 additions & 0 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ func (b *bus) hostsRemoveHandlerPOST(jc jape.Context) {
jc.Error(errors.New("maxDowntime must be non-zero"), http.StatusBadRequest)
return
}
if hrr.MinRecentScanFailures == 0 {
jc.Error(errors.New("minRecentScanFailures must be non-zero"), http.StatusBadRequest)
return
}
removed, err := b.hdb.RemoveOfflineHosts(jc.Request.Context(), hrr.MinRecentScanFailures, time.Duration(hrr.MaxDowntimeHours))
if jc.Check("couldn't remove offline hosts", err) != nil {
return
Expand Down
2 changes: 0 additions & 2 deletions cmd/renterd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ var (
RevisionBroadcastInterval: 7 * 24 * time.Hour,
ScannerBatchSize: 1000,
ScannerInterval: 24 * time.Hour,
ScannerMinRecentFailures: 10,
ScannerNumThreads: 100,
MigratorParallelSlabsPerWorker: 1,
},
Expand Down Expand Up @@ -302,7 +301,6 @@ func main() {
flag.DurationVar(&cfg.Autopilot.RevisionBroadcastInterval, "autopilot.revisionBroadcastInterval", cfg.Autopilot.RevisionBroadcastInterval, "interval at which the autopilot broadcasts contract revisions to be mined - can be overwritten using the RENTERD_AUTOPILOT_REVISION_BROADCAST_INTERVAL environment variable - setting it to 0 will disable this feature")
flag.Uint64Var(&cfg.Autopilot.ScannerBatchSize, "autopilot.scannerBatchSize", cfg.Autopilot.ScannerBatchSize, "size of the batch with which hosts are scanned")
flag.DurationVar(&cfg.Autopilot.ScannerInterval, "autopilot.scannerInterval", cfg.Autopilot.ScannerInterval, "interval at which hosts are scanned")
flag.Uint64Var(&cfg.Autopilot.ScannerMinRecentFailures, "autopilot.scannerMinRecentFailures", cfg.Autopilot.ScannerMinRecentFailures, "minimum amount of consesutive failed scans a host must have before it is removed for exceeding the max downtime")
flag.Uint64Var(&cfg.Autopilot.ScannerNumThreads, "autopilot.scannerNumThreads", cfg.Autopilot.ScannerNumThreads, "number of threads that scan hosts")
flag.Uint64Var(&cfg.Autopilot.MigratorParallelSlabsPerWorker, "autopilot.migratorParallelSlabsPerWorker", cfg.Autopilot.MigratorParallelSlabsPerWorker, "number of slabs that the autopilot migrates in parallel per worker. Can be overwritten using the RENTERD_MIGRATOR_PARALLEL_SLABS_PER_WORKER environment variable")
flag.BoolVar(&cfg.Autopilot.Enabled, "autopilot.enabled", cfg.Autopilot.Enabled, "enable/disable the autopilot - can be overwritten using the RENTERD_AUTOPILOT_ENABLED environment variable")
Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ type (
RevisionSubmissionBuffer uint64 `yaml:"revisionSubmissionBuffer"`
ScannerInterval time.Duration `yaml:"scannerInterval"`
ScannerBatchSize uint64 `yaml:"scannerBatchSize"`
ScannerMinRecentFailures uint64 `yaml:"scannerMinRecentFailures"`
ScannerNumThreads uint64 `yaml:"scannerNumThreads"`
MigratorParallelSlabsPerWorker uint64 `yaml:"migratorParallelSlabsPerWorker"`
}
Expand Down
2 changes: 1 addition & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Lo
}

func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) {
ap, err := autopilot.New(cfg.ID, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerMinRecentFailures, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval, cfg.RevisionSubmissionBuffer, cfg.MigratorParallelSlabsPerWorker, cfg.RevisionBroadcastInterval)
ap, err := autopilot.New(cfg.ID, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval, cfg.RevisionSubmissionBuffer, cfg.MigratorParallelSlabsPerWorker, cfg.RevisionBroadcastInterval)
if err != nil {
return nil, nil, nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/testing/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ var (
Set: testContractSet,
},
Hosts: api.HostsConfig{
MaxDowntimeHours: 10,
AllowRedundantIPs: true, // allow for integration tests by default
MaxDowntimeHours: 10,
MinRecentScanFailures: 10,
AllowRedundantIPs: true, // allow for integration tests by default
},
}

Expand Down Expand Up @@ -960,7 +961,6 @@ func testApCfg() node.AutopilotConfig {
ScannerInterval: time.Second,
ScannerBatchSize: 10,
ScannerNumThreads: 1,
ScannerMinRecentFailures: 5,
},
}
}
5 changes: 3 additions & 2 deletions stores/autopilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func TestAutopilotStore(t *testing.T) {
Set: testContractSet,
},
Hosts: api.HostsConfig{
MaxDowntimeHours: 10,
AllowRedundantIPs: true, // allow for integration tests by default
MaxDowntimeHours: 10,
MinRecentScanFailures: 10,
AllowRedundantIPs: true, // allow for integration tests by default
},
Wallet: api.WalletConfig{
DefragThreshold: 1234,
Expand Down
29 changes: 29 additions & 0 deletions stores/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error {
return performMigration00022_extendObjectID(tx, logger)
},
},
{
ID: "00023_defaultMinRecentScanFailures",
Migrate: func(tx *gorm.DB) error {
return performMigration00023_defaultMinRecentScanFailures(tx, logger)
},
},
}
// Create migrator.
m := gormigrate.New(db, gormigrate.DefaultOptions, migrations)
Expand Down Expand Up @@ -995,3 +1001,26 @@ func performMigration00022_extendObjectID(txn *gorm.DB, logger *zap.SugaredLogge
logger.Info("migration 00022_extendObjectID complete")
return nil
}

// performMigration00023_defaultMinRecentScanFailures loads every autopilot
// through the given transaction and, for each one whose host configuration has
// MinRecentScanFailures set to zero, sets the value to 10 and saves the record.
// Autopilots with a non-zero value are left untouched. The first error
// encountered while loading or saving is returned and aborts the migration.
func performMigration00023_defaultMinRecentScanFailures(txn *gorm.DB, logger *zap.SugaredLogger) error {
	logger.Info("performing migration 00023_defaultMinRecentScanFailures")

	var autopilots []dbAutopilot
	err := txn.Model(&dbAutopilot{}).Find(&autopilots).Error
	if err != nil {
		return err
	}

	for i := range autopilots {
		ap := &autopilots[i]

		// Only records without a configured value need the default.
		if ap.Config.Hosts.MinRecentScanFailures != 0 {
			continue
		}

		ap.Config.Hosts.MinRecentScanFailures = 10
		if saveErr := txn.Save(ap).Error; saveErr != nil {
			logger.Errorf("failed to set default value for MinRecentScanFailures on autopilot '%v', err: %v", ap.Identifier, saveErr)
			return saveErr
		}
		logger.Debugf("successfully defaulted MinRecentScanFailures to 10 on autopilot '%v'", ap.Identifier)
	}

	logger.Info("migration 00023_defaultMinRecentScanFailures complete")
	return nil
}

0 comments on commit 7126981

Please sign in to comment.