Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using the worker's shutdown context for unlocking accounts #981

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 86 additions & 93 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,19 +337,17 @@ type (
// accounts stores the balance and other metrics of accounts that the
// worker maintains with a host.
accounts struct {
as AccountStore
key types.PrivateKey
shutdownCtx context.Context
as AccountStore
key types.PrivateKey
}

// account contains information regarding a specific account of the
// worker.
account struct {
as AccountStore
id rhpv3.Account
key types.PrivateKey
host types.PublicKey
shutdownCtx context.Context
as AccountStore
id rhpv3.Account
key types.PrivateKey
host types.PublicKey
}
)

Expand All @@ -358,9 +356,8 @@ func (w *worker) initAccounts(as AccountStore) {
panic("accounts already initialized") // developer error
}
w.accounts = &accounts{
as: as,
key: w.deriveSubKey("accountkey"),
shutdownCtx: w.shutdownCtx,
as: as,
key: w.deriveSubKey("accountkey"),
}
}

Expand All @@ -376,117 +373,113 @@ func (w *worker) initTransportPool() {
func (a *accounts) ForHost(hk types.PublicKey) *account {
accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey())
return &account{
as: a.as,
id: accountID,
key: a.key,
host: hk,
shutdownCtx: a.shutdownCtx,
as: a.as,
id: accountID,
key: a.key,
host: hk,
}
}

// WithDeposit increases the balance of an account by the amount returned by
// amtFn if amtFn doesn't return an error.
func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error {
_, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration)
func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk types.PublicKey, exclusive bool, fn func(a api.Account) error) error {
acc, lockID, err := as.LockAccount(ctx, id, hk, exclusive, accountLockingDuration)
if err != nil {
return err
}

defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
select {
case <-ctx.Done():
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
peterjan marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
default:
}
as.UnlockAccount(ctx, acc.ID, lockID)
}()

amt, err := amtFn()
if err != nil {
return err
}
return a.as.AddBalance(ctx, a.id, a.host, amt.Big())
return fn(acc)
}

func (a *account) Balance(ctx context.Context) (types.Currency, error) {
account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration)
if err != nil {
return types.Currency{}, err
}
defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
}()
// Balance returns the account balance.
func (a *account) Balance(ctx context.Context) (balance types.Currency, err error) {
err = withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error {
balance = types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64())
return nil
})
return
}

return types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()), nil
// WithDeposit increases the balance of an account by the amount returned by
// amtFn if amtFn doesn't return an error.
func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error {
return withAccountLock(ctx, a.as, a.id, a.host, false, func(_ api.Account) error {
amt, err := amtFn()
if err != nil {
return err
}
return a.as.AddBalance(ctx, a.id, a.host, amt.Big())
})
}

// WithSync syncs an accounts balance with the bus. To do so, the account is
// locked while the balance is fetched through balanceFn.
func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error {
return withAccountLock(ctx, a.as, a.id, a.host, true, func(_ api.Account) error {
balance, err := balanceFn()
if err != nil {
return err
}
return a.as.SetBalance(ctx, a.id, a.host, balance.Big())
})
}

// WithWithdrawal decreases the balance of an account by the amount returned by
// amtFn. The amount is still withdrawn if amtFn returns an error since some
// costs are non-refundable.
func (a *account) WithWithdrawal(ctx context.Context, amtFn func() (types.Currency, error)) error {
account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration)
if err != nil {
return err
}
defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
}()
return withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error {
// return early if the account needs to sync
if account.RequiresSync {
return fmt.Errorf("%w; account requires resync", errBalanceInsufficient)
}

// return early if the account needs to sync
if account.RequiresSync {
return fmt.Errorf("%w; account requires resync", errBalanceInsufficient)
}
// return early if our account is not funded
if account.Balance.Cmp(big.NewInt(0)) <= 0 {
return errBalanceInsufficient
}

// return early if our account is not funded
if account.Balance.Cmp(big.NewInt(0)) <= 0 {
return errBalanceInsufficient
}
// execute amtFn
amt, err := amtFn()
if isBalanceInsufficient(err) {
// in case of an insufficient balance, we schedule a sync
if scheduleErr := a.scheduleSync(); scheduleErr != nil {
err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, scheduleErr)
}
}

// execute amtFn
amt, err := amtFn()
if isBalanceInsufficient(err) {
// in case of an insufficient balance, we schedule a sync
scheduleCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
defer cancel()
err2 := a.as.ScheduleSync(scheduleCtx, a.id, a.host)
if err2 != nil {
err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, err2)
// if an amount was returned, we withdraw it.
if withdrawErr := a.withdrawFromBalance(amt); withdrawErr != nil {
err = fmt.Errorf("%w; failed to withdraw from account, error: %v", err, withdrawErr)
}
}

// if the amount is zero, we are done
if amt.IsZero() {
return err
})
}

func (a *account) withdrawFromBalance(amt types.Currency) error {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
if amt.IsZero() {
return nil
}

// if an amount was returned, we withdraw it.
addCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
errAdd := a.as.AddBalance(addCtx, a.id, a.host, new(big.Int).Neg(amt.Big()))
if errAdd != nil {
err = fmt.Errorf("%w; failed to add balance to account, error: %v", err, errAdd)
}
return err
return a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big()))
}

// WithSync syncs an accounts balance with the bus. To do so, the account is
// locked while the balance is fetched through balanceFn.
func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error {
_, lockID, err := a.as.LockAccount(ctx, a.id, a.host, true, accountLockingDuration)
if err != nil {
return err
}
defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
}()

balance, err := balanceFn()
if err != nil {
return err
}
return a.as.SetBalance(ctx, a.id, a.host, balance.Big())
func (a *account) scheduleSync() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return a.as.ScheduleSync(ctx, a.id, a.host)
}

// deriveAccountKey derives an account plus key for a given host and worker.
Expand Down
Loading