diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go
index c61721a33..77fd67d63 100644
--- a/autopilot/autopilot.go
+++ b/autopilot/autopilot.go
@@ -24,7 +24,6 @@ import (
 	"go.sia.tech/renterd/wallet"
 	"go.sia.tech/renterd/webhooks"
 	"go.uber.org/zap"
-	"lukechampine.com/frand"
 )

 type Bus interface {
@@ -91,22 +90,6 @@ type Bus interface {
 	WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error)
 }

-type Worker interface {
-	Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
-	Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
-	ID(ctx context.Context) (string, error)
-	MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)
-
-	RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error)
-	RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error)
-	RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error)
-	RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error)
-	RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error)
-	RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error)
-	RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
-	RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error)
-}
-
 type Autopilot struct {
 	id string

@@ -126,11 +109,13 @@ type Autopilot struct {
 	stateMu sync.Mutex
 	state   state

-	startStopMu sync.Mutex
-	startTime   time.Time
-	stopChan    chan struct{}
-	ticker      *time.Ticker
-	triggerChan chan bool
+	startStopMu   sync.Mutex
+	startTime     time.Time
+	stopCtx       context.Context
+	stopCtxCancel context.CancelFunc
+	stopChan      chan struct{}
+	ticker        *time.Ticker
+	triggerChan   chan bool
 }

 // state holds a bunch of variables that are used by the autopilot and updated
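Note: the new stopCtx/stopCtxCancel fields pair a cancellable context with the existing stopChan. The channel wakes up select-based loops, while the context lets in-flight, context-aware calls be interrupted on shutdown. A minimal, self-contained sketch of that pattern (all names here are illustrative, not part of the diff):

package main

import (
	"context"
	"fmt"
)

// stopper mirrors the shutdown plumbing above: a close-once channel for
// select loops plus a context that long-running calls derive from.
type stopper struct {
	stopCtx       context.Context
	stopCtxCancel context.CancelFunc
	stopChan      chan struct{}
}

func (s *stopper) start() {
	s.stopCtx, s.stopCtxCancel = context.WithCancel(context.Background())
	s.stopChan = make(chan struct{})
}

func (s *stopper) stop() {
	close(s.stopChan) // wakes up select-based loops
	s.stopCtxCancel() // interrupts context-aware calls mid-flight
}

func main() {
	s := &stopper{}
	s.start()
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-s.stopCtx.Done() // a worker blocked on the stop context
		fmt.Println("worker interrupted:", s.stopCtx.Err())
	}()
	s.stop()
	<-done
}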
@@ -144,36 +129,35 @@ type state struct {
 	period uint64
 }

-// workerPool contains all workers known to the autopilot. Users can call
-// withWorker to execute a function with a worker of the pool or withWorkers to
-// sequentially run a function on all workers. Due to the RWMutex this will
-// never block during normal operations. However, during an update of the
-// workerpool, this allows us to guarantee that all workers have finished their
-// tasks by calling acquiring an exclusive lock on the pool before updating it.
-// That way the caller who updated the pool can rely on the autopilot not using
-// a worker that was removed during the update after the update operation
-// returns.
-type workerPool struct {
-	mu      sync.RWMutex
-	workers []Worker
-}
+// New initializes an Autopilot.
+func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
+	ap := &Autopilot{
+		alerts:  alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
+		id:      id,
+		bus:     bus,
+		logger:  logger.Sugar().Named(api.DefaultAutopilotID),
+		workers: newWorkerPool(workers),

-func newWorkerPool(workers []Worker) *workerPool {
-	return &workerPool{
-		workers: workers,
+		tickerDuration: heartbeat,
+	}
+	scanner, err := newScanner(
+		ap,
+		scannerBatchSize,
+		scannerNumThreads,
+		scannerScanInterval,
+		scannerTimeoutInterval,
+		scannerTimeoutMinTimeout,
+	)
+	if err != nil {
+		return nil, err
 	}
-}

-func (wp *workerPool) withWorker(workerFunc func(Worker)) {
-	wp.mu.RLock()
-	defer wp.mu.RUnlock()
-	workerFunc(wp.workers[frand.Intn(len(wp.workers))])
-}
+	ap.s = scanner
+	ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval)
+	ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
+	ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval)

-func (wp *workerPool) withWorkers(workerFunc func([]Worker)) {
-	wp.mu.RLock()
-	defer wp.mu.RUnlock()
-	workerFunc(wp.workers)
+	return ap, nil
 }

 // Handler returns an HTTP handler that serves the autopilot api.
@@ -194,6 +178,7 @@ func (ap *Autopilot) Run() error {
 		ap.startStopMu.Unlock()
 		return errors.New("already running")
 	}
+	ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background())
 	ap.startTime = time.Now()
 	ap.stopChan = make(chan struct{})
 	ap.triggerChan = make(chan bool, 1)
@@ -338,12 +323,19 @@ func (ap *Autopilot) Shutdown(_ context.Context) error {
 		ap.ticker.Stop()
 		close(ap.stopChan)
 		close(ap.triggerChan)
+		ap.stopCtxCancel()
 		ap.wg.Wait()
 		ap.startTime = time.Time{}
 	}

 	return nil
 }

+func (ap *Autopilot) StartTime() time.Time {
+	ap.startStopMu.Lock()
+	defer ap.startStopMu.Unlock()
+	return ap.startTime
+}
+
 func (ap *Autopilot) State() state {
 	ap.stateMu.Lock()
 	defer ap.stateMu.Unlock()
@@ -362,12 +354,6 @@ func (ap *Autopilot) Trigger(forceScan bool) bool {
 	}
 }

-func (ap *Autopilot) StartTime() time.Time {
-	ap.startStopMu.Lock()
-	defer ap.startStopMu.Unlock()
-	return ap.startTime
-}
-
 func (ap *Autopilot) Uptime() (dur time.Duration) {
 	ap.startStopMu.Lock()
 	defer ap.startStopMu.Unlock()
@@ -585,6 +571,12 @@ func (ap *Autopilot) updateState(ctx context.Context) error {
 	return nil
 }

+func (ap *Autopilot) withTimeout(fn func(ctx context.Context), timeout time.Duration) {
+	ctx, cancel := context.WithTimeout(ap.stopCtx, timeout)
+	defer cancel()
+	fn(ctx)
+}
+
 func (ap *Autopilot) isStopped() bool {
 	select {
 	case <-ap.stopChan:
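Note: withTimeout derives its deadline context from ap.stopCtx rather than context.Background(), so the callback is cancelled either when the timeout fires or when the autopilot shuts down, whichever comes first. A runnable sketch of the same idea (hypothetical names, not renterd APIs):

package main

import (
	"context"
	"fmt"
	"time"
)

// withTimeout mirrors the helper above: fn runs with a context bounded by
// 'timeout' and by the lifetime of the parent stop context.
func withTimeout(stop context.Context, fn func(ctx context.Context), timeout time.Duration) {
	ctx, cancel := context.WithTimeout(stop, timeout)
	defer cancel()
	fn(ctx)
}

func main() {
	stop, stopCancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(50 * time.Millisecond)
		stopCancel() // simulate Shutdown: cancels every derived context
	}()

	withTimeout(stop, func(ctx context.Context) {
		<-ctx.Done() // returns after ~50ms due to shutdown, not the 10s timeout
		fmt.Println("done:", ctx.Err())
	}, 10*time.Second)
}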
@@ -643,37 +635,6 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) {
 	})
 }

-// New initializes an Autopilot.
-func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
-	ap := &Autopilot{
-		alerts:  alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
-		id:      id,
-		bus:     bus,
-		logger:  logger.Sugar().Named(api.DefaultAutopilotID),
-		workers: newWorkerPool(workers),
-
-		tickerDuration: heartbeat,
-	}
-	scanner, err := newScanner(
-		ap,
-		scannerBatchSize,
-		scannerNumThreads,
-		scannerScanInterval,
-		scannerTimeoutInterval,
-		scannerTimeoutMinTimeout,
-	)
-	if err != nil {
-		return nil, err
-	}
-
-	ap.s = scanner
-	ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval)
-	ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
-	ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval)
-
-	return ap, nil
-}
-
 func (ap *Autopilot) hostHandlerGET(jc jape.Context) {
 	var hostKey types.PublicKey
 	if jc.DecodeParam("hostKey", &hostKey) != nil {
diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go
index 243db548a..7230fcae4 100644
--- a/autopilot/contract_pruning.go
+++ b/autopilot/contract_pruning.go
@@ -125,6 +125,11 @@ func (c *contractor) performContractPruning(wp *workerPool) {
 	var metrics pruneMetrics
 	wp.withWorker(func(w Worker) {
 		for _, contract := range res.Contracts {
+			// return if we're stopped
+			if c.ap.isStopped() {
+				return
+			}
+
 			// skip if there's nothing to prune
 			if contract.Prunable == 0 {
 				continue
@@ -165,7 +170,7 @@ func (c *contractor) performContractPruning(wp *workerPool) {

 func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult {
 	// create a sane timeout
-	ctx, cancel := context.WithTimeout(context.Background(), 2*timeoutPruneContract)
+	ctx, cancel := context.WithTimeout(c.ap.stopCtx, 2*timeoutPruneContract)
 	defer cancel()

 	// fetch the host
diff --git a/autopilot/scanner_test.go b/autopilot/scanner_test.go
index b30aea014..6d6f51df0 100644
--- a/autopilot/scanner_test.go
+++ b/autopilot/scanner_test.go
@@ -140,9 +140,10 @@ func (s *scanner) isScanning() bool {
 }

 func newTestScanner(b *mockBus, w *mockWorker) *scanner {
-	ap := &Autopilot{
-		stopChan: make(chan struct{}),
-	}
+	ap := &Autopilot{}
+	ap.stopChan = make(chan struct{})
+	ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background())
+
 	return &scanner{
 		ap:  ap,
 		bus: b,
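Note: the contract_pruning.go hunks above apply the shutdown plumbing in two places: the worker loop bails out between contracts once the autopilot is stopped, and the per-contract timeout is now derived from stopCtx instead of context.Background(), so shutdown also interrupts a prune that is already underway. A standalone sketch of that loop shape (illustrative names only):

package main

import (
	"context"
	"fmt"
	"time"
)

// pruneAll mirrors the loop above: check for shutdown between items and give
// each item a timeout that is itself bounded by the stop context.
func pruneAll(stop context.Context, items []string, perItem time.Duration) {
	for _, item := range items {
		// return if we're stopped
		select {
		case <-stop.Done():
			return
		default:
		}

		// per-item timeout, cancelled early if stop is cancelled
		ctx, cancel := context.WithTimeout(stop, perItem)
		select {
		case <-ctx.Done():
			fmt.Println(item, "aborted:", ctx.Err())
		case <-time.After(10 * time.Millisecond):
			fmt.Println(item, "pruned")
		}
		cancel()
	}
}

func main() {
	stop, cancel := context.WithCancel(context.Background())
	defer cancel()
	pruneAll(stop, []string{"fcid-1", "fcid-2"}, time.Second)
}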
diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go
new file mode 100644
index 000000000..d50cdc397
--- /dev/null
+++ b/autopilot/workerpool.go
@@ -0,0 +1,63 @@
+package autopilot
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	rhpv2 "go.sia.tech/core/rhp/v2"
+	rhpv3 "go.sia.tech/core/rhp/v3"
+	"go.sia.tech/core/types"
+	"go.sia.tech/renterd/api"
+	"go.sia.tech/renterd/hostdb"
+	"go.sia.tech/renterd/object"
+	"lukechampine.com/frand"
+)
+
+type Worker interface {
+	Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
+	Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
+	ID(ctx context.Context) (string, error)
+	MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)
+
+	RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error)
+	RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error)
+	RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error)
+	RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error)
+	RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error)
+	RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error)
+	RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
+	RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error)
+}
+
+// workerPool contains all workers known to the autopilot. Users can call
+// withWorker to execute a function with a worker of the pool or withWorkers to
+// sequentially run a function on all workers. Due to the RWMutex this will
+// never block during normal operations. However, during an update of the
+// workerpool, this allows us to guarantee that all workers have finished their
+// tasks by acquiring an exclusive lock on the pool before updating it. That
+// way the caller who updated the pool can rely on the autopilot not using a
+// worker that was removed during the update after the update operation
+// returns.
+type workerPool struct {
+	mu      sync.RWMutex
+	workers []Worker
+}
+
+func newWorkerPool(workers []Worker) *workerPool {
+	return &workerPool{
+		workers: workers,
+	}
+}
+
+func (wp *workerPool) withWorker(workerFunc func(Worker)) {
+	wp.mu.RLock()
+	defer wp.mu.RUnlock()
+	workerFunc(wp.workers[frand.Intn(len(wp.workers))])
+}
+
+func (wp *workerPool) withWorkers(workerFunc func([]Worker)) {
+	wp.mu.RLock()
+	defer wp.mu.RUnlock()
+	workerFunc(wp.workers)
+}
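Note: the diff does not include the updating side of the pool. A hypothetical setWorkers (not part of this change) illustrates why the RWMutex is there: taking the exclusive lock blocks until every in-flight withWorker/withWorkers call under a read lock has returned, so after the update no removed worker can still be in use.

// setWorkers is a hypothetical updater, shown only to illustrate the locking
// contract: Lock() waits out all read-locked callers before the swap.
func (wp *workerPool) setWorkers(workers []Worker) {
	wp.mu.Lock()
	defer wp.mu.Unlock()
	wp.workers = workers
}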