Skip to content

Commit

Permalink
autopilot: add stop ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Nov 29, 2023
1 parent 2d74e9c commit f9ff83b
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 89 deletions.
131 changes: 46 additions & 85 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
"lukechampine.com/frand"
)

type Bus interface {
Expand Down Expand Up @@ -91,22 +90,6 @@ type Bus interface {
WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error)
}

// Worker is the set of worker operations the autopilot calls. The autopilot
// keeps its workers in a workerPool and hands them to callbacks through
// withWorker and withWorkers.
type Worker interface {
// General worker operations. Every method takes a context as its first
// argument.
Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
ID(ctx context.Context) (string, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)

// RHP-prefixed operations; each one is addressed to a specific contract
// and/or host key.
// NOTE(review): presumably these wrap renter-host protocol calls - confirm
// against the worker implementation.
RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error)
RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error)
RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error)
RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error)
RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error)
RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error)
}

type Autopilot struct {
id string

Expand All @@ -126,11 +109,13 @@ type Autopilot struct {
stateMu sync.Mutex
state state

startStopMu sync.Mutex
startTime time.Time
stopChan chan struct{}
ticker *time.Ticker
triggerChan chan bool
startStopMu sync.Mutex
startTime time.Time
stopCtx context.Context
stopCtxCancel context.CancelFunc
stopChan chan struct{}
ticker *time.Ticker
triggerChan chan bool
}

// state holds a bunch of variables that are used by the autopilot and updated
Expand All @@ -144,36 +129,35 @@ type state struct {
period uint64
}

// workerPool contains all workers known to the autopilot. Users can call
// withWorker to execute a function with a worker of the pool or withWorkers to
// sequentially run a function on all workers. Due to the RWMutex this will
// never block during normal operations. However, during an update of the
// workerpool, this allows us to guarantee that all workers have finished their
// tasks by acquiring an exclusive lock on the pool before updating it. That
// way the caller who updated the pool can rely on the autopilot not using a
// worker that was removed during the update after the update operation
// returns.
type workerPool struct {
// mu is held for reading by withWorker and withWorkers for the whole
// duration of the callback they run.
mu sync.RWMutex
// workers is the slice the callbacks receive (a single element of it for
// withWorker, the whole slice for withWorkers).
workers []Worker
}
// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
bus: bus,
logger: logger.Sugar().Named(api.DefaultAutopilotID),
workers: newWorkerPool(workers),

func newWorkerPool(workers []Worker) *workerPool {
return &workerPool{
workers: workers,
tickerDuration: heartbeat,
}
scanner, err := newScanner(
ap,
scannerBatchSize,
scannerNumThreads,
scannerScanInterval,
scannerTimeoutInterval,
scannerTimeoutMinTimeout,
)
if err != nil {
return nil, err
}
}

// withWorker invokes workerFunc with one worker picked at random from the
// pool. The pool's read lock is held until workerFunc returns.
//
// The pool must not be empty: with zero workers there is no valid index to
// pick and the call panics.
func (wp *workerPool) withWorker(workerFunc func(Worker)) {
	wp.mu.RLock()
	defer wp.mu.RUnlock()

	idx := frand.Intn(len(wp.workers))
	workerFunc(wp.workers[idx])
}
ap.s = scanner
ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval)
ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval)

func (wp *workerPool) withWorkers(workerFunc func([]Worker)) {
wp.mu.RLock()
defer wp.mu.RUnlock()
workerFunc(wp.workers)
return ap, nil
}

// Handler returns an HTTP handler that serves the autopilot api.
Expand All @@ -194,6 +178,7 @@ func (ap *Autopilot) Run() error {
ap.startStopMu.Unlock()
return errors.New("already running")
}
ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background())
ap.startTime = time.Now()
ap.stopChan = make(chan struct{})
ap.triggerChan = make(chan bool, 1)
Expand Down Expand Up @@ -338,12 +323,19 @@ func (ap *Autopilot) Shutdown(_ context.Context) error {
ap.ticker.Stop()
close(ap.stopChan)
close(ap.triggerChan)
ap.stopCtxCancel()
ap.wg.Wait()
ap.startTime = time.Time{}
}
return nil
}

// StartTime returns the value of the autopilot's startTime field, read while
// holding startStopMu.
func (ap *Autopilot) StartTime() time.Time {
	ap.startStopMu.Lock()
	startedAt := ap.startTime
	ap.startStopMu.Unlock()
	return startedAt
}

func (ap *Autopilot) State() state {
ap.stateMu.Lock()
defer ap.stateMu.Unlock()
Expand All @@ -362,12 +354,6 @@ func (ap *Autopilot) Trigger(forceScan bool) bool {
}
}

// StartTime returns the value of the autopilot's startTime field, read while
// holding startStopMu.
func (ap *Autopilot) StartTime() time.Time {
	ap.startStopMu.Lock()
	startedAt := ap.startTime
	ap.startStopMu.Unlock()
	return startedAt
}

func (ap *Autopilot) Uptime() (dur time.Duration) {
ap.startStopMu.Lock()
defer ap.startStopMu.Unlock()
Expand Down Expand Up @@ -585,6 +571,12 @@ func (ap *Autopilot) updateState(ctx context.Context) error {
return nil
}

// withTimeout runs fn with a context that is cancelled once the given timeout
// elapses or the autopilot's stop context is cancelled, whichever happens
// first. The derived context is released as soon as fn returns.
//
// If the stop context has not been set (for example because the Autopilot was
// constructed but Run has not assigned it yet), the timeout context is derived
// from context.Background() instead, because context.WithTimeout panics when
// given a nil parent.
func (ap *Autopilot) withTimeout(fn func(ctx context.Context), timeout time.Duration) {
	parent := ap.stopCtx
	if parent == nil {
		// no stop context yet; only the timeout can cancel fn's context
		parent = context.Background()
	}

	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	fn(ctx)
}

func (ap *Autopilot) isStopped() bool {
select {
case <-ap.stopChan:
Expand Down Expand Up @@ -643,37 +635,6 @@ func (ap *Autopilot) triggerHandlerPOST(jc jape.Context) {
})
}

// New initializes an Autopilot.
//
// It first builds the Autopilot value itself (alerts, id, bus, logger, worker
// pool and ticker duration) and then creates the scanner, contractor, migrator
// and accounts sub-components, each of which receives the Autopilot. If
// newScanner fails, its error is returned and no Autopilot is returned.
//
// heartbeat is stored as the ticker duration; the remaining scanner*,
// migration*, accounts* and revision* arguments are passed through to the
// respective sub-component constructors.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
ap := &Autopilot{
// alerts are created with the origin string "autopilot.<id>"
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
bus: bus,
// NOTE(review): the logger is named after api.DefaultAutopilotID rather
// than the id argument - confirm this is intended.
logger: logger.Sugar().Named(api.DefaultAutopilotID),
workers: newWorkerPool(workers),

tickerDuration: heartbeat,
}
// scannerTimeoutInterval and scannerTimeoutMinTimeout are not parameters of
// New; they are resolved from package scope.
scanner, err := newScanner(
ap,
scannerBatchSize,
scannerNumThreads,
scannerScanInterval,
scannerTimeoutInterval,
scannerTimeoutMinTimeout,
)
if err != nil {
return nil, err
}

// the sub-components are attached after ap exists because each constructor
// takes ap as an argument
ap.s = scanner
ap.c = newContractor(ap, revisionSubmissionBuffer, revisionBroadcastInterval)
ap.m = newMigrator(ap, migrationHealthCutoff, migratorParallelSlabsPerWorker)
ap.a = newAccounts(ap, ap.bus, ap.bus, ap.workers, ap.logger, accountsRefillInterval)

return ap, nil
}

func (ap *Autopilot) hostHandlerGET(jc jape.Context) {
var hostKey types.PublicKey
if jc.DecodeParam("hostKey", &hostKey) != nil {
Expand Down
7 changes: 6 additions & 1 deletion autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (c *contractor) performContractPruning(wp *workerPool) {
var metrics pruneMetrics
wp.withWorker(func(w Worker) {
for _, contract := range res.Contracts {
// return if we're stopped
if c.ap.isStopped() {
return
}

// skip if there's nothing to prune
if contract.Prunable == 0 {
continue
Expand Down Expand Up @@ -165,7 +170,7 @@ func (c *contractor) performContractPruning(wp *workerPool) {

func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult {
// create a sane timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*timeoutPruneContract)
ctx, cancel := context.WithTimeout(c.ap.stopCtx, 2*timeoutPruneContract)
defer cancel()

// fetch the host
Expand Down
7 changes: 4 additions & 3 deletions autopilot/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ func (s *scanner) isScanning() bool {
}

func newTestScanner(b *mockBus, w *mockWorker) *scanner {
ap := &Autopilot{
stopChan: make(chan struct{}),
}
ap := &Autopilot{}
ap.stopChan = make(chan struct{})
ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background())

return &scanner{
ap: ap,
bus: b,
Expand Down
63 changes: 63 additions & 0 deletions autopilot/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package autopilot

import (
"context"
"sync"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)

// Worker is the set of worker operations the autopilot calls. The autopilot
// keeps its workers in a workerPool and hands them to callbacks through
// withWorker and withWorkers.
type Worker interface {
// General worker operations. Every method takes a context as its first
// argument.
Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
ID(ctx context.Context) (string, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)

// RHP-prefixed operations; each one is addressed to a specific contract
// and/or host key.
// NOTE(review): presumably these wrap renter-host protocol calls - confirm
// against the worker implementation.
RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error)
RHPForm(ctx context.Context, endHeight uint64, hk types.PublicKey, hostIP string, renterAddress types.Address, renterFunds types.Currency, hostCollateral types.Currency) (rhpv2.ContractRevision, []types.Transaction, error)
RHPFund(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string, balance types.Currency) (err error)
RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (hostdb.HostPriceTable, error)
RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (prunable, pruned, remaining uint64, err error)
RHPRenew(ctx context.Context, fcid types.FileContractID, endHeight uint64, hk types.PublicKey, hostIP string, hostAddress, renterAddress types.Address, renterFunds, newCollateral types.Currency, windowSize uint64) (rhpv2.ContractRevision, []types.Transaction, error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
RHPSync(ctx context.Context, contractID types.FileContractID, hostKey types.PublicKey, hostIP, siamuxAddr string) (err error)
}

// workerPool contains all workers known to the autopilot. Users can call
// withWorker to execute a function with a worker of the pool or withWorkers to
// sequentially run a function on all workers. Due to the RWMutex this will
// never block during normal operations. However, during an update of the
// workerpool, this allows us to guarantee that all workers have finished their
// tasks by acquiring an exclusive lock on the pool before updating it. That
// way the caller who updated the pool can rely on the autopilot not using a
// worker that was removed during the update after the update operation
// returns.
type workerPool struct {
// mu is held for reading by withWorker and withWorkers for the whole
// duration of the callback they run.
mu sync.RWMutex
// workers is the slice the callbacks receive (a single element of it for
// withWorker, the whole slice for withWorkers).
workers []Worker
}

// newWorkerPool returns a pool holding the given workers. The slice is stored
// as-is, not copied.
func newWorkerPool(workers []Worker) *workerPool {
	pool := new(workerPool)
	pool.workers = workers
	return pool
}

// withWorker invokes workerFunc with one worker picked at random from the
// pool. The pool's read lock is held until workerFunc returns.
//
// The pool must not be empty: with zero workers there is no valid index to
// pick and the call panics.
func (wp *workerPool) withWorker(workerFunc func(Worker)) {
	wp.mu.RLock()
	defer wp.mu.RUnlock()

	idx := frand.Intn(len(wp.workers))
	workerFunc(wp.workers[idx])
}

// withWorkers invokes workerFunc once with the pool's full worker slice. The
// pool's read lock is held until workerFunc returns.
func (wp *workerPool) withWorkers(workerFunc func([]Worker)) {
	wp.mu.RLock()
	defer wp.mu.RUnlock()

	all := wp.workers
	workerFunc(all)
}

0 comments on commit f9ff83b

Please sign in to comment.