From 55780ec5483107244b61e7bf63ec63c877a3cd59 Mon Sep 17 00:00:00 2001 From: Peter-Jan Brone Date: Tue, 19 Dec 2023 09:20:24 +0100 Subject: [PATCH] Contract Lock Ctx (#839) * worker: derive contract lock ctx * worker: pass shutdown ctx * autopilot: improve error handling --- autopilot/autopilot.go | 18 ++++++++++++------ worker/contract_lock.go | 6 +++--- worker/uploader.go | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 3e15bb66b..e72c3c17a 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -271,7 +271,9 @@ func (ap *Autopilot) Run() error { // perform maintenance setChanged, err := ap.c.performContractMaintenance(ctx, w) - if err != nil { + if err != nil && isErr(err, context.Canceled) { + return + } else if err != nil { ap.logger.Errorf("contract maintenance failed, err: %v", err) } maintenanceSuccess := err == nil @@ -288,8 +290,6 @@ func (ap *Autopilot) Run() error { ap.logger.Debug("account refills loop launched") go ap.a.refillWorkersAccountsLoop(ap.shutdownCtx) }) - } else { - ap.logger.Errorf("contract maintenance failed, err: %v", err) } // migration @@ -376,7 +376,9 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure cancel() // if the config was not found, or we were unable to fetch it, keep blocking - if err != nil && strings.Contains(err.Error(), api.ErrAutopilotNotFound.Error()) { + if isErr(err, context.Canceled) { + return + } else if isErr(err, api.ErrAutopilotNotFound) { once.Do(func() { ap.logger.Info("autopilot is waiting to be configured...") }) } else if err != nil { ap.logger.Errorf("autopilot is unable to fetch its configuration from the bus, err: %v", err) @@ -407,7 +409,9 @@ func (ap *Autopilot) blockUntilOnline() (online bool) { online = len(peers) > 0 cancel() - if err != nil { + if isErr(err, context.Canceled) { + return + } else if err != nil { ap.logger.Errorf("failed to get peers, err: %v", err) } else if !online { once.Do(func() { ap.logger.Info("autopilot is waiting on the bus to connect to peers...") }) @@ -439,7 +443,9 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block cancel() // if an error occurred, or if we're not synced, we continue - if err != nil { + if isErr(err, context.Canceled) { + return + } else if err != nil { ap.logger.Errorf("failed to get consensus state, err: %v", err) } else if !synced { once.Do(func() { ap.logger.Info("autopilot is waiting for consensus to sync...") }) diff --git a/worker/contract_lock.go b/worker/contract_lock.go index 787e388b2..f5115d37f 100644 --- a/worker/contract_lock.go +++ b/worker/contract_lock.go @@ -31,8 +31,8 @@ type contractLock struct { stopWG sync.WaitGroup } -func newContractLock(fcid types.FileContractID, lockID uint64, d time.Duration, locker ContractLocker, logger *zap.SugaredLogger) *contractLock { - ctx, cancel := context.WithCancel(context.Background()) +func newContractLock(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration, locker ContractLocker, logger *zap.SugaredLogger) *contractLock { + ctx, cancel := context.WithCancel(ctx) cl := &contractLock{ lockID: lockID, fcid: fcid, @@ -56,7 +56,7 @@ func (w *worker) acquireContractLock(ctx context.Context, fcid types.FileContrac if err != nil { return nil, err } - return newContractLock(fcid, lockID, w.contractLockingDuration, w.bus, w.logger), nil + return newContractLock(w.shutdownCtx, fcid, lockID, w.contractLockingDuration, w.bus, w.logger), nil } func (w *worker) withContractLock(ctx context.Context, fcid types.FileContractID, priority int, fn func() error) error { diff --git a/worker/uploader.go b/worker/uploader.go index 7f0c46b0c..6057a52e5 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -198,7 +198,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, } // defer the release - lock := newContractLock(fcid, lockID, req.contractLockDuration, u.cl, u.logger) + lock := newContractLock(u.shutdownCtx, fcid, lockID, req.contractLockDuration, u.cl, u.logger) defer func() { ctx, cancel := context.WithTimeout(u.shutdownCtx, 10*time.Second) lock.Release(ctx)