Skip to content

Commit

Permalink
worker: use background context for unlocking accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 19, 2024
1 parent 6dc7acc commit 2756576
Showing 1 changed file with 86 additions and 93 deletions.
179 changes: 86 additions & 93 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,19 +337,17 @@ type (
// accounts stores the balance and other metrics of accounts that the
// worker maintains with a host.
accounts struct {
as AccountStore
key types.PrivateKey
shutdownCtx context.Context
as AccountStore
key types.PrivateKey
}

// account contains information regarding a specific account of the
// worker.
account struct {
as AccountStore
id rhpv3.Account
key types.PrivateKey
host types.PublicKey
shutdownCtx context.Context
as AccountStore
id rhpv3.Account
key types.PrivateKey
host types.PublicKey
}
)

Expand All @@ -358,9 +356,8 @@ func (w *worker) initAccounts(as AccountStore) {
panic("accounts already initialized") // developer error
}
w.accounts = &accounts{
as: as,
key: w.deriveSubKey("accountkey"),
shutdownCtx: w.shutdownCtx,
as: as,
key: w.deriveSubKey("accountkey"),
}
}

Expand All @@ -376,117 +373,113 @@ func (w *worker) initTransportPool() {
func (a *accounts) ForHost(hk types.PublicKey) *account {
accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey())
return &account{
as: a.as,
id: accountID,
key: a.key,
host: hk,
shutdownCtx: a.shutdownCtx,
as: a.as,
id: accountID,
key: a.key,
host: hk,
}
}

// WithDeposit increases the balance of an account by the amount returned by
// amtFn if amtFn doesn't return an error.
func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error {
_, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration)
func withAccountLock(ctx context.Context, as AccountStore, id rhpv3.Account, hk types.PublicKey, exclusive bool, fn func(a api.Account) error) error {
acc, lockID, err := as.LockAccount(ctx, id, hk, exclusive, accountLockingDuration)
if err != nil {
return err
}

defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
select {
case <-ctx.Done():
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
defer cancel()
default:
}
as.UnlockAccount(ctx, acc.ID, lockID)
}()

amt, err := amtFn()
if err != nil {
return err
}
return a.as.AddBalance(ctx, a.id, a.host, amt.Big())
return fn(acc)
}

func (a *account) Balance(ctx context.Context) (types.Currency, error) {
account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration)
if err != nil {
return types.Currency{}, err
}
defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
}()
// Balance returns the account balance.
func (a *account) Balance(ctx context.Context) (balance types.Currency, err error) {
err = withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error {
balance = types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64())
return nil
})
return
}

return types.NewCurrency(account.Balance.Uint64(), new(big.Int).Rsh(account.Balance, 64).Uint64()), nil
// WithDeposit increases the balance of an account by the amount returned by
// amtFn if amtFn doesn't return an error.
func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, error)) error {
return withAccountLock(ctx, a.as, a.id, a.host, false, func(_ api.Account) error {
amt, err := amtFn()
if err != nil {
return err
}
return a.as.AddBalance(ctx, a.id, a.host, amt.Big())
})
}

// WithSync syncs an accounts balance with the bus. To do so, the account is
// locked while the balance is fetched through balanceFn.
func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error {
return withAccountLock(ctx, a.as, a.id, a.host, true, func(_ api.Account) error {
balance, err := balanceFn()
if err != nil {
return err
}
return a.as.SetBalance(ctx, a.id, a.host, balance.Big())
})
}

// WithWithdrawal decreases the balance of an account by the amount returned by
// amtFn. The amount is still withdrawn if amtFn returns an error since some
// costs are non-refundable.
func (a *account) WithWithdrawal(ctx context.Context, amtFn func() (types.Currency, error)) error {
account, lockID, err := a.as.LockAccount(ctx, a.id, a.host, false, accountLockingDuration)
if err != nil {
return err
}
defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
}()
return withAccountLock(ctx, a.as, a.id, a.host, false, func(account api.Account) error {
// return early if the account needs to sync
if account.RequiresSync {
return fmt.Errorf("%w; account requires resync", errBalanceInsufficient)
}

// return early if the account needs to sync
if account.RequiresSync {
return fmt.Errorf("%w; account requires resync", errBalanceInsufficient)
}
// return early if our account is not funded
if account.Balance.Cmp(big.NewInt(0)) <= 0 {
return errBalanceInsufficient
}

// return early if our account is not funded
if account.Balance.Cmp(big.NewInt(0)) <= 0 {
return errBalanceInsufficient
}
// execute amtFn
amt, err := amtFn()
if isBalanceInsufficient(err) {
// in case of an insufficient balance, we schedule a sync
if scheduleErr := a.scheduleSync(); scheduleErr != nil {
err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, scheduleErr)
}
}

// execute amtFn
amt, err := amtFn()
if isBalanceInsufficient(err) {
// in case of an insufficient balance, we schedule a sync
scheduleCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
defer cancel()
err2 := a.as.ScheduleSync(scheduleCtx, a.id, a.host)
if err2 != nil {
err = fmt.Errorf("%w; failed to set requiresSync flag on bus, error: %v", err, err2)
// if an amount was returned, we withdraw it.
if withdrawErr := a.withdrawFromBalance(amt); withdrawErr != nil {
err = fmt.Errorf("%w; failed to withdraw from account, error: %v", err, withdrawErr)
}
}

// if the amount is zero, we are done
if amt.IsZero() {
return err
})
}

func (a *account) withdrawFromBalance(amt types.Currency) error {
if amt.IsZero() {
return nil
}

// if an amount was returned, we withdraw it.
addCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
errAdd := a.as.AddBalance(addCtx, a.id, a.host, new(big.Int).Neg(amt.Big()))
if errAdd != nil {
err = fmt.Errorf("%w; failed to add balance to account, error: %v", err, errAdd)
}
return err
return a.as.AddBalance(ctx, a.id, a.host, new(big.Int).Neg(amt.Big()))
}

// WithSync syncs an accounts balance with the bus. To do so, the account is
// locked while the balance is fetched through balanceFn.
func (a *account) WithSync(ctx context.Context, balanceFn func() (types.Currency, error)) error {
_, lockID, err := a.as.LockAccount(ctx, a.id, a.host, true, accountLockingDuration)
if err != nil {
return err
}
defer func() {
unlockCtx, cancel := context.WithTimeout(a.shutdownCtx, 10*time.Second)
a.as.UnlockAccount(unlockCtx, a.id, lockID)
cancel()
}()

balance, err := balanceFn()
if err != nil {
return err
}
return a.as.SetBalance(ctx, a.id, a.host, balance.Big())
func (a *account) scheduleSync() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return a.as.ScheduleSync(ctx, a.id, a.host)
}

// deriveAccountKey derives an account plus key for a given host and worker.
Expand Down

0 comments on commit 2756576

Please sign in to comment.