From 2756576a4012b80fbde57a34bcd5a7675ced39f2 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 19 Feb 2024 11:54:19 +0100 Subject: [PATCH 1/3] worker: use background context for unlocking accounts --- worker/rhpv3.go | 179 +++++++++++++++++++++++------------------------- 1 file changed, 86 insertions(+), 93 deletions(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 03f67c6f6..5fbcd3ad6 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -337,19 +337,17 @@ type ( // accounts stores the balance and other metrics of accounts that the // worker maintains with a host. accounts struct { - as AccountStore - key types.PrivateKey - shutdownCtx context.Context + as AccountStore + key types.PrivateKey } // account contains information regarding a specific account of the // worker. account struct { - as AccountStore - id rhpv3.Account - key types.PrivateKey - host types.PublicKey - shutdownCtx context.Context + as AccountStore + id rhpv3.Account + key types.PrivateKey + host types.PublicKey } ) @@ -358,9 +356,8 @@ func (w *worker) initAccounts(as AccountStore) { panic("accounts already initialized") // developer error } w.accounts = &accounts{ - as: as, - key: w.deriveSubKey("accountkey"), - shutdownCtx: w.shutdownCtx, + as: as, + key: w.deriveSubKey("accountkey"), } } @@ -376,117 +373,113 @@ func (w *worker) initTransportPool() { func (a *accounts) ForHost(hk types.PublicKey) *account { accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey()) return &account{ - as: a.as, - id: accountID, - key: a.key, - host: hk, - shutdownCtx: a.shutdownCtx, + as: a.as, + id: accountID, + key: a.key, + host: hk, } } -// WithDeposit increases the balance of an account by the amount returned by -// amtFn if amtFn doesn't return an error. -func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error { - _, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) +func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk types.PublicKey, exclusive bool, fn func(a api.Account) error) error { + acc, lockID, err := as.LockAccount(ctx, id, hk, exclusive, accountLockingDuration) if err != nil { return err } + defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() + select { + case <-ctx.Done(): + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), time.Minute) + defer cancel() + default: + } + as.UnlockAccount(ctx, acc.ID, lockID) }() - amt, err := amtFn() - if err != nil { - return err - } - return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) + return fn(acc) } -func (a *account) Balance(ctx context.Context) (types.Currency, error) { - account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) - if err != nil { - return types.Currency{}, err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() +// Balance returns the account balance. +func (a *account) Balance(ctx context.Context) (balance types.Currency, err error) { + err = withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error { + balance = types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()) + return nil + }) + return +} - return types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()), nil +// WithDeposit increases the balance of an account by the amount returned by +// amtFn if amtFn doesn't return an error. +func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error { + return withAccountLock(ctx, a.as, a.id, a.host, false, func(_ api.Account) error { + amt, err := amtFn() + if err != nil { + return err + } + return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) + }) +} + +// WithSync syncs an accounts balance with the bus. To do so, the account is +// locked while the balance is fetched through balanceFn. +func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error { + return withAccountLock(ctx, a.as, a.id, a.host, true, func(_ api.Account) error { + balance, err := balanceFn() + if err != nil { + return err + } + return a.as.SetBalance(ctx, a.id, a.host, balance.Big()) + }) } // WithWithdrawal decreases the balance of an account by the amount returned by // amtFn. The amount is still withdrawn if amtFn returns an error since some // costs are non-refundable. func (a *account) WithWithdrawal(ctx context.Context, amtFn func() (types.Currency, error)) error { - account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) - if err != nil { - return err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() + return withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error { + // return early if the account needs to sync + if account.RequiresSync { + return fmt.Errorf("%w; account requires resync", errBalanceInsufficient) + } - // return early if the account needs to sync - if account.RequiresSync { - return fmt.Errorf("%w; account requires resync", errBalanceInsufficient) - } + // return early if our account is not funded + if account.Balance.Cmp(big.NewInt(0)) <= 0 { + return errBalanceInsufficient + } - // return early if our account is not funded - if account.Balance.Cmp(big.NewInt(0)) <= 0 { - return errBalanceInsufficient - } + // execute amtFn + amt, err := amtFn() + if isBalanceInsufficient(err) { + // in case of an insufficient balance, we schedule a sync + if scheduleErr := a.scheduleSync(); scheduleErr != nil { + err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, scheduleErr) + } + } - // execute amtFn - amt, err := amtFn() - if isBalanceInsufficient(err) { - // in case of an insufficient balance, we schedule a sync - scheduleCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - defer cancel() - err2 := a.as.ScheduleSync(scheduleCtx, a.id, a.host) - if err2 != nil { - err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, err2) + // if an amount was returned, we withdraw it. + if withdrawErr := a.withdrawFromBalance(amt); withdrawErr != nil { + err = fmt.Errorf("%w; failed to withdraw from account, error: %v", err, withdrawErr) } - } - // if the amount is zero, we are done - if amt.IsZero() { return err + }) +} + +func (a *account) withdrawFromBalance(amt types.Currency) error { + if amt.IsZero() { + return nil } - // if an amount was returned, we withdraw it. - addCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - errAdd := a.as.AddBalance(addCtx, a.id, a.host, new(big.Int).Neg(amt.Big())) - if errAdd != nil { - err = fmt.Errorf("%w; failed to add balance to account, error: %v", err, errAdd) - } - return err + return a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big())) } -// WithSync syncs an accounts balance with the bus. To do so, the account is -// locked while the balance is fetched through balanceFn. -func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error { - _, lockID, err := a.as.LockAccount(ctx, a.id, a.host, true, accountLockingDuration) - if err != nil { - return err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() - - balance, err := balanceFn() - if err != nil { - return err - } - return a.as.SetBalance(ctx, a.id, a.host, balance.Big()) +func (a *account) scheduleSync() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + return a.as.ScheduleSync(ctx, a.id, a.host) } // deriveAccountKey derives an account plus key for a given host and worker. From 3f11ef698af02f1af58236c297774871401e5546 Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 22 Feb 2024 14:09:12 +0100 Subject: [PATCH 2/3] worker: implement PR remarks --- worker/rhpv3.go | 50 ++++++++++++++++--------------------------------- 1 file changed, 16 insertions(+), 34 deletions(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 5fbcd3ad6..ee2dfcd85 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -385,19 +385,14 @@ func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk if err != nil { return err } + err = fn(acc) - defer func() { - select { - case <-ctx.Done(): - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), time.Minute) - defer cancel() - default: - } - as.UnlockAccount(ctx, acc.ID, lockID) - }() + // unlock account + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + _ = as.UnlockAccount(ctx, acc.ID, lockID) // ignore error + cancel() - return fn(acc) + return nil } // Balance returns the account balance. @@ -450,38 +445,25 @@ func (a *account) WithWithdrawal(ctx context.Context, amtFn func() (types.Curren // execute amtFn amt, err := amtFn() + + // in case of an insufficient balance, we schedule a sync if isBalanceInsufficient(err) { - // in case of an insufficient balance, we schedule a sync - if scheduleErr := a.scheduleSync(); scheduleErr != nil { - err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, scheduleErr) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + err = errors.Join(err, a.as.ScheduleSync(ctx, a.id, a.host)) + cancel() } - // if an amount was returned, we withdraw it. - if withdrawErr := a.withdrawFromBalance(amt); withdrawErr != nil { - err = fmt.Errorf("%w; failed to withdraw from account, error: %v", err, withdrawErr) + // if an amount was returned, we withdraw it + if !amt.IsZero() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + err = errors.Join(err, a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big()))) + cancel() } return err }) } -func (a *account) withdrawFromBalance(amt types.Currency) error { - if amt.IsZero() { - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - return a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big())) -} - -func (a *account) scheduleSync() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - return a.as.ScheduleSync(ctx, a.id, a.host) -} - // deriveAccountKey derives an account plus key for a given host and worker. // Each worker has its own account for a given host. That makes concurrency // around keeping track of an accounts balance and refilling it a lot easier in From bcdeb0de465e8ac3936b314bca6b6ea641f76ce3 Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 22 Feb 2024 14:41:01 +0100 Subject: [PATCH 3/3] worker: return err --- worker/rhpv3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index ee2dfcd85..25c26e42d 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -392,7 +392,7 @@ func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk _ = as.UnlockAccount(ctx, acc.ID, lockID) // ignore error cancel() - return nil + return err } // Balance returns the account balance.