diff --git a/api/bus.go b/api/bus.go index a0f33dcf0..93c359ce5 100644 --- a/api/bus.go +++ b/api/bus.go @@ -3,6 +3,7 @@ package api import ( "errors" + rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" ) @@ -45,6 +46,16 @@ type ( ) type ( + AccountsFundRequest struct { + AccountID rhpv3.Account `json:"accountID"` + Amount types.Currency `json:"amount"` + ContractID types.FileContractID `json:"contractID"` + } + + AccountsFundResponse struct { + Deposit types.Currency `json:"deposit"` + } + AccountsSaveRequest struct { Accounts []Account `json:"accounts"` } diff --git a/bus/bus.go b/bus/bus.go index 838628a59..065912c15 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -31,6 +31,7 @@ import ( "go.sia.tech/renterd/internal/rhp" rhp2 "go.sia.tech/renterd/internal/rhp/v2" rhp3 "go.sia.tech/renterd/internal/rhp/v3" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" "go.sia.tech/renterd/stores/sql" "go.sia.tech/renterd/webhooks" @@ -42,6 +43,7 @@ const ( defaultWalletRecordMetricInterval = 5 * time.Minute defaultPinUpdateInterval = 5 * time.Minute defaultPinRateWindow = 6 * time.Hour + lockingPriorityFunding = 40 lockingPriorityRenew = 80 stdTxnSize = 1200 // bytes ) @@ -297,7 +299,7 @@ type ( type Bus struct { startTime time.Time - masterKey [32]byte + masterKey utils.MasterKey alerts alerts.Alerter alertMgr AlertManager @@ -378,8 +380,9 @@ func New(ctx context.Context, masterKey [32]byte, am AlertManager, wm WebhooksMa // Handler returns an HTTP handler that serves the bus API. func (b *Bus) Handler() http.Handler { return jape.Mux(map[string]jape.Handler{ - "GET /accounts": b.accountsHandlerGET, - "POST /accounts": b.accountsHandlerPOST, + "GET /accounts": b.accountsHandlerGET, + "POST /accounts": b.accountsHandlerPOST, + "POST /accounts/fund": b.accountsFundHandler, "GET /alerts": b.handleGETAlerts, "POST /alerts/dismiss": b.handlePOSTAlertsDismiss, diff --git a/bus/client/accounts.go b/bus/client/accounts.go index 11ce58ca2..9742d2a0d 100644 --- a/bus/client/accounts.go +++ b/bus/client/accounts.go @@ -4,6 +4,8 @@ import ( "context" "net/url" + rhpv3 "go.sia.tech/core/rhp/v3" + "go.sia.tech/core/types" "go.sia.tech/renterd/api" ) @@ -15,6 +17,16 @@ func (c *Client) Accounts(ctx context.Context, owner string) (accounts []api.Acc return } +func (c *Client) FundAccount(ctx context.Context, account rhpv3.Account, fcid types.FileContractID, amount types.Currency) (types.Currency, error) { + var resp api.AccountsFundResponse + err := c.c.WithContext(ctx).POST("/accounts/fund", api.AccountsFundRequest{ + AccountID: account, + Amount: amount, + ContractID: fcid, + }, &resp) + return resp.Deposit, err +} + // UpdateAccounts saves all accounts. func (c *Client) UpdateAccounts(ctx context.Context, accounts []api.Account) (err error) { err = c.c.WithContext(ctx).POST("/accounts", api.AccountsSaveRequest{ diff --git a/bus/routes.go b/bus/routes.go index f1302fef1..b207c1bc0 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -15,6 +15,8 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" + rhp3 "go.sia.tech/renterd/internal/rhp/v3" + ibus "go.sia.tech/renterd/internal/bus" "go.sia.tech/renterd/internal/gouging" @@ -41,6 +43,87 @@ func (b *Bus) fetchSetting(ctx context.Context, key string, value interface{}) e return nil } +func (b *Bus) accountsFundHandler(jc jape.Context) { + var req api.AccountsFundRequest + if jc.Decode(&req) != nil { + return + } + + // contract metadata + cm, err := b.ms.Contract(jc.Request.Context(), req.ContractID) + if jc.Check("failed to fetch contract metadata", err) != nil { + return + } + + rk := b.masterKey.DeriveContractKey(cm.HostKey) + + // acquire contract + lockID, err := b.contractLocker.Acquire(jc.Request.Context(), lockingPriorityFunding, req.ContractID, math.MaxInt64) + if jc.Check("failed to acquire lock", err) != nil { + return + } + defer b.contractLocker.Release(req.ContractID, lockID) + + // latest revision + rev, err := b.rhp3.Revision(jc.Request.Context(), req.ContractID, cm.HostKey, cm.SiamuxAddr) + if jc.Check("failed to fetch contract revision", err) != nil { + return + } + + // ensure we have at least 2H in the contract to cover the costs + if types.NewCurrency64(2).Cmp(rev.ValidRenterPayout()) >= 0 { + jc.Error(fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), types.NewCurrency64(2)), http.StatusBadRequest) + return + } + + // price table + pt, err := b.rhp3.PriceTable(jc.Request.Context(), cm.HostKey, cm.SiamuxAddr, rhp3.PreparePriceTableContractPayment(&rev, req.AccountID, rk)) + if jc.Check("failed to fetch price table", err) != nil { + return + } + + // check only the FundAccountCost + if types.NewCurrency64(1).Cmp(pt.FundAccountCost) < 0 { + jc.Error(fmt.Errorf("%w: host is gouging on FundAccountCost", gouging.ErrPriceTableGouging), http.StatusServiceUnavailable) + return + } + + // cap the deposit by what's left in the contract + deposit := req.Amount + cost := pt.FundAccountCost + availableFunds := rev.ValidRenterPayout().Sub(cost) + if deposit.Cmp(availableFunds) > 0 { + deposit = availableFunds + } + + // fund the account + err = b.rhp3.FundAccount(jc.Request.Context(), &rev, cm.HostKey, cm.SiamuxAddr, deposit, req.AccountID, pt.HostPriceTable, rk) + if jc.Check("failed to fund account", err) != nil { + return + } + + // record spending + err = b.ms.RecordContractSpending(jc.Request.Context(), []api.ContractSpendingRecord{ + { + ContractSpending: api.ContractSpending{ + FundAccount: deposit.Add(cost), + }, + ContractID: rev.ParentID, + RevisionNumber: rev.RevisionNumber, + Size: rev.Filesize, + + MissedHostPayout: rev.MissedHostPayout(), + ValidRenterPayout: rev.ValidRenterPayout(), + }, + }) + if err != nil { + b.logger.Error("failed to record contract spending", zap.Error(err)) + } + jc.Encode(api.AccountsFundResponse{ + Deposit: deposit, + }) +} + func (b *Bus) consensusAcceptBlock(jc jape.Context) { var block types.Block if jc.Decode(&block) != nil { diff --git a/internal/rhp/v3/rhp.go b/internal/rhp/v3/rhp.go index 44d4c91bc..c723d4117 100644 --- a/internal/rhp/v3/rhp.go +++ b/internal/rhp/v3/rhp.go @@ -167,13 +167,13 @@ func (c *Client) Renew(ctx context.Context, gc gouging.Checker, rev types.FileCo return } -func (c *Client) SyncAccount(ctx context.Context, rev *types.FileContractRevision, hk types.PublicKey, siamuxAddr string, accID rhpv3.Account, pt rhpv3.SettingsID, rk types.PrivateKey) (balance types.Currency, _ error) { +func (c *Client) SyncAccount(ctx context.Context, rev *types.FileContractRevision, hk types.PublicKey, siamuxAddr string, accID rhpv3.Account, pt rhpv3.HostPriceTable, rk types.PrivateKey) (balance types.Currency, _ error) { return balance, c.tpool.withTransport(ctx, hk, siamuxAddr, func(ctx context.Context, t *transportV3) error { - payment, err := payByContract(rev, types.NewCurrency64(1), accID, rk) + payment, err := payByContract(rev, pt.AccountBalanceCost, accID, rk) if err != nil { return err } - balance, err = rpcAccountBalance(ctx, t, &payment, accID, pt) + balance, err = rpcAccountBalance(ctx, t, &payment, accID, pt.UID) return err }) } diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 6d47d0590..ec4bfabd4 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1384,7 +1384,7 @@ func TestEphemeralAccountSync(t *testing.T) { if len(accounts) != 1 || accounts[0].ID != acc.ID { t.Fatal("account should exist") } else if accounts[0].CleanShutdown || !accounts[0].RequiresSync { - t.Fatalf("account shouldn't be marked as clean shutdown or not require a sync, got %v", accounts[0]) + t.Fatal("account shouldn't be marked as clean shutdown or not require a sync, got", accounts[0].CleanShutdown, accounts[0].RequiresSync) } // assert account was funded diff --git a/internal/utils/keys.go b/internal/utils/keys.go new file mode 100644 index 000000000..0122491a0 --- /dev/null +++ b/internal/utils/keys.go @@ -0,0 +1,66 @@ +package utils + +import ( + "fmt" + + "go.sia.tech/core/types" + "golang.org/x/crypto/blake2b" +) + +type ( + MasterKey [32]byte + AccountsKey types.PrivateKey +) + +// DeriveAccountsKey derives an accounts key from a masterkey which is used +// to derive individual account keys from. +func (key *MasterKey) DeriveAccountsKey(workerID string) AccountsKey { + keyPath := fmt.Sprintf("accounts/%s", workerID) + return AccountsKey(key.deriveSubKey(keyPath)) +} + +// DeriveContractKey derives a contract key from a masterkey which is used to +// form, renew and revise contracts. +func (key *MasterKey) DeriveContractKey(hostKey types.PublicKey) types.PrivateKey { + seed := blake2b.Sum256(append(key.deriveSubKey("renterkey"), hostKey[:]...)) + pk := types.NewPrivateKeyFromSeed(seed[:]) + for i := range seed { + seed[i] = 0 + } + return pk +} + +// deriveSubKey can be used to derive a sub-masterkey from the worker's +// masterkey to use for a specific purpose. Such as deriving more keys for +// ephemeral accounts. +func (key *MasterKey) deriveSubKey(purpose string) types.PrivateKey { + seed := blake2b.Sum256(append(key[:], []byte(purpose)...)) + pk := types.NewPrivateKeyFromSeed(seed[:]) + for i := range seed { + seed[i] = 0 + } + return pk +} + +// DeriveAccountKey derives an account plus key for a given host and worker. +// Each worker has its own account for a given host. That makes concurrency +// around keeping track of an accounts balance and refilling it a lot easier in +// a multi-worker setup. +func (key *AccountsKey) DeriveAccountKey(hk types.PublicKey) types.PrivateKey { + index := byte(0) // not used yet but can be used to derive more than 1 account per host + + // Append the host for which to create it and the index to the + // corresponding sub-key. + subKey := *key + data := make([]byte, 0, len(subKey)+len(hk)+1) + data = append(data, subKey[:]...) + data = append(data, hk[:]...) + data = append(data, index) + + seed := types.HashBytes(data) + pk := types.NewPrivateKeyFromSeed(seed[:]) + for i := range seed { + seed[i] = 0 + } + return pk +} diff --git a/internal/worker/accounts.go b/internal/worker/accounts.go index 76613b757..1022075f1 100644 --- a/internal/worker/accounts.go +++ b/internal/worker/accounts.go @@ -13,6 +13,7 @@ import ( "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" rhp3 "go.sia.tech/renterd/internal/rhp/v3" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -31,8 +32,11 @@ var ( ) type ( - AccountMgrWorker interface { - FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, balance types.Currency) error + AccountFunder interface { + FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error + } + + AccountSyncer interface { SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error } @@ -53,11 +57,12 @@ type ( type ( AccountMgr struct { alerts alerts.Alerter - w AccountMgrWorker + funder AccountFunder + syncer AccountSyncer dc DownloadContracts cs ConsensusState s AccountStore - key types.PrivateKey + key utils.AccountsKey logger *zap.SugaredLogger owner string refillInterval time.Duration @@ -87,13 +92,14 @@ type ( // NewAccountManager creates a new account manager. It will load all accounts // from the given store and mark the shutdown as unclean. When Shutdown is // called it will save all accounts. -func NewAccountManager(key types.PrivateKey, owner string, alerter alerts.Alerter, w AccountMgrWorker, cs ConsensusState, dc DownloadContracts, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) { +func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, dc DownloadContracts, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) { logger := l.Named("accounts").Sugar() shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) a := &AccountMgr{ alerts: alerter, - w: w, + funder: funder, + syncer: syncer, cs: cs, dc: dc, s: s, @@ -177,7 +183,7 @@ func (a *AccountMgr) account(hk types.PublicKey) *Account { defer a.mu.Unlock() // Derive account key. - accKey := deriveAccountKey(a.key, hk) + accKey := a.key.DeriveAccountKey(hk) accID := rhpv3.Account(accKey.PublicKey()) // Create account if it doesn't exist. @@ -239,7 +245,7 @@ func (a *AccountMgr) run() { a.mu.Lock() accounts := make(map[rhpv3.Account]*Account, len(saved)) for _, acc := range saved { - accKey := deriveAccountKey(a.key, acc.HostKey) + accKey := a.key.DeriveAccountKey(acc.HostKey) if rhpv3.Account(accKey.PublicKey()) != acc.ID { a.logger.Errorf("account key derivation mismatch %v != %v", accKey.PublicKey(), acc.ID) continue @@ -384,7 +390,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet // check if a resync is needed if account.RequiresSync { // sync the account - err := a.w.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr) + err := a.syncer.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr) if err != nil { return fmt.Errorf("failed to sync account's balance: %w", err) } @@ -399,7 +405,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet } // fund the account - err := a.w.FundAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr, maxBalance) + err := a.funder.FundAccount(ctx, contract.ID, contract.HostKey, maxBalance) if err != nil { return fmt.Errorf("failed to fund account: %w", err) } @@ -587,29 +593,6 @@ func (a *Account) setBalance(balance *big.Int) { zap.Stringer("drift", drift)) } -// deriveAccountKey derives an account plus key for a given host and worker. -// Each worker has its own account for a given host. That makes concurrency -// around keeping track of an accounts balance and refilling it a lot easier in -// a multi-worker setup. -func deriveAccountKey(mgrKey types.PrivateKey, hostKey types.PublicKey) types.PrivateKey { - index := byte(0) // not used yet but can be used to derive more than 1 account per host - - // Append the host for which to create it and the index to the - // corresponding sub-key. - subKey := mgrKey - data := make([]byte, 0, len(subKey)+len(hostKey)+1) - data = append(data, subKey[:]...) - data = append(data, hostKey[:]...) - data = append(data, index) - - seed := types.HashBytes(data) - pk := types.NewPrivateKeyFromSeed(seed[:]) - for i := range seed { - seed[i] = 0 - } - return pk -} - func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err error, keysAndValues ...string) alerts.Alert { data := map[string]interface{}{ "error": err.Error(), diff --git a/internal/worker/accounts_test.go b/internal/worker/accounts_test.go index 207724ef1..d33e67200 100644 --- a/internal/worker/accounts_test.go +++ b/internal/worker/accounts_test.go @@ -10,6 +10,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -29,7 +30,7 @@ func (b *mockAccountMgrBackend) RegisterAlert(context.Context, alerts.Alert) err return nil } -func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, balance types.Currency) error { +func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, balance types.Currency) error { return nil } func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error { @@ -59,7 +60,7 @@ func TestAccounts(t *testing.T) { }, }, } - mgr, err := NewAccountManager(types.GeneratePrivateKey(), "test", b, b, b, b, b, time.Second, zap.NewNop()) + mgr, err := NewAccountManager(utils.AccountsKey(types.GeneratePrivateKey()), "test", b, b, b, b, b, b, time.Second, zap.NewNop()) if err != nil { t.Fatal(err) } diff --git a/worker/alerts.go b/worker/alerts.go index 5ab5092b7..664698fb6 100644 --- a/worker/alerts.go +++ b/worker/alerts.go @@ -13,7 +13,7 @@ func randomAlertID() types.Hash256 { return frand.Entropy256() } -func newDownloadFailedAlert(bucket, path, prefix, marker string, offset, length, contracts int64, err error) alerts.Alert { +func newDownloadFailedAlert(bucket, path string, offset, length, contracts int64, err error) alerts.Alert { return alerts.Alert{ ID: randomAlertID(), Severity: alerts.SeverityError, @@ -21,8 +21,6 @@ func newDownloadFailedAlert(bucket, path, prefix, marker string, offset, length, Data: map[string]any{ "bucket": bucket, "path": path, - "prefix": prefix, - "marker": marker, "offset": offset, "length": length, "contracts": contracts, diff --git a/worker/host.go b/worker/host.go index b5bbb71b7..40695f8b0 100644 --- a/worker/host.go +++ b/worker/host.go @@ -214,16 +214,13 @@ func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) return err } - // check only the unused defaults - gc, err := GougingCheckerFromContext(ctx, false) - if err != nil { - return err - } else if err := gc.CheckUnusedDefaults(pt.HostPriceTable); err != nil { - return fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, err) + // check only the AccountBalanceCost + if types.NewCurrency64(1).Cmp(pt.AccountBalanceCost) < 0 { + return fmt.Errorf("%w: host is gouging on AccountBalanceCost", gouging.ErrPriceTableGouging) } return h.acc.WithSync(func() (types.Currency, error) { - return h.client.SyncAccount(ctx, rev, h.hk, h.siamuxAddr, h.acc.ID(), pt.UID, h.renterKey) + return h.client.SyncAccount(ctx, rev, h.hk, h.siamuxAddr, h.acc.ID(), pt.HostPriceTable, h.renterKey) }) } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 29bf576f2..feacaba6d 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -10,6 +10,7 @@ import ( "time" rhpv2 "go.sia.tech/core/rhp/v2" + rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" @@ -80,6 +81,10 @@ func newBusMock(cs *contractStoreMock, hs *hostStoreMock, os *objectStoreMock) * } } +func (b *busMock) FundAccount(ctx context.Context, acc rhpv3.Account, fcid types.FileContractID, desired types.Currency) (types.Currency, error) { + return types.ZeroCurrency, nil +} + type contractMock struct { rev types.FileContractRevision metadata api.ContractMetadata diff --git a/worker/worker.go b/worker/worker.go index 09f24073e..be27f2cc6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -36,7 +36,6 @@ import ( "go.sia.tech/renterd/worker/client" "go.sia.tech/renterd/worker/s3" "go.uber.org/zap" - "golang.org/x/crypto/blake2b" ) const ( @@ -76,7 +75,9 @@ type ( gouging.ConsensusState webhooks.Broadcaster + AccountFunder iworker.AccountStore + ContractLocker ContractStore HostStore @@ -88,6 +89,10 @@ type ( Wallet } + AccountFunder interface { + FundAccount(ctx context.Context, account rhpv3.Account, fcid types.FileContractID, amount types.Currency) (types.Currency, error) + } + ContractStore interface { Contract(ctx context.Context, id types.FileContractID) (api.ContractMetadata, error) ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) @@ -150,18 +155,6 @@ type ( } ) -// deriveSubKey can be used to derive a sub-masterkey from the worker's -// masterkey to use for a specific purpose. Such as deriving more keys for -// ephemeral accounts. -func (w *Worker) deriveSubKey(purpose string) types.PrivateKey { - seed := blake2b.Sum256(append(w.masterKey[:], []byte(purpose)...)) - pk := types.NewPrivateKeyFromSeed(seed[:]) - for i := range seed { - seed[i] = 0 - } - return pk -} - // TODO: deriving the renter key from the host key using the master key only // works if we persist a hash of the renter's master key in the database and // compare it on startup, otherwise there's no way of knowing the derived key is @@ -175,12 +168,7 @@ func (w *Worker) deriveSubKey(purpose string) types.PrivateKey { // TODO: instead of deriving a renter key use a randomly generated salt so we're // not limited to one key per host func (w *Worker) deriveRenterKey(hostKey types.PublicKey) types.PrivateKey { - seed := blake2b.Sum256(append(w.deriveSubKey("renterkey"), hostKey[:]...)) - pk := types.NewPrivateKeyFromSeed(seed[:]) - for i := range seed { - seed[i] = 0 - } - return pk + return w.masterKey.DeriveContractKey(hostKey) } // A worker talks to Sia hosts to perform contract and storage operations within @@ -194,7 +182,7 @@ type Worker struct { allowPrivateIPs bool id string bus Bus - masterKey [32]byte + masterKey utils.MasterKey startTime time.Time eventSubscriber iworker.EventSubscriber @@ -1370,38 +1358,33 @@ func (w *Worker) headObject(ctx context.Context, bucket, path string, onlyMetada }, res, nil } -func (w *Worker) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string, balance types.Currency) error { - // attach gouging checker - gp, err := w.cache.GougingParams(ctx) - if err != nil { - return fmt.Errorf("couldn't get gouging parameters; %w", err) - } - ctx = WithGougingChecker(ctx, w.bus, gp) - - // fund the account - err = w.withRevision(ctx, defaultRevisionFetchTimeout, fcid, hk, siamuxAddr, lockingPriorityFunding, func(rev types.FileContractRevision) (err error) { - h := w.Host(hk, rev.ParentID, siamuxAddr) - err = h.FundAccount(ctx, balance, &rev) - if rhp3.IsBalanceMaxExceeded(err) { - // sync the account - err = h.SyncAccount(ctx, &rev) - if err != nil { - w.logger.Infof(fmt.Sprintf("failed to sync account: %v", err), "host", hk) - return - } +func (w *Worker) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error { + // calculate the deposit amount + acc := w.accounts.ForHost(hk) + return acc.WithDeposit(func(balance types.Currency) (types.Currency, error) { + // return early if we have the desired balance + if balance.Cmp(desired) >= 0 { + return types.ZeroCurrency, nil + } + deposit := desired.Sub(balance) - // try funding the account again - err = h.FundAccount(ctx, balance, &rev) - if err != nil { - w.logger.Errorw(fmt.Sprintf("failed to fund account after syncing: %v", err), "host", hk, "balance", balance) + // fund the account + var err error + deposit, err = w.bus.FundAccount(ctx, acc.ID(), fcid, desired.Sub(balance)) + if err != nil { + if rhp3.IsBalanceMaxExceeded(err) { + acc.ScheduleSync() } + return types.ZeroCurrency, fmt.Errorf("failed to fund account with %v; %w", deposit, err) } - return + + // log the account balance after funding + w.logger.Debugw("fund account succeeded", + "balance", balance.ExactString(), + "deposit", deposit.ExactString(), + ) + return deposit, nil }) - if err != nil { - return fmt.Errorf("couldn't fund account; %w", err) - } - return nil } func (w *Worker) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) { @@ -1450,7 +1433,7 @@ func (w *Worker) GetObject(ctx context.Context, bucket, path string, opts api.Do if !errors.Is(err, ErrShuttingDown) && !errors.Is(err, errDownloadCancelled) && !errors.Is(err, io.ErrClosedPipe) { - w.registerAlert(newDownloadFailedAlert(bucket, path, opts.Prefix, opts.Marker, offset, length, int64(len(contracts)), err)) + w.registerAlert(newDownloadFailedAlert(bucket, path, offset, length, int64(len(contracts)), err)) } return fmt.Errorf("failed to download object: %w", err) } @@ -1589,8 +1572,7 @@ func (w *Worker) initAccounts(refillInterval time.Duration) (err error) { if w.accounts != nil { panic("priceTables already initialized") // developer error } - keyPath := fmt.Sprintf("accounts/%s", w.id) - w.accounts, err = iworker.NewAccountManager(w.deriveSubKey(keyPath), w.id, w.bus, w, w.bus, w.cache, w.bus, refillInterval, w.logger.Desugar()) + w.accounts, err = iworker.NewAccountManager(w.masterKey.DeriveAccountsKey(w.id), w.id, w.bus, w, w, w.bus, w.cache, w.bus, refillInterval, w.logger.Desugar()) return err }