From 2756576a4012b80fbde57a34bcd5a7675ced39f2 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 19 Feb 2024 11:54:19 +0100 Subject: [PATCH] worker: use background context for unlocking accounts --- worker/rhpv3.go | 179 +++++++++++++++++++++++------------------------- 1 file changed, 86 insertions(+), 93 deletions(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 03f67c6f6..5fbcd3ad6 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -337,19 +337,17 @@ type ( // accounts stores the balance and other metrics of accounts that the // worker maintains with a host. accounts struct { - as AccountStore - key types.PrivateKey - shutdownCtx context.Context + as AccountStore + key types.PrivateKey } // account contains information regarding a specific account of the // worker. account struct { - as AccountStore - id rhpv3.Account - key types.PrivateKey - host types.PublicKey - shutdownCtx context.Context + as AccountStore + id rhpv3.Account + key types.PrivateKey + host types.PublicKey } ) @@ -358,9 +356,8 @@ func (w *worker) initAccounts(as AccountStore) { panic("accounts already initialized") // developer error } w.accounts = &accounts{ - as: as, - key: w.deriveSubKey("accountkey"), - shutdownCtx: w.shutdownCtx, + as: as, + key: w.deriveSubKey("accountkey"), } } @@ -376,117 +373,113 @@ func (w *worker) initTransportPool() { func (a *accounts) ForHost(hk types.PublicKey) *account { accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey()) return &account{ - as: a.as, - id: accountID, - key: a.key, - host: hk, - shutdownCtx: a.shutdownCtx, + as: a.as, + id: accountID, + key: a.key, + host: hk, } } -// WithDeposit increases the balance of an account by the amount returned by -// amtFn if amtFn doesn't return an error. -func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error { - _, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) +func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk types.PublicKey, exclusive bool, fn func(a api.Account) error) error { + acc, lockID, err := as.LockAccount(ctx, id, hk, exclusive, accountLockingDuration) if err != nil { return err } + defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() + select { + case <-ctx.Done(): + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), time.Minute) + defer cancel() + default: + } + as.UnlockAccount(ctx, acc.ID, lockID) }() - amt, err := amtFn() - if err != nil { - return err - } - return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) + return fn(acc) } -func (a *account) Balance(ctx context.Context) (types.Currency, error) { - account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) - if err != nil { - return types.Currency{}, err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() +// Balance returns the account balance. +func (a *account) Balance(ctx context.Context) (balance types.Currency, err error) { + err = withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error { + balance = types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()) + return nil + }) + return +} - return types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()), nil +// WithDeposit increases the balance of an account by the amount returned by +// amtFn if amtFn doesn't return an error. +func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error { + return withAccountLock(ctx, a.as, a.id, a.host, false, func(_ api.Account) error { + amt, err := amtFn() + if err != nil { + return err + } + return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) + }) +} + +// WithSync syncs an accounts balance with the bus. To do so, the account is +// locked while the balance is fetched through balanceFn. +func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error { + return withAccountLock(ctx, a.as, a.id, a.host, true, func(_ api.Account) error { + balance, err := balanceFn() + if err != nil { + return err + } + return a.as.SetBalance(ctx, a.id, a.host, balance.Big()) + }) } // WithWithdrawal decreases the balance of an account by the amount returned by // amtFn. The amount is still withdrawn if amtFn returns an error since some // costs are non-refundable. func (a *account) WithWithdrawal(ctx context.Context, amtFn func() (types.Currency, error)) error { - account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration) - if err != nil { - return err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() + return withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error { + // return early if the account needs to sync + if account.RequiresSync { + return fmt.Errorf("%w; account requires resync", errBalanceInsufficient) + } - // return early if the account needs to sync - if account.RequiresSync { - return fmt.Errorf("%w; account requires resync", errBalanceInsufficient) - } + // return early if our account is not funded + if account.Balance.Cmp(big.NewInt(0)) <= 0 { + return errBalanceInsufficient + } - // return early if our account is not funded - if account.Balance.Cmp(big.NewInt(0)) <= 0 { - return errBalanceInsufficient - } + // execute amtFn + amt, err := amtFn() + if isBalanceInsufficient(err) { + // in case of an insufficient balance, we schedule a sync + if scheduleErr := a.scheduleSync(); scheduleErr != nil { + err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, scheduleErr) + } + } - // execute amtFn - amt, err := amtFn() - if isBalanceInsufficient(err) { - // in case of an insufficient balance, we schedule a sync - scheduleCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - defer cancel() - err2 := a.as.ScheduleSync(scheduleCtx, a.id, a.host) - if err2 != nil { - err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, err2) + // if an amount was returned, we withdraw it. + if withdrawErr := a.withdrawFromBalance(amt); withdrawErr != nil { + err = fmt.Errorf("%w; failed to withdraw from account, error: %v", err, withdrawErr) } - } - // if the amount is zero, we are done - if amt.IsZero() { return err + }) +} + +func (a *account) withdrawFromBalance(amt types.Currency) error { + if amt.IsZero() { + return nil } - // if an amount was returned, we withdraw it. - addCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - errAdd := a.as.AddBalance(addCtx, a.id, a.host, new(big.Int).Neg(amt.Big())) - if errAdd != nil { - err = fmt.Errorf("%w; failed to add balance to account, error: %v", err, errAdd) - } - return err + return a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big())) } -// WithSync syncs an accounts balance with the bus. To do so, the account is -// locked while the balance is fetched through balanceFn. -func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error { - _, lockID, err := a.as.LockAccount(ctx, a.id, a.host, true, accountLockingDuration) - if err != nil { - return err - } - defer func() { - unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second) - a.as.UnlockAccount(unlockCtx, a.id, lockID) - cancel() - }() - - balance, err := balanceFn() - if err != nil { - return err - } - return a.as.SetBalance(ctx, a.id, a.host, balance.Big()) +func (a *account) scheduleSync() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + return a.as.ScheduleSync(ctx, a.id, a.host) } // deriveAccountKey derives an account plus key for a given host and worker.