Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of context.Background() calls #827

Merged
merged 7 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 2 additions & 29 deletions autopilot/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/tracing"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -95,7 +92,7 @@ func (a *accounts) refillWorkersAccountsLoop(ctx context.Context) {
}

a.w.withWorker(func(w Worker) {
a.refillWorkerAccounts(w)
a.refillWorkerAccounts(ctx, w)
})
}
}
Expand All @@ -105,27 +102,20 @@ func (a *accounts) refillWorkersAccountsLoop(ctx context.Context) {
// is used for every host. If a slow host's account is still being refilled by a
// goroutine from a previous call, refillWorkerAccounts will skip that account
// until the previously launched goroutine returns.
func (a *accounts) refillWorkerAccounts(w Worker) {
ctx, span := tracing.Tracer.Start(context.Background(), "refillWorkerAccounts")
defer span.End()

func (a *accounts) refillWorkerAccounts(ctx context.Context, w Worker) {
// fetch config
state := a.ap.State()

// fetch worker id
workerID, err := w.ID(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to fetch worker id")
a.l.Errorw(fmt.Sprintf("failed to fetch worker id for refill: %v", err))
return
}

// fetch all contracts
contracts, err := a.c.Contracts(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "failed to fetch contracts")
a.l.Errorw(fmt.Sprintf("failed to fetch contracts for refill: %v", err))
return
} else if len(contracts) == 0 {
Expand All @@ -135,8 +125,6 @@ func (a *accounts) refillWorkerAccounts(w Worker) {
// fetch all contract set contracts
contractSetContracts, err := a.c.ContractSetContracts(ctx, state.cfg.Contracts.Set)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, fmt.Sprintf("failed to fetch contracts for set '%s'", state.cfg.Contracts.Set))
a.l.Errorw(fmt.Sprintf("failed to fetch contract set contracts: %v", err))
return
}
Expand Down Expand Up @@ -212,17 +200,6 @@ func refillWorkerAccount(ctx context.Context, a AccountStore, w Worker, workerID
}
}

// add tracing
ctx, span := tracing.Tracer.Start(ctx, "refillAccount")
span.SetAttributes(attribute.Stringer("host", contract.HostKey))
defer func() {
if rerr != nil {
span.RecordError(rerr.err)
span.SetStatus(codes.Error, "failed to refill account")
}
span.End()
}()

// fetch the account
accountID, err := w.Account(ctx, contract.HostKey)
if err != nil {
Expand All @@ -236,10 +213,6 @@ func refillWorkerAccount(ctx context.Context, a AccountStore, w Worker, workerID
return
}

// update span
span.SetAttributes(attribute.Stringer("account", account.ID))
span.SetAttributes(attribute.Stringer("balance", account.Balance))

// check if a host is potentially cheating before refilling.
// We only check against the max drift if the account's drift is
// negative because we don't care if we have more money than
Expand Down
59 changes: 31 additions & 28 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
Expand All @@ -20,7 +19,6 @@ import (
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/tracing"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/webhooks"
"go.uber.org/zap"
Expand Down Expand Up @@ -109,12 +107,12 @@ type Autopilot struct {
stateMu sync.Mutex
state state

startStopMu sync.Mutex
startTime time.Time
stopCtx context.Context
stopCtxCancel context.CancelFunc
ticker *time.Ticker
triggerChan chan bool
startStopMu sync.Mutex
startTime time.Time
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
ticker *time.Ticker
triggerChan chan bool
}

// state holds a bunch of variables that are used by the autopilot and updated
Expand All @@ -130,13 +128,18 @@ type state struct {

// New initializes an Autopilot.
func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat time.Duration, scannerScanInterval time.Duration, scannerBatchSize, scannerNumThreads uint64, migrationHealthCutoff float64, accountsRefillInterval time.Duration, revisionSubmissionBuffer, migratorParallelSlabsPerWorker uint64, revisionBroadcastInterval time.Duration) (*Autopilot, error) {
shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background())

ap := &Autopilot{
alerts: alerts.WithOrigin(bus, fmt.Sprintf("autopilot.%s", id)),
id: id,
bus: bus,
logger: logger.Sugar().Named(api.DefaultAutopilotID),
workers: newWorkerPool(workers),

shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,

tickerDuration: heartbeat,
}
scanner, err := newScanner(
Expand All @@ -161,14 +164,14 @@ func New(id string, bus Bus, workers []Worker, logger *zap.Logger, heartbeat tim

// Handler returns an HTTP handler that serves the autopilot api.
func (ap *Autopilot) Handler() http.Handler {
return jape.Mux(tracing.TracingMiddleware(api.DefaultAutopilotID, map[string]jape.Handler{
return jape.Mux(map[string]jape.Handler{
"GET /config": ap.configHandlerGET,
"PUT /config": ap.configHandlerPUT,
"POST /hosts": ap.hostsHandlerPOST,
"GET /host/:hostKey": ap.hostHandlerGET,
"GET /state": ap.stateHandlerGET,
"POST /trigger": ap.triggerHandlerPOST,
}))
})
}

func (ap *Autopilot) Run() error {
Expand All @@ -178,7 +181,6 @@ func (ap *Autopilot) Run() error {
return errors.New("already running")
}
ap.startTime = time.Now()
ap.stopCtx, ap.stopCtxCancel = context.WithCancel(context.Background())
ap.triggerChan = make(chan bool, 1)
ap.ticker = time.NewTicker(ap.tickerDuration)

Expand All @@ -205,8 +207,10 @@ func (ap *Autopilot) Run() error {
tickerFired := make(chan struct{})
ap.workers.withWorker(func(w Worker) {
defer ap.logger.Info("autopilot iteration ended")
ctx, span := tracing.Tracer.Start(context.Background(), "Autopilot Iteration")
defer span.End()

// create a new context for this iteration
ctx, cancel := context.WithCancel(ap.shutdownCtx)
defer cancel()

// initiate a host scan - no need to be synced or configured for scanning
ap.s.tryUpdateTimeout()
Expand Down Expand Up @@ -239,13 +243,12 @@ func (ap *Autopilot) Run() error {
return
}

// Trace/Log worker id chosen for this maintenance iteration.
// Log worker id chosen for this maintenance iteration.
workerID, err := w.ID(ctx)
if err != nil {
ap.logger.Errorf("aborting maintenance, failed to fetch worker id, err: %v", err)
return
}
span.SetAttributes(attribute.String("worker", workerID))
ap.logger.Infof("using worker %s for iteration", workerID)

// update the loop state
Expand Down Expand Up @@ -283,7 +286,7 @@ func (ap *Autopilot) Run() error {
if maintenanceSuccess {
launchAccountRefillsOnce.Do(func() {
ap.logger.Debug("account refills loop launched")
go ap.a.refillWorkersAccountsLoop(ap.stopCtx)
go ap.a.refillWorkersAccountsLoop(ap.shutdownCtx)
})
} else {
ap.logger.Errorf("contract maintenance failed, err: %v", err)
Expand All @@ -301,7 +304,7 @@ func (ap *Autopilot) Run() error {
})

select {
case <-ap.stopCtx.Done():
case <-ap.shutdownCtx.Done():
return nil
case forceScan = <-ap.triggerChan:
ap.logger.Info("autopilot iteration triggered")
Expand All @@ -319,7 +322,7 @@ func (ap *Autopilot) Shutdown(_ context.Context) error {

if ap.isRunning() {
ap.ticker.Stop()
ap.stopCtxCancel()
ap.shutdownCtxCancel()
close(ap.triggerChan)
ap.wg.Wait()
ap.startTime = time.Time{}
Expand Down Expand Up @@ -368,7 +371,7 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure

for {
// try and fetch the config
ctx, cancel := context.WithTimeout(ap.stopCtx, 30*time.Second)
ctx, cancel := context.WithTimeout(ap.shutdownCtx, 30*time.Second)
_, err := ap.bus.Autopilot(ctx, ap.id)
cancel()

Expand All @@ -380,7 +383,7 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure
}
if err != nil {
select {
case <-ap.stopCtx.Done():
case <-ap.shutdownCtx.Done():
return false, false
case <-interrupt:
return false, true
Expand All @@ -399,7 +402,7 @@ func (ap *Autopilot) blockUntilOnline() (online bool) {
var once sync.Once

for {
ctx, cancel := context.WithTimeout(ap.stopCtx, 30*time.Second)
ctx, cancel := context.WithTimeout(ap.shutdownCtx, 30*time.Second)
peers, err := ap.bus.SyncerPeers(ctx)
online = len(peers) > 0
cancel()
Expand All @@ -412,7 +415,7 @@ func (ap *Autopilot) blockUntilOnline() (online bool) {

if err != nil || !online {
select {
case <-ap.stopCtx.Done():
case <-ap.shutdownCtx.Done():
return
case <-ticker.C:
continue
Expand All @@ -430,7 +433,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block

for {
// try and fetch consensus
ctx, cancel := context.WithTimeout(ap.stopCtx, 30*time.Second)
ctx, cancel := context.WithTimeout(ap.shutdownCtx, 30*time.Second)
cs, err := ap.bus.ConsensusState(ctx)
synced = cs.Synced
cancel()
Expand All @@ -445,7 +448,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block
if err != nil || !synced {
blocked = true
select {
case <-ap.stopCtx.Done():
case <-ap.shutdownCtx.Done():
return
case <-interrupt:
interrupted = true
Expand All @@ -459,7 +462,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block
}

func (ap *Autopilot) tryScheduleTriggerWhenFunded() error {
ctx, cancel := context.WithTimeout(ap.stopCtx, 30*time.Second)
ctx, cancel := context.WithTimeout(ap.shutdownCtx, 30*time.Second)
wallet, err := ap.bus.Wallet(ctx)
cancel()

Expand All @@ -480,13 +483,13 @@ func (ap *Autopilot) tryScheduleTriggerWhenFunded() error {
defer ticker.Stop()
for {
select {
case <-ap.stopCtx.Done():
case <-ap.shutdownCtx.Done():
return
case <-ticker.C:
}

// fetch wallet info
ctx, cancel := context.WithTimeout(ap.stopCtx, 30*time.Second)
ctx, cancel := context.WithTimeout(ap.shutdownCtx, 30*time.Second)
if wallet, err = ap.bus.Wallet(ctx); err != nil {
ap.logger.Errorf("failed to get wallet info, err: %v", err)
}
Expand Down Expand Up @@ -583,7 +586,7 @@ func (ap *Autopilot) updateState(ctx context.Context) error {

func (ap *Autopilot) isStopped() bool {
select {
case <-ap.stopCtx.Done():
case <-ap.shutdownCtx.Done():
return true
default:
return false
Expand Down
8 changes: 4 additions & 4 deletions autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (pr pruneResult) toMetric() api.ContractPruneMetric {

func (c *contractor) fetchPrunableContracts() (prunable []api.ContractPrunableData, _ error) {
// use a sane timeout
ctx, cancel := context.WithTimeout(c.ap.stopCtx, time.Minute)
ctx, cancel := context.WithTimeout(c.ap.shutdownCtx, time.Minute)
defer cancel()

// fetch prunable data
Expand Down Expand Up @@ -156,7 +156,7 @@ func (c *contractor) performContractPruning(wp *workerPool) {
}

// handle alert
ctx, cancel := context.WithTimeout(c.ap.stopCtx, time.Minute)
ctx, cancel := context.WithTimeout(c.ap.shutdownCtx, time.Minute)
if id, alert := result.toAlert(); alert != nil {
c.ap.RegisterAlert(ctx, *alert)
} else {
Expand All @@ -170,7 +170,7 @@ func (c *contractor) performContractPruning(wp *workerPool) {
})

// record metrics
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(c.ap.shutdownCtx, time.Minute)
if err := c.ap.bus.RecordContractPruneMetric(ctx, metrics...); err != nil {
c.logger.Error(err)
}
Expand All @@ -182,7 +182,7 @@ func (c *contractor) performContractPruning(wp *workerPool) {

func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneResult {
// create a sane timeout
ctx, cancel := context.WithTimeout(c.ap.stopCtx, 2*timeoutPruneContract)
ctx, cancel := context.WithTimeout(c.ap.shutdownCtx, 2*timeoutPruneContract)
defer cancel()

// fetch the host
Expand Down
Loading