From 09a155ca98eff7bed7c070ae8a1963124fb0d585 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 30 Jul 2024 13:36:44 +0200 Subject: [PATCH 01/13] worker: withdraw PT payments, reset drift migration --- bus/accounts.go | 66 +++-- internal/sql/migrations.go | 6 + internal/test/e2e/cluster_test.go | 105 +++++--- internal/test/e2e/gouging_test.go | 2 +- .../main/migration_00014_reset_drift.sql | 1 + .../main/migration_00014_reset_drift.sql | 1 + worker/host.go | 225 ++++++++++-------- worker/host_test.go | 12 +- worker/pricetables.go | 12 +- worker/pricetables_test.go | 14 +- worker/rhpv3.go | 71 +++--- worker/worker.go | 24 +- 12 files changed, 304 insertions(+), 235 deletions(-) create mode 100644 stores/sql/mysql/migrations/main/migration_00014_reset_drift.sql create mode 100644 stores/sql/sqlite/migrations/main/migration_00014_reset_drift.sql diff --git a/bus/accounts.go b/bus/accounts.go index d072de1c7..7c8123001 100644 --- a/bus/accounts.go +++ b/bus/accounts.go @@ -147,35 +147,43 @@ func (a *accounts) AddAmount(id rhpv3.Account, hk types.PublicKey, amt *big.Int) } // SetBalance sets the balance of a given account to the provided amount. If the -// account doesn't exist, it is created. -// If an account hasn't been saved successfully upon the last shutdown, no drift -// will be added upon the first call to SetBalance. +// account doesn't exist, it is created. If an account hasn't been saved +// successfully upon the last shutdown, no drift will be added upon the first +// call to SetBalance. func (a *accounts) SetBalance(id rhpv3.Account, hk types.PublicKey, balance *big.Int) { acc := a.account(id, hk) - // Update balance and drift. acc.mu.Lock() - delta := new(big.Int).Sub(balance, acc.Balance) - balanceBefore := acc.Balance.String() - driftBefore := acc.Drift.String() + defer acc.mu.Unlock() + + // save previous values + prevBalance := new(big.Int).Set(acc.Balance) + prevDrift := new(big.Int).Set(acc.Drift) + + // update balance + acc.Balance.Set(balance) + + // update drift + drift := new(big.Int).Sub(balance, prevBalance) if acc.CleanShutdown { - acc.Drift = acc.Drift.Add(acc.Drift, delta) + acc.Drift = acc.Drift.Add(acc.Drift, drift) } - acc.Balance.Set(balance) + + // reset fields acc.CleanShutdown = true - acc.RequiresSync = false // resetting the balance resets the sync field - balanceAfter := acc.Balance.String() - acc.mu.Unlock() + acc.RequiresSync = false - // Log resets. + // log account changes a.logger.Infow("account balance was reset", - "account", acc.ID, - "host", acc.HostKey.String(), - "balanceBefore", balanceBefore, - "balanceAfter", balanceAfter, - "driftBefore", driftBefore, - "driftAfter", acc.Drift.String(), - "delta", delta.String()) + zap.Stringer("account", acc.ID), + zap.Stringer("host", acc.HostKey), + zap.Stringer("balanceBefore", prevBalance), + zap.Stringer("balanceAfter", balance), + zap.Stringer("driftBefore", prevDrift), + zap.Stringer("driftAfter", acc.Drift), + zap.Bool("firstDrift", acc.Drift.Cmp(big.NewInt(0)) != 0 && prevDrift.Cmp(big.NewInt(0)) == 0), + zap.Bool("cleanshutdown", acc.CleanShutdown), + zap.Stringer("drift", drift)) } // ScheduleSync sets the requiresSync flag of an account. @@ -245,13 +253,24 @@ func (a *accounts) Accounts() []api.Account { // ResetDrift resets the drift on an account. func (a *accounts) ResetDrift(id rhpv3.Account) error { a.mu.Lock() - account, exists := a.byID[id] + acc, exists := a.byID[id] if !exists { a.mu.Unlock() return errAccountsNotFound } a.mu.Unlock() - account.resetDrift() + + acc.mu.Lock() + driftBefore := acc.Drift.String() + acc.mu.Unlock() + + acc.resetDrift() + + a.logger.Infow("account drift was reset", + zap.Stringer("account", acc.ID), + zap.Stringer("host", acc.HostKey), + zap.String("driftBefore", driftBefore)) + return nil } @@ -280,7 +299,6 @@ func (a *accounts) account(id rhpv3.Account, hk types.PublicKey) *account { a.mu.Lock() defer a.mu.Unlock() - // Create account if it doesn't exist. acc, exists := a.byID[id] if !exists { acc = &account{ @@ -290,7 +308,7 @@ func (a *accounts) account(id rhpv3.Account, hk types.PublicKey) *account { HostKey: hk, Balance: big.NewInt(0), Drift: big.NewInt(0), - RequiresSync: false, + RequiresSync: true, // initial sync }, locks: map[uint64]*accountLock{}, } diff --git a/internal/sql/migrations.go b/internal/sql/migrations.go index cfb735de2..83d29d27c 100644 --- a/internal/sql/migrations.go +++ b/internal/sql/migrations.go @@ -193,6 +193,12 @@ var ( return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00013_coreutils_wallet", log) }, }, + { + ID: "00014_reset_drift", + Migrate: func(tx Tx) error { + return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00014_reset_drift", log) + }, + }, } } MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration { diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index a410c892c..92a6e3fbf 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1121,54 +1121,65 @@ func TestEphemeralAccounts(t *testing.T) { t.SkipNow() } - // Create cluster - cluster := newTestCluster(t, testClusterOptions{hosts: 1}) + // run without autopilot + opts := clusterOptsDefault + opts.skipRunningAutopilot = true + + // create cluster + cluster := newTestCluster(t, opts) defer cluster.Shutdown() + + // convenience variables + b := cluster.Bus + w := cluster.Worker tt := cluster.tt - // Shut down the autopilot to prevent it from interfering. - cluster.ShutdownAutopilot(context.Background()) + tt.OK(b.UpdateSetting(context.Background(), api.SettingRedundancy, api.RedundancySettings{ + MinShards: 1, + TotalShards: 1, + })) + // add a host + hosts := cluster.AddHosts(1) + h, err := b.Host(context.Background(), hosts[0].PublicKey()) + tt.OK(err) - // Wait for contract and accounts. - contract := cluster.WaitForContracts()[0] - accounts := cluster.WaitForAccounts() + // scan the host + _, err = w.RHPScan(context.Background(), h.PublicKey, h.NetAddress, 10*time.Second) + tt.OK(err) - // Shut down the autopilot to prevent it from interfering with the test. - cluster.ShutdownAutopilot(context.Background()) + // manually form a contract with the host + cs, _ := b.ConsensusState(context.Background()) + wallet, _ := b.Wallet(context.Background()) + rev, _, err := w.RHPForm(context.Background(), cs.BlockHeight+test.AutopilotConfig.Contracts.Period+test.AutopilotConfig.Contracts.RenewWindow, h.PublicKey, h.NetAddress, wallet.Address, types.Siacoins(10), types.Siacoins(1)) + tt.OK(err) + c, err := b.AddContract(context.Background(), rev, rev.Revision.MissedHostPayout().Sub(types.Siacoins(1)), types.Siacoins(1), cs.BlockHeight, api.ContractStatePending) + tt.OK(err) - // Newly created accounts are !cleanShutdown. Simulate a sync to change - // that. - for _, acc := range accounts { - if acc.CleanShutdown { - t.Fatal("new account should indicate an unclean shutdown") - } else if acc.RequiresSync { - t.Fatal("new account should not require a sync") - } - if err := cluster.Bus.SetBalance(context.Background(), acc.ID, acc.HostKey, types.Siacoins(1).Big()); err != nil { - t.Fatal(err) - } - } + tt.OK(b.SetContractSet(context.Background(), test.ContractSet, []types.FileContractID{c.ID})) - // Fetch accounts again. + // fund the account + fundAmt := types.Siacoins(1) + tt.OK(w.RHPFund(context.Background(), c.ID, c.HostKey, c.HostIP, c.SiamuxAddr, fundAmt)) + + // fetch accounts accounts, err := cluster.Bus.Accounts(context.Background()) tt.OK(err) + // assert account state acc := accounts[0] - if acc.Balance.Cmp(types.Siacoins(1).Big()) < 0 { - t.Fatalf("wrong balance %v", acc.Balance) - } if acc.ID == (rhpv3.Account{}) { t.Fatal("account id not set") - } - host := cluster.hosts[0] - if acc.HostKey != types.PublicKey(host.PublicKey()) { + } else if acc.CleanShutdown { + t.Fatal("account should indicate an unclean shutdown") + } else if !acc.RequiresSync { + t.Fatal("account should require a sync") + } else if acc.HostKey != h.PublicKey { t.Fatal("wrong host") - } - if !acc.CleanShutdown { - t.Fatal("account should indicate a clean shutdown") + } else if acc.Balance.Cmp(types.Siacoins(1).Big()) < 0 { + t.Fatalf("wrong balance %v", acc.Balance) } - // Fetch account from bus directly. + // fetch account from bus directly busAccounts, err := cluster.Bus.Accounts(context.Background()) tt.OK(err) if len(busAccounts) != 1 { @@ -1179,12 +1190,11 @@ func TestEphemeralAccounts(t *testing.T) { t.Fatal("bus account doesn't match worker account") } - // Check that the spending was recorded for the contract. The recorded + // check that the spending was recorded for the contract. The recorded // spending should be > the fundAmt since it consists of the fundAmt plus // fee. - fundAmt := types.Siacoins(1) tt.Retry(10, testBusFlushInterval, func() error { - cm, err := cluster.Bus.Contract(context.Background(), contract.ID) + cm, err := cluster.Bus.Contract(context.Background(), c.ID) tt.OK(err) if cm.Spending.FundAccount.Cmp(fundAmt) <= 0 { @@ -1193,7 +1203,24 @@ func TestEphemeralAccounts(t *testing.T) { return nil }) - // Update the balance to create some drift. + // sync the account + tt.OK(w.RHPSync(context.Background(), c.ID, acc.HostKey, c.HostIP, c.SiamuxAddr)) + + // assert account state + accounts, err = cluster.Bus.Accounts(context.Background()) + tt.OK(err) + + // assert account state + acc = accounts[0] + if !acc.CleanShutdown { + t.Fatal("account should indicate a clean shutdown") + } else if acc.RequiresSync { + t.Fatal("account should not require a sync") + } else if acc.Drift.Cmp(new(big.Int)) != 0 { + t.Fatalf("account shoult not have drift %v", acc.Drift) + } + + // update the balance to create some drift newBalance := fundAmt.Div64(2) newDrift := new(big.Int).Sub(newBalance.Big(), fundAmt.Big()) if err := cluster.Bus.SetBalance(context.Background(), busAcc.ID, acc.HostKey, newBalance.Big()); err != nil { @@ -1207,11 +1234,11 @@ func TestEphemeralAccounts(t *testing.T) { t.Fatalf("drift was %v but should be %v", busAcc.Drift, maxNewDrift) } - // Reboot cluster. + // reboot cluster cluster2 := cluster.Reboot(t) defer cluster2.Shutdown() - // Check that accounts were loaded from the bus. + // check that accounts were loaded from the bus accounts2, err := cluster2.Bus.Accounts(context.Background()) tt.OK(err) for _, acc := range accounts2 { @@ -1224,7 +1251,7 @@ func TestEphemeralAccounts(t *testing.T) { } } - // Reset drift again. + // reset drift again if err := cluster2.Bus.ResetDrift(context.Background(), acc.ID); err != nil { t.Fatal(err) } diff --git a/internal/test/e2e/gouging_test.go b/internal/test/e2e/gouging_test.go index a40fe0024..b2efe2efe 100644 --- a/internal/test/e2e/gouging_test.go +++ b/internal/test/e2e/gouging_test.go @@ -187,7 +187,7 @@ func TestAccountFunding(t *testing.T) { time.Sleep(defaultHostSettings.PriceTableValidity) // fund the account again - tt.OK(w.RHPFund(context.Background(), c.ID, c.HostKey, c.HostIP, c.SiamuxAddr, types.Siacoins(1))) + tt.OK(w.RHPFund(context.Background(), c.ID, c.HostKey, c.HostIP, c.SiamuxAddr, types.Siacoins(1).Div64(2))) } func TestHostMinVersion(t *testing.T) { diff --git a/stores/sql/mysql/migrations/main/migration_00014_reset_drift.sql b/stores/sql/mysql/migrations/main/migration_00014_reset_drift.sql new file mode 100644 index 000000000..c151d90a3 --- /dev/null +++ b/stores/sql/mysql/migrations/main/migration_00014_reset_drift.sql @@ -0,0 +1 @@ +UPDATE ephemeral_accounts SET drift = "0", clean_shutdown = 0, requires_sync = 1; \ No newline at end of file diff --git a/stores/sql/sqlite/migrations/main/migration_00014_reset_drift.sql b/stores/sql/sqlite/migrations/main/migration_00014_reset_drift.sql new file mode 100644 index 000000000..c151d90a3 --- /dev/null +++ b/stores/sql/sqlite/migrations/main/migration_00014_reset_drift.sql @@ -0,0 +1 @@ +UPDATE ephemeral_accounts SET drift = "0", clean_shutdown = 0, requires_sync = 1; \ No newline at end of file diff --git a/worker/host.go b/worker/host.go index 56b5876d0..15e77e79b 100644 --- a/worker/host.go +++ b/worker/host.go @@ -26,11 +26,12 @@ type ( DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error - FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt api.HostPriceTable, err error) + FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error) + AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error - SyncAccount(ctx context.Context, rev *types.FileContractRevision) error + SyncAccount(ctx context.Context, rev *types.FileContractRevision) (types.Currency, error) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _, _ types.Currency, err error) } @@ -81,21 +82,6 @@ func (w *Worker) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr func (h *host) PublicKey() types.PublicKey { return h.hk } func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) (err error) { - pt, err := h.priceTables.fetch(ctx, h.hk, nil) - if err != nil { - return err - } - hpt := pt.HostPriceTable - - // check for download gouging specifically - gc, err := GougingCheckerFromContext(ctx, overpay) - if err != nil { - return err - } - if breakdown := gc.Check(nil, &hpt); breakdown.DownloadErr != "" { - return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown.DownloadErr) - } - // return errBalanceInsufficient if balance insufficient defer func() { if isBalanceInsufficient(err) { @@ -103,8 +89,23 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 } }() - return h.acc.WithWithdrawal(ctx, func() (amount types.Currency, err error) { - err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { + var amount types.Currency + return h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { + if err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { + pt, err := h.priceTables.fetch(ctx, h.hk, nil, &amount) + if err != nil { + return err + } + hpt := pt.HostPriceTable + + gc, err := GougingCheckerFromContext(ctx, overpay) + if err != nil { + return err + } + if breakdown := gc.Check(nil, &hpt); breakdown.DownloadErr != "" { + return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown.DownloadErr) + } + cost, err := readSectorCost(hpt, uint64(length)) if err != nil { return err @@ -115,18 +116,23 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 if err != nil { return err } - - amount = cost.Sub(refund) + amount = amount.Add(cost) + amount = amount.Sub(refund) return nil - }) - return + }); err != nil { + return types.ZeroCurrency, err + } + return amount, nil }) } -func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (err error) { +func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) error { // fetch price table - pt, err := h.priceTable(ctx, nil) - if err != nil { + var pt rhpv3.HostPriceTable + if err := h.acc.WithWithdrawal(ctx, func() (amount types.Currency, err error) { + pt, err = h.priceTable(ctx, nil, &amount) + return + }); err != nil { return err } @@ -161,16 +167,19 @@ func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, secto } func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _, _ types.Currency, err error) { - // Try to get a valid pricetable. + // try to get a valid pricetable. ptCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() var pt *rhpv3.HostPriceTable - hpt, err := h.priceTables.fetch(ptCtx, h.hk, nil) - if err == nil { - pt = &hpt.HostPriceTable - } else { + if err := h.acc.WithWithdrawal(ptCtx, func() (amount types.Currency, _ error) { + hpt, err := h.priceTables.fetch(ptCtx, h.hk, nil, &amount) + if err == nil { + pt = &hpt.HostPriceTable + } + return amount, err + }); err != nil { h.logger.Infof("unable to fetch price table for renew: %v", err) } + cancel() var contractPrice types.Currency var rev rhpv2.ContractRevision @@ -194,42 +203,57 @@ func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rh return rev, txnSet, contractPrice, fundAmount, renewErr } -func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt api.HostPriceTable, err error) { - // fetchPT is a helper function that performs the RPC given a payment function - fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt api.HostPriceTable, err error) { - err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { - hpt, err = RPCPriceTable(ctx, t, paymentFn) +func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt api.HostPriceTable, cost types.Currency, _ error) { + return hpt, cost, h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { + // pay by contract + if rev != nil { + hpt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { + payment, err := payByContract(rev, pt.UpdatePriceTableCost, rhpv3.Account(h.accountKey.PublicKey()), h.renterKey) + if err != nil { + return nil, err + } + return &payment, nil + }) return + } + + // pay by account + hpt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { + cost = pt.UpdatePriceTableCost + payment := rhpv3.PayByEphemeralAccount(rhpv3.Account(h.accountKey.PublicKey()), cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) + return &payment, nil }) return - } - - // pay by contract if a revision is given - if rev != nil { - return fetchPT(h.preparePriceTableContractPayment(rev)) - } - - // pay by account - return fetchPT(h.preparePriceTableAccountPayment()) + }) } -func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error { - // fetch current balance - curr, err := h.acc.Balance(ctx) - if err != nil { - return err +func (h *host) FundAccount(ctx context.Context, desired types.Currency, rev *types.FileContractRevision) error { + log := h.logger.With( + zap.Stringer("host", h.hk), + zap.Stringer("account", h.acc.id), + ) + + // ensure we have at least 2H in the contract to cover the costs + if types.NewCurrency64(2).Cmp(rev.ValidRenterPayout()) >= 0 { + return fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), types.NewCurrency64(2)) } - // return early if we have the desired balance - if curr.Cmp(balance) >= 0 { + // sync account and return early if possible, we sync to ensure we + // effectively top up the account to the desired amount while keeping track + // of potential drift + balance, err := h.SyncAccount(ctx, rev) + if err == nil && balance.Cmp(desired) >= 0 { return nil + } else if err != nil { + log.Errorw("failed to sync account", zap.Error(err)) } - deposit := balance.Sub(curr) + // calculate the deposit amount + deposit := desired.Sub(balance) return h.acc.WithDeposit(ctx, func() (types.Currency, error) { if err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { // fetch pricetable directly to bypass the gouging check - pt, err := h.priceTables.fetch(ctx, h.hk, rev) + pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) if err != nil { return err } @@ -242,14 +266,9 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ return fmt.Errorf("%w: %v", errPriceTableGouging, err) } - // check whether we have money left in the contract + // cap the deposit by what's left in the contract cost := types.NewCurrency64(1) - if cost.Cmp(rev.ValidRenterPayout()) >= 0 { - return fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), cost) - } availableFunds := rev.ValidRenterPayout().Sub(cost) - - // cap the deposit amount by the money that's left in the contract if deposit.Cmp(availableFunds) > 0 { deposit = availableFunds } @@ -268,6 +287,13 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ // record the spend h.contractSpendingRecorder.Record(*rev, api.ContractSpending{FundAccount: amount}) + + // log the account balance after funding + log.Debugw("fund account succeeded", + "balance", balance.ExactString(), + "deposit", deposit.ExactString(), + ) + return nil }); err != nil { return types.ZeroCurrency, err @@ -276,24 +302,56 @@ func (h *host) FundAccount(ctx context.Context, balance types.Currency, rev *typ }) } -func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error { +func (h *host) AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) { // fetch pricetable directly to bypass the gouging check - pt, err := h.priceTables.fetch(ctx, h.hk, rev) + pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) if err != nil { - return err + return types.ZeroCurrency, types.ZeroCurrency, err } // check only the unused defaults gc, err := GougingCheckerFromContext(ctx, false) if err != nil { + return types.ZeroCurrency, types.ZeroCurrency, err + } else if err := gc.CheckUnusedDefaults(pt.HostPriceTable); err != nil { + return types.ZeroCurrency, types.ZeroCurrency, fmt.Errorf("%w: %v", errPriceTableGouging, err) + } + renterBalance, err := h.acc.Balance(ctx) + if err != nil { + return types.ZeroCurrency, types.ZeroCurrency, err + } + + var hostBalance types.Currency + err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { + payment, err := payByContract(rev, types.NewCurrency64(1), h.acc.id, h.renterKey) + if err != nil { + return err + } + hostBalance, err = RPCAccountBalance(ctx, t, &payment, h.acc.id, pt.UID) return err + }) + return renterBalance, hostBalance, err +} + +func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) (types.Currency, error) { + // fetch pricetable directly to bypass the gouging check + pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) + if err != nil { + return types.ZeroCurrency, err + } + + // check only the unused defaults + gc, err := GougingCheckerFromContext(ctx, false) + if err != nil { + return types.ZeroCurrency, err } else if err := gc.CheckUnusedDefaults(pt.HostPriceTable); err != nil { - return fmt.Errorf("%w: %v", errPriceTableGouging, err) + return types.ZeroCurrency, fmt.Errorf("%w: %v", errPriceTableGouging, err) } - return h.acc.WithSync(ctx, func() (types.Currency, error) { - var balance types.Currency - err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { + // sync the account + var balance types.Currency + return balance, h.acc.WithSync(ctx, func() (types.Currency, error) { + return balance, h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { payment, err := payByContract(rev, types.NewCurrency64(1), h.acc.id, h.renterKey) if err != nil { return err @@ -301,36 +359,5 @@ func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) balance, err = RPCAccountBalance(ctx, t, &payment, h.acc.id, pt.UID) return err }) - return balance, err }) } - -// preparePriceTableAccountPayment prepare a payment function to pay for a price -// table from the given host using the provided revision. -// -// NOTE: This is the preferred way of paying for a price table since it is -// faster and doesn't require locking a contract. -func (h *host) preparePriceTableAccountPayment() PriceTablePaymentFunc { - return func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { - account := rhpv3.Account(h.accountKey.PublicKey()) - payment := rhpv3.PayByEphemeralAccount(account, pt.UpdatePriceTableCost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) - return &payment, nil - } -} - -// preparePriceTableContractPayment prepare a payment function to pay for a -// price table from the given host using the provided revision. -// -// NOTE: This way of paying for a price table should only be used if payment by -// EA is not possible or if we already need a contract revision anyway. e.g. -// funding an EA. -func (h *host) preparePriceTableContractPayment(rev *types.FileContractRevision) PriceTablePaymentFunc { - return func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { - refundAccount := rhpv3.Account(h.accountKey.PublicKey()) - payment, err := payByContract(rev, pt.UpdatePriceTableCost, refundAccount, h.renterKey) - if err != nil { - return nil, err - } - return &payment, nil - } -} diff --git a/worker/host_test.go b/worker/host_test.go index e329e4b90..d7d97afd7 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -110,8 +110,8 @@ func (h *testHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration return rev, nil } -func (h *testHost) FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (api.HostPriceTable, error) { - return h.hptFn(), nil +func (h *testHost) FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error) { + return h.hptFn(), types.ZeroCurrency, nil } func (h *testHost) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error { @@ -122,8 +122,12 @@ func (h *testHost) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) ( return rhpv2.ContractRevision{}, nil, types.Currency{}, types.Currency{}, nil } -func (h *testHost) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error { - return nil +func (h *testHost) SyncAccount(ctx context.Context, rev *types.FileContractRevision) (types.Currency, error) { + return types.ZeroCurrency, nil +} + +func (h *testHost) AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) { + return types.ZeroCurrency, types.ZeroCurrency, nil } func TestHost(t *testing.T) { diff --git a/worker/pricetables.go b/worker/pricetables.go index 3ffdef459..3d6521f7a 100644 --- a/worker/pricetables.go +++ b/worker/pricetables.go @@ -75,7 +75,7 @@ func newPriceTables(hm HostManager, hs HostStore) *priceTables { } // fetch returns a price table for the given host -func (pts *priceTables) fetch(ctx context.Context, hk types.PublicKey, rev *types.FileContractRevision) (api.HostPriceTable, error) { +func (pts *priceTables) fetch(ctx context.Context, hk types.PublicKey, rev *types.FileContractRevision, amount *types.Currency) (api.HostPriceTable, error) { pts.mu.Lock() pt, exists := pts.priceTables[hk] if !exists { @@ -88,7 +88,7 @@ func (pts *priceTables) fetch(ctx context.Context, hk types.PublicKey, rev *type } pts.mu.Unlock() - return pt.fetch(ctx, rev) + return pt.fetch(ctx, rev, amount) } func (pt *priceTable) ongoingUpdate() (bool, *priceTableUpdate) { @@ -105,7 +105,7 @@ func (pt *priceTable) ongoingUpdate() (bool, *priceTableUpdate) { return ongoing, pt.update } -func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision) (hpt api.HostPriceTable, err error) { +func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision, amount *types.Currency) (hpt api.HostPriceTable, err error) { // grab the current price table p.mu.Lock() hpt = p.hpt @@ -170,8 +170,12 @@ func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision) } // otherwise fetch it + var cost types.Currency h := p.hm.Host(p.hk, types.FileContractID{}, host.Settings.SiamuxAddr()) - hpt, err = h.FetchPriceTable(ctx, rev) + hpt, cost, err = h.FetchPriceTable(ctx, rev) + if amount != nil { + *amount = amount.Add(cost) + } // record it in the background if shouldRecordPriceTable(err) { diff --git a/worker/pricetables_test.go b/worker/pricetables_test.go index 22c021ccb..58dc4e8fc 100644 --- a/worker/pricetables_test.go +++ b/worker/pricetables_test.go @@ -50,7 +50,7 @@ func TestPriceTables(t *testing.T) { })) // trigger a fetch to make it block - go pts.fetch(gCtx, h.hk, nil) + go pts.fetch(gCtx, h.hk, nil, nil) time.Sleep(50 * time.Millisecond) // fetch it again but with a canceled context to avoid blocking @@ -58,14 +58,14 @@ func TestPriceTables(t *testing.T) { // update ctx, cancel := context.WithCancel(gCtx) cancel() - _, err := pts.fetch(ctx, h.hk, nil) + _, err := pts.fetch(ctx, h.hk, nil, nil) if !errors.Is(err, errPriceTableUpdateTimedOut) { t.Fatal("expected errPriceTableUpdateTimedOut, got", err) } // unblock and assert we receive a valid price table close(fetchPTBlockChan) - update, err := pts.fetch(gCtx, h.hk, nil) + update, err := pts.fetch(gCtx, h.hk, nil, nil) if err != nil { t.Fatal(err) } else if update.UID != validPT.UID { @@ -75,7 +75,7 @@ func TestPriceTables(t *testing.T) { // refresh the price table on the host, update again, assert we receive the // same price table as it hasn't expired yet h.hi.PriceTable = newTestHostPriceTable() - update, err = pts.fetch(gCtx, h.hk, nil) + update, err = pts.fetch(gCtx, h.hk, nil, nil) if err != nil { t.Fatal(err) } else if update.UID != validPT.UID { @@ -86,7 +86,7 @@ func TestPriceTables(t *testing.T) { pts.priceTables[h.hk].hpt.Expiry = time.Now() // fetch it again and assert we updated the price table - update, err = pts.fetch(gCtx, h.hk, nil) + update, err = pts.fetch(gCtx, h.hk, nil, nil) if err != nil { t.Fatal(err) } else if update.UID != h.hi.PriceTable.UID { @@ -97,7 +97,7 @@ func TestPriceTables(t *testing.T) { // the price table since it's not expired validPT = h.hi.PriceTable h.hi.PriceTable = newTestHostPriceTable() - update, err = pts.fetch(gCtx, h.hk, nil) + update, err = pts.fetch(gCtx, h.hk, nil, nil) if err != nil { t.Fatal(err) } else if update.UID != validPT.UID { @@ -110,7 +110,7 @@ func TestPriceTables(t *testing.T) { cm.cs.BlockHeight = validPT.HostBlockHeight + uint64(blockHeightLeeway) - priceTableBlockHeightLeeway // fetch it again and assert we updated the price table - update, err = pts.fetch(gCtx, h.hk, nil) + update, err = pts.fetch(gCtx, h.hk, nil, nil) if err != nil { t.Fatal(err) } else if update.UID != h.hi.PriceTable.UID { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 2c0739703..1cd6a0b8e 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -58,10 +58,6 @@ var ( // account balance was insufficient. errBalanceInsufficient = errors.New("ephemeral account balance was insufficient") - // errBalanceMaxExceeded occurs when a deposit would push the account's - // balance over the maximum allowed ephemeral account balance. - errBalanceMaxExceeded = errors.New("ephemeral account maximum balance exceeded") - // errMaxRevisionReached occurs when trying to revise a contract that has // already reached the highest possible revision number. Usually happens // when trying to use a renewed contract. @@ -96,7 +92,6 @@ func IsErrHost(err error) bool { } func isBalanceInsufficient(err error) bool { return utils.IsErr(err, errBalanceInsufficient) } -func isBalanceMaxExceeded(err error) bool { return utils.IsErr(err, errBalanceMaxExceeded) } func isClosedStream(err error) bool { return utils.IsErr(err, mux.ErrClosedStream) || utils.IsErr(err, net.ErrClosed) } @@ -323,26 +318,25 @@ func (h *host) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (t return rev, nil } -func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, fcid types.FileContractID) (rev types.FileContractRevision, err error) { - err = h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { - var cost types.Currency - return cost, h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { +func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, fcid types.FileContractID) (rev types.FileContractRevision, _ error) { + var amount types.Currency + return rev, h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { + if err := h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { rev, err = RPCLatestRevision(ctx, t, fcid, func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error) { - pt, err := h.priceTable(ctx, nil) + pt, err := h.priceTable(ctx, nil, &amount) if err != nil { return rhpv3.HostPriceTable{}, nil, fmt.Errorf("failed to fetch pricetable, err: %w", err) } - cost = pt.LatestRevisionCost.Add(pt.UpdatePriceTableCost) // add cost of fetching the pricetable since we might need a new one and it's better to stay pessimistic - payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) + amount = amount.Add(pt.LatestRevisionCost) + payment := rhpv3.PayByEphemeralAccount(h.acc.id, amount, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) return pt, &payment, nil }) - if err != nil { - return err - } - return nil - }) + return + }); err != nil { + return types.ZeroCurrency, err + } + return amount, nil }) - return rev, err } // FetchRevisionWithContract fetches the latest revision of a contract and uses @@ -350,12 +344,10 @@ func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.Publi func (h *host) fetchRevisionWithContract(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, contractID types.FileContractID) (rev types.FileContractRevision, err error) { err = h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { rev, err = RPCLatestRevision(ctx, t, contractID, func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error) { - // Fetch pt. - pt, err := h.priceTable(ctx, rev) + pt, err := h.priceTable(ctx, rev, nil) if err != nil { return rhpv3.HostPriceTable{}, nil, fmt.Errorf("failed to fetch pricetable, err: %v", err) } - // Pay for the revision. payment, err := payByContract(rev, pt.LatestRevisionCost, h.acc.id, h.renterKey) if err != nil { return rhpv3.HostPriceTable{}, nil, err @@ -382,17 +374,19 @@ type ( // accounts stores the balance and other metrics of accounts that the // worker maintains with a host. accounts struct { - as AccountStore - key types.PrivateKey + as AccountStore + key types.PrivateKey + logger *zap.SugaredLogger } // account contains information regarding a specific account of the // worker. account struct { - as AccountStore - id rhpv3.Account - key types.PrivateKey - host types.PublicKey + as AccountStore + id rhpv3.Account + key types.PrivateKey + host types.PublicKey + logger *zap.SugaredLogger } ) @@ -401,8 +395,9 @@ func (w *Worker) initAccounts(as AccountStore) { panic("accounts already initialized") // developer error } w.accounts = &accounts{ - as: as, - key: w.deriveSubKey("accountkey"), + as: as, + key: w.deriveSubKey("accountkey"), + logger: w.logger.Named("accounts"), } } @@ -418,10 +413,11 @@ func (w *Worker) initTransportPool() { func (a *accounts) ForHost(hk types.PublicKey) *account { accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey()) return &account{ - as: a.as, - id: accountID, - key: a.key, - host: hk, + as: a.as, + id: accountID, + key: a.key, + host: hk, + logger: a.logger.With("account", accountID).With("host", hk), } } @@ -533,10 +529,11 @@ func (a *accounts) deriveAccountKey(hostKey types.PublicKey) types.PrivateKey { } // priceTable fetches a price table from the host. If a revision is provided, it -// will be used to pay for the price table. The returned price table is -// guaranteed to be safe to use. -func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) (rhpv3.HostPriceTable, error) { - pt, err := h.priceTables.fetch(ctx, h.hk, rev) +// will be used to pay for the price table. If not it will be paid for using an +// ephemeral account, in which case the given amount parameter will get updated. +// The returned price table is guaranteed to be safe to use. +func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision, amount *types.Currency) (rhpv3.HostPriceTable, error) { + pt, err := h.priceTables.fetch(ctx, h.hk, rev, amount) if err != nil { return rhpv3.HostPriceTable{}, err } diff --git a/worker/worker.go b/worker/worker.go index 1456f808a..d643612f5 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -96,7 +96,6 @@ type ( LockAccount(ctx context.Context, id rhpv3.Account, hostKey types.PublicKey, exclusive bool, duration time.Duration) (api.Account, uint64, error) UnlockAccount(ctx context.Context, id rhpv3.Account, lockID uint64) error - ResetDrift(ctx context.Context, id rhpv3.Account) error SetBalance(ctx context.Context, id rhpv3.Account, hk types.PublicKey, amt *big.Int) error ScheduleSync(ctx context.Context, id rhpv3.Account, hk types.PublicKey) error } @@ -697,24 +696,8 @@ func (w *Worker) rhpFundHandler(jc jape.Context) { ctx = WithGougingChecker(ctx, w.bus, gp) // fund the account - jc.Check("couldn't fund account", w.withRevision(ctx, defaultRevisionFetchTimeout, rfr.ContractID, rfr.HostKey, rfr.SiamuxAddr, lockingPriorityFunding, func(rev types.FileContractRevision) (err error) { - h := w.Host(rfr.HostKey, rev.ParentID, rfr.SiamuxAddr) - err = h.FundAccount(ctx, rfr.Balance, &rev) - if isBalanceMaxExceeded(err) { - // sync the account - err = h.SyncAccount(ctx, &rev) - if err != nil { - w.logger.Infof(fmt.Sprintf("failed to sync account: %v", err), "host", rfr.HostKey) - return - } - - // try funding the account again - err = h.FundAccount(ctx, rfr.Balance, &rev) - if err != nil { - w.logger.Errorw(fmt.Sprintf("failed to fund account after syncing: %v", err), "host", rfr.HostKey, "balance", rfr.Balance) - } - } - return + jc.Check("couldn't fund account", w.withRevision(ctx, defaultRevisionFetchTimeout, rfr.ContractID, rfr.HostKey, rfr.SiamuxAddr, lockingPriorityFunding, func(rev types.FileContractRevision) error { + return w.Host(rfr.HostKey, rev.ParentID, rfr.SiamuxAddr).FundAccount(ctx, rfr.Balance, &rev) })) } @@ -737,7 +720,8 @@ func (w *Worker) rhpSyncHandler(jc jape.Context) { // sync the account h := w.Host(rsr.HostKey, rsr.ContractID, rsr.SiamuxAddr) jc.Check("couldn't sync account", w.withRevision(ctx, defaultRevisionFetchTimeout, rsr.ContractID, rsr.HostKey, rsr.SiamuxAddr, lockingPrioritySyncing, func(rev types.FileContractRevision) error { - return h.SyncAccount(ctx, &rev) + _, err := h.SyncAccount(ctx, &rev) + return err })) } From 2cbade8adab06e85964770ce9a73352e6a1e79b9 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 16 Aug 2024 11:13:24 +0200 Subject: [PATCH 02/13] worker: return amount --- worker/host.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/worker/host.go b/worker/host.go index 15e77e79b..37da032a1 100644 --- a/worker/host.go +++ b/worker/host.go @@ -91,7 +91,7 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 var amount types.Currency return h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { - if err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { + err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { pt, err := h.priceTables.fetch(ctx, h.hk, nil, &amount) if err != nil { return err @@ -119,10 +119,8 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 amount = amount.Add(cost) amount = amount.Sub(refund) return nil - }); err != nil { - return types.ZeroCurrency, err - } - return amount, nil + }) + return amount, err }) } From 88cc0fb8863bb1f0a1b0c3e779abfbfa6780cd02 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 16 Aug 2024 12:09:10 +0200 Subject: [PATCH 03/13] worker: do not sync before funding --- worker/host.go | 28 ++++++++++++++-------------- worker/host_test.go | 4 ++-- worker/worker.go | 3 +-- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/worker/host.go b/worker/host.go index 8f6fe334a..f6986d40d 100644 --- a/worker/host.go +++ b/worker/host.go @@ -32,7 +32,7 @@ type ( AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error - SyncAccount(ctx context.Context, rev *types.FileContractRevision) (types.Currency, error) + SyncAccount(ctx context.Context, rev *types.FileContractRevision) error RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _, _ types.Currency, err error) } @@ -237,14 +237,15 @@ func (h *host) FundAccount(ctx context.Context, desired types.Currency, rev *typ return fmt.Errorf("insufficient funds to fund account: %v <= %v", rev.ValidRenterPayout(), types.NewCurrency64(2)) } - // sync account and return early if possible, we sync to ensure we - // effectively top up the account to the desired amount while keeping track - // of potential drift - balance, err := h.SyncAccount(ctx, rev) - if err == nil && balance.Cmp(desired) >= 0 { + // fetch current balance + balance, err := h.acc.Balance(ctx) + if err != nil { + return err + } + + // return early if we have the desired balance + if balance.Cmp(desired) >= 0 { return nil - } else if err != nil { - log.Errorw("failed to sync account", zap.Error(err)) } // calculate the deposit amount @@ -332,24 +333,23 @@ func (h *host) AccountBalance(ctx context.Context, rev *types.FileContractRevisi return renterBalance, hostBalance, err } -func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) (types.Currency, error) { +func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error { // fetch pricetable directly to bypass the gouging check pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) if err != nil { - return types.ZeroCurrency, err + return err } // check only the unused defaults gc, err := GougingCheckerFromContext(ctx, false) if err != nil { - return types.ZeroCurrency, err + return err } else if err := gc.CheckUnusedDefaults(pt.HostPriceTable); err != nil { - return types.ZeroCurrency, fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, err) + return fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, err) } // sync the account - var balance types.Currency - return balance, h.acc.WithSync(ctx, func() (types.Currency, error) { + return h.acc.WithSync(ctx, func() (balance types.Currency, _ error) { return balance, h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { payment, err := payByContract(rev, types.NewCurrency64(1), h.acc.id, h.renterKey) if err != nil { diff --git a/worker/host_test.go b/worker/host_test.go index d7d97afd7..4ff146abc 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -122,8 +122,8 @@ func (h *testHost) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) ( return rhpv2.ContractRevision{}, nil, types.Currency{}, types.Currency{}, nil } -func (h *testHost) SyncAccount(ctx context.Context, rev *types.FileContractRevision) (types.Currency, error) { - return types.ZeroCurrency, nil +func (h *testHost) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error { + return nil } func (h *testHost) AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) { diff --git a/worker/worker.go b/worker/worker.go index 6f368d788..82f8d583b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -702,8 +702,7 @@ func (w *Worker) rhpSyncHandler(jc jape.Context) { // sync the account h := w.Host(rsr.HostKey, rsr.ContractID, rsr.SiamuxAddr) jc.Check("couldn't sync account", w.withRevision(ctx, defaultRevisionFetchTimeout, rsr.ContractID, rsr.HostKey, rsr.SiamuxAddr, lockingPrioritySyncing, func(rev types.FileContractRevision) error { - _, err := h.SyncAccount(ctx, &rev) - return err + return h.SyncAccount(ctx, &rev) })) } From a0a5dadfb1335a9833ec90967b518c42f0aef1b4 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 16 Aug 2024 12:52:17 +0200 Subject: [PATCH 04/13] worker: add regression case to TestEphemeralAccounts --- internal/test/e2e/cluster_test.go | 13 +++++++++++-- worker/worker.go | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 92a6e3fbf..16aad40db 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1144,7 +1144,7 @@ func TestEphemeralAccounts(t *testing.T) { tt.OK(err) // scan the host - _, err = w.RHPScan(context.Background(), h.PublicKey, h.NetAddress, 10*time.Second) + res, err := w.RHPScan(context.Background(), h.PublicKey, h.NetAddress, 10*time.Second) tt.OK(err) // manually form a contract with the host @@ -1175,7 +1175,7 @@ func TestEphemeralAccounts(t *testing.T) { t.Fatal("account should require a sync") } else if acc.HostKey != h.PublicKey { t.Fatal("wrong host") - } else if acc.Balance.Cmp(types.Siacoins(1).Big()) < 0 { + } else if acc.Balance.Cmp(types.Siacoins(1).Big()) != 0 { t.Fatalf("wrong balance %v", acc.Balance) } @@ -1220,6 +1220,15 @@ func TestEphemeralAccounts(t *testing.T) { t.Fatalf("account shoult not have drift %v", acc.Drift) } + // fetch a price table on the fly by triggering a renew + w.RHPRenew(context.Background(), c.ID, c.WindowStart, c.HostKey, c.SiamuxAddr, res.Settings.Address, wallet.Address, types.Siacoins(10), types.Siacoins(1), types.Siacoins(15), 1<<40, h.Settings.WindowSize) + + // assert we spent exactly 1H, the cost to fetch the price table + accounts, _ = cluster.Bus.Accounts(context.Background()) + if accounts[0].Balance.Cmp(types.Siacoins(1).Sub(types.NewCurrency64(1)).Big()) != 0 { + t.Fatalf("wrong balance %v", accounts[0].Balance) + } + // update the balance to create some drift newBalance := fundAmt.Div64(2) newDrift := new(big.Int).Sub(newBalance.Big(), fundAmt.Big()) diff --git a/worker/worker.go b/worker/worker.go index 82f8d583b..b2b00f026 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -637,7 +637,7 @@ func (w *Worker) rhpRenewHandler(jc jape.Context) { var renewed rhpv2.ContractRevision var txnSet []types.Transaction var contractPrice, fundAmount types.Currency - if jc.Check("couldn't renew contract", w.withRevision(ctx, defaultRevisionFetchTimeout, rrr.ContractID, rrr.HostKey, rrr.SiamuxAddr, lockingPriorityRenew, func(_ types.FileContractRevision) (err error) { + if jc.Check("couldn't renew contract", w.withContractLock(ctx, rrr.ContractID, lockingPriorityRenew, func() (err error) { h := w.Host(rrr.HostKey, rrr.ContractID, rrr.SiamuxAddr) renewed, txnSet, contractPrice, fundAmount, err = h.RenewContract(ctx, rrr) return err From 00fb85f64890b12a44a8ce14b32f86b2228572ca Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 16 Aug 2024 15:43:38 +0200 Subject: [PATCH 05/13] worker: remove AccountBalance --- worker/host.go | 32 -------------------------------- worker/host_test.go | 4 ---- 2 files changed, 36 deletions(-) diff --git a/worker/host.go b/worker/host.go index f6986d40d..78cf03953 100644 --- a/worker/host.go +++ b/worker/host.go @@ -30,7 +30,6 @@ type ( FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (types.FileContractRevision, error) - AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error SyncAccount(ctx context.Context, rev *types.FileContractRevision) error @@ -302,37 +301,6 @@ func (h *host) FundAccount(ctx context.Context, desired types.Currency, rev *typ }) } -func (h *host) AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) { - // fetch pricetable directly to bypass the gouging check - pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) - if err != nil { - return types.ZeroCurrency, types.ZeroCurrency, err - } - - // check only the unused defaults - gc, err := GougingCheckerFromContext(ctx, false) - if err != nil { - return types.ZeroCurrency, types.ZeroCurrency, err - } else if err := gc.CheckUnusedDefaults(pt.HostPriceTable); err != nil { - return types.ZeroCurrency, types.ZeroCurrency, fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, err) - } - renterBalance, err := h.acc.Balance(ctx) - if err != nil { - return types.ZeroCurrency, types.ZeroCurrency, err - } - - var hostBalance types.Currency - err = h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - payment, err := payByContract(rev, types.NewCurrency64(1), h.acc.id, h.renterKey) - if err != nil { - return err - } - hostBalance, err = RPCAccountBalance(ctx, t, &payment, h.acc.id, pt.UID) - return err - }) - return renterBalance, hostBalance, err -} - func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error { // fetch pricetable directly to bypass the gouging check pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) diff --git a/worker/host_test.go b/worker/host_test.go index 4ff146abc..53204739a 100644 --- a/worker/host_test.go +++ b/worker/host_test.go @@ -126,10 +126,6 @@ func (h *testHost) SyncAccount(ctx context.Context, rev *types.FileContractRevis return nil } -func (h *testHost) AccountBalance(ctx context.Context, rev *types.FileContractRevision) (types.Currency, types.Currency, error) { - return types.ZeroCurrency, types.ZeroCurrency, nil -} - func TestHost(t *testing.T) { // create test host h := newTestHost( From 577dd2dd072aeefab4ad3567cd903dbd3a278f7d Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 16 Aug 2024 16:24:56 +0200 Subject: [PATCH 06/13] worker: pessimistically withdraw from the account --- worker/host.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/host.go b/worker/host.go index 78cf03953..248f64827 100644 --- a/worker/host.go +++ b/worker/host.go @@ -110,13 +110,13 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 if err != nil { return err } + amount = amount.Add(cost) payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) cost, refund, err := RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root) if err != nil { return err } - amount = amount.Add(cost) amount = amount.Sub(refund) return nil }) From faa301ccd0f79813b4e285b6a488d01550b3eb6f Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 20 Aug 2024 15:33:12 +0200 Subject: [PATCH 07/13] account: schedule sync on deposit error --- internal/test/e2e/gouging_test.go | 2 +- worker/rhpv3.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/test/e2e/gouging_test.go b/internal/test/e2e/gouging_test.go index b2efe2efe..a40fe0024 100644 --- a/internal/test/e2e/gouging_test.go +++ b/internal/test/e2e/gouging_test.go @@ -187,7 +187,7 @@ func TestAccountFunding(t *testing.T) { time.Sleep(defaultHostSettings.PriceTableValidity) // fund the account again - tt.OK(w.RHPFund(context.Background(), c.ID, c.HostKey, c.HostIP, c.SiamuxAddr, types.Siacoins(1).Div64(2))) + tt.OK(w.RHPFund(context.Background(), c.ID, c.HostKey, c.HostIP, c.SiamuxAddr, types.Siacoins(1))) } func TestHostMinVersion(t *testing.T) { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 48ffcc60b..f5a6a5c34 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -61,6 +61,10 @@ var ( // account balance was insufficient. errBalanceInsufficient = errors.New("ephemeral account balance was insufficient") + // errBalanceMaxExceeded occurs when a deposit would push the account's + // balance over the maximum allowed ephemeral account balance. + errBalanceMaxExceeded = errors.New("ephemeral account maximum balance exceeded") + // errInsufficientFunds is returned by various RPCs when the renter is // unable to provide sufficient payment to the host. errInsufficientFunds = errors.New("insufficient funds") @@ -99,6 +103,7 @@ func IsErrHost(err error) bool { } func isBalanceInsufficient(err error) bool { return utils.IsErr(err, errBalanceInsufficient) } +func isBalanceMaxExceeded(err error) bool { return utils.IsErr(err, errBalanceMaxExceeded) } func isClosedStream(err error) bool { return utils.IsErr(err, mux.ErrClosedStream) || utils.IsErr(err, net.ErrClosed) } @@ -463,6 +468,9 @@ func (a *account) WithDeposit(ctx context.Context, amtFn func() (types.Currency, return withAccountLock(ctx, a.as, a.id, a.host, false, func(_ api.Account) error { amt, err := amtFn() if err != nil { + if isBalanceMaxExceeded(err) { + err = errors.Join(err, a.as.ScheduleSync(ctx, a.id, a.host)) + } return err } return a.as.AddBalance(ctx, a.id, a.host, amt.Big()) From eca0e319eef4caf6d42ead5ddc4b106d493eeff4 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 20 Aug 2024 16:00:55 +0200 Subject: [PATCH 08/13] worker: avoid passing amount by ref. --- worker/host.go | 22 ++++++++++++---------- worker/pricetables.go | 20 ++++++++------------ worker/pricetables_test.go | 16 ++++++++-------- worker/rhpv3.go | 17 +++++++++-------- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/worker/host.go b/worker/host.go index 248f64827..19df012ea 100644 --- a/worker/host.go +++ b/worker/host.go @@ -92,11 +92,12 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 var amount types.Currency return h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - pt, err := h.priceTables.fetch(ctx, h.hk, nil, &amount) + pt, uptc, err := h.priceTables.fetch(ctx, h.hk, nil) if err != nil { return err } hpt := pt.HostPriceTable + amount = uptc gc, err := GougingCheckerFromContext(ctx, overpay) if err != nil { @@ -110,14 +111,13 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 if err != nil { return err } - amount = amount.Add(cost) payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) cost, refund, err := RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root) if err != nil { return err } - amount = amount.Sub(refund) + amount = amount.Add(cost).Sub(refund) return nil }) return amount, err @@ -128,7 +128,7 @@ func (h *host) UploadSector(ctx context.Context, sectorRoot types.Hash256, secto // fetch price table var pt rhpv3.HostPriceTable if err := h.acc.WithWithdrawal(ctx, func() (amount types.Currency, err error) { - pt, err = h.priceTable(ctx, nil, &amount) + pt, amount, err = h.priceTable(ctx, nil) return }); err != nil { return err @@ -168,12 +168,13 @@ func (h *host) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rh // try to get a valid pricetable. ptCtx, cancel := context.WithTimeout(ctx, 10*time.Second) var pt *rhpv3.HostPriceTable - if err := h.acc.WithWithdrawal(ptCtx, func() (amount types.Currency, _ error) { - hpt, err := h.priceTables.fetch(ptCtx, h.hk, nil, &amount) + if err := h.acc.WithWithdrawal(ptCtx, func() (amount types.Currency, err error) { + var hpt api.HostPriceTable + hpt, amount, err = h.priceTables.fetch(ptCtx, h.hk, nil) if err == nil { pt = &hpt.HostPriceTable } - return amount, err + return }); err != nil { h.logger.Infof("unable to fetch price table for renew: %v", err) } @@ -206,7 +207,8 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis // pay by contract if rev != nil { hpt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { - payment, err := payByContract(rev, pt.UpdatePriceTableCost, rhpv3.Account(h.accountKey.PublicKey()), h.renterKey) + cost = pt.UpdatePriceTableCost + payment, err := payByContract(rev, cost, rhpv3.Account(h.accountKey.PublicKey()), h.renterKey) if err != nil { return nil, err } @@ -252,7 +254,7 @@ func (h *host) FundAccount(ctx context.Context, desired types.Currency, rev *typ return h.acc.WithDeposit(ctx, func() (types.Currency, error) { if err := h.transportPool.withTransportV3(ctx, h.hk, h.siamuxAddr, func(ctx context.Context, t *transportV3) error { // fetch pricetable directly to bypass the gouging check - pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) + pt, _, err := h.priceTables.fetch(ctx, h.hk, rev) if err != nil { return err } @@ -303,7 +305,7 @@ func (h *host) FundAccount(ctx context.Context, desired types.Currency, rev *typ func (h *host) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error { // fetch pricetable directly to bypass the gouging check - pt, err := h.priceTables.fetch(ctx, h.hk, rev, nil) + pt, _, err := h.priceTables.fetch(ctx, h.hk, rev) if err != nil { return err } diff --git a/worker/pricetables.go b/worker/pricetables.go index 3d6521f7a..4b98b1867 100644 --- a/worker/pricetables.go +++ b/worker/pricetables.go @@ -75,7 +75,7 @@ func newPriceTables(hm HostManager, hs HostStore) *priceTables { } // fetch returns a price table for the given host -func (pts *priceTables) fetch(ctx context.Context, hk types.PublicKey, rev *types.FileContractRevision, amount *types.Currency) (api.HostPriceTable, error) { +func (pts *priceTables) fetch(ctx context.Context, hk types.PublicKey, rev *types.FileContractRevision) (api.HostPriceTable, types.Currency, error) { pts.mu.Lock() pt, exists := pts.priceTables[hk] if !exists { @@ -88,7 +88,7 @@ func (pts *priceTables) fetch(ctx context.Context, hk types.PublicKey, rev *type } pts.mu.Unlock() - return pt.fetch(ctx, rev, amount) + return pt.fetch(ctx, rev) } func (pt *priceTable) ongoingUpdate() (bool, *priceTableUpdate) { @@ -105,7 +105,7 @@ func (pt *priceTable) ongoingUpdate() (bool, *priceTableUpdate) { return ongoing, pt.update } -func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision, amount *types.Currency) (hpt api.HostPriceTable, err error) { +func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision) (hpt api.HostPriceTable, cost types.Currency, err error) { // grab the current price table p.mu.Lock() hpt = p.hpt @@ -115,7 +115,7 @@ func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision, // current price table is considered to gouge on the block height gc, err := GougingCheckerFromContext(ctx, false) if err != nil { - return api.HostPriceTable{}, err + return api.HostPriceTable{}, types.ZeroCurrency, err } // figure out whether we should update the price table, if not we can return @@ -137,10 +137,10 @@ func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision, } else if ongoing { select { case <-ctx.Done(): - return api.HostPriceTable{}, fmt.Errorf("%w; %w", errPriceTableUpdateTimedOut, context.Cause(ctx)) + return api.HostPriceTable{}, types.ZeroCurrency, fmt.Errorf("%w; %w", errPriceTableUpdateTimedOut, context.Cause(ctx)) case <-update.done: } - return update.hpt, update.err + return update.hpt, types.ZeroCurrency, update.err } // this thread is updating the price table @@ -166,16 +166,12 @@ func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision, // sanity check the host has been scanned before fetching the price table if !host.Scanned { - return api.HostPriceTable{}, fmt.Errorf("host %v was not scanned", p.hk) + return api.HostPriceTable{}, types.ZeroCurrency, fmt.Errorf("host %v was not scanned", p.hk) } // otherwise fetch it - var cost types.Currency h := p.hm.Host(p.hk, types.FileContractID{}, host.Settings.SiamuxAddr()) hpt, cost, err = h.FetchPriceTable(ctx, rev) - if amount != nil { - *amount = amount.Add(cost) - } // record it in the background if shouldRecordPriceTable(err) { @@ -193,7 +189,7 @@ func (p *priceTable) fetch(ctx context.Context, rev *types.FileContractRevision, // handle error after recording if err != nil { - return api.HostPriceTable{}, fmt.Errorf("failed to update pricetable, err %v", err) + return api.HostPriceTable{}, types.ZeroCurrency, fmt.Errorf("failed to update pricetable, err %v", err) } return } diff --git a/worker/pricetables_test.go b/worker/pricetables_test.go index 58dc4e8fc..ed918fa1a 100644 --- a/worker/pricetables_test.go +++ b/worker/pricetables_test.go @@ -50,7 +50,7 @@ func TestPriceTables(t *testing.T) { })) // trigger a fetch to make it block - go pts.fetch(gCtx, h.hk, nil, nil) + go pts.fetch(gCtx, h.hk, nil) time.Sleep(50 * time.Millisecond) // fetch it again but with a canceled context to avoid blocking @@ -58,14 +58,14 @@ func TestPriceTables(t *testing.T) { // update ctx, cancel := context.WithCancel(gCtx) cancel() - _, err := pts.fetch(ctx, h.hk, nil, nil) + _, _, err := pts.fetch(ctx, h.hk, nil) if !errors.Is(err, errPriceTableUpdateTimedOut) { t.Fatal("expected errPriceTableUpdateTimedOut, got", err) } - // unblock and assert we receive a valid price table + // unblock and assert we paid for the price table close(fetchPTBlockChan) - update, err := pts.fetch(gCtx, h.hk, nil, nil) + update, _, err := pts.fetch(gCtx, h.hk, nil) if err != nil { t.Fatal(err) } else if update.UID != validPT.UID { @@ -75,7 +75,7 @@ func TestPriceTables(t *testing.T) { // refresh the price table on the host, update again, assert we receive the // same price table as it hasn't expired yet h.hi.PriceTable = newTestHostPriceTable() - update, err = pts.fetch(gCtx, h.hk, nil, nil) + update, _, err = pts.fetch(gCtx, h.hk, nil) if err != nil { t.Fatal(err) } else if update.UID != validPT.UID { @@ -86,7 +86,7 @@ func TestPriceTables(t *testing.T) { pts.priceTables[h.hk].hpt.Expiry = time.Now() // fetch it again and assert we updated the price table - update, err = pts.fetch(gCtx, h.hk, nil, nil) + update, _, err = pts.fetch(gCtx, h.hk, nil) if err != nil { t.Fatal(err) } else if update.UID != h.hi.PriceTable.UID { @@ -97,7 +97,7 @@ func TestPriceTables(t *testing.T) { // the price table since it's not expired validPT = h.hi.PriceTable h.hi.PriceTable = newTestHostPriceTable() - update, err = pts.fetch(gCtx, h.hk, nil, nil) + update, _, err = pts.fetch(gCtx, h.hk, nil) if err != nil { t.Fatal(err) } else if update.UID != validPT.UID { @@ -110,7 +110,7 @@ func TestPriceTables(t *testing.T) { cm.cs.BlockHeight = validPT.HostBlockHeight + uint64(blockHeightLeeway) - priceTableBlockHeightLeeway // fetch it again and assert we updated the price table - update, err = pts.fetch(gCtx, h.hk, nil, nil) + update, _, err = pts.fetch(gCtx, h.hk, nil) if err != nil { t.Fatal(err) } else if update.UID != h.hi.PriceTable.UID { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index f5a6a5c34..4da2c7ae0 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -340,7 +340,8 @@ func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.Publi return rev, h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { if err := h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { rev, err = RPCLatestRevision(ctx, t, fcid, func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error) { - pt, err := h.priceTable(ctx, nil, &amount) + var pt rhpv3.HostPriceTable + pt, amount, err = h.priceTable(ctx, nil) if err != nil { return rhpv3.HostPriceTable{}, nil, fmt.Errorf("failed to fetch pricetable, err: %w", err) } @@ -361,7 +362,7 @@ func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.Publi func (h *host) fetchRevisionWithContract(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, contractID types.FileContractID) (rev types.FileContractRevision, err error) { err = h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { rev, err = RPCLatestRevision(ctx, t, contractID, func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error) { - pt, err := h.priceTable(ctx, rev, nil) + pt, _, err := h.priceTable(ctx, rev) if err != nil { return rhpv3.HostPriceTable{}, nil, fmt.Errorf("failed to fetch pricetable, err: %v", err) } @@ -552,19 +553,19 @@ func (a *accounts) deriveAccountKey(hostKey types.PublicKey) types.PrivateKey { // will be used to pay for the price table. If not it will be paid for using an // ephemeral account, in which case the given amount parameter will get updated. // The returned price table is guaranteed to be safe to use. -func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision, amount *types.Currency) (rhpv3.HostPriceTable, error) { - pt, err := h.priceTables.fetch(ctx, h.hk, rev, amount) +func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) (rhpv3.HostPriceTable, types.Currency, error) { + pt, cost, err := h.priceTables.fetch(ctx, h.hk, rev) if err != nil { - return rhpv3.HostPriceTable{}, err + return rhpv3.HostPriceTable{}, types.ZeroCurrency, err } gc, err := GougingCheckerFromContext(ctx, false) if err != nil { - return rhpv3.HostPriceTable{}, err + return rhpv3.HostPriceTable{}, cost, err } if breakdown := gc.Check(nil, &pt.HostPriceTable); breakdown.Gouging() { - return rhpv3.HostPriceTable{}, fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, breakdown) + return rhpv3.HostPriceTable{}, cost, fmt.Errorf("%w: %v", gouging.ErrPriceTableGouging, breakdown) } - return pt.HostPriceTable, nil + return pt.HostPriceTable, cost, nil } // padBandwitdh pads the bandwidth to the next multiple of 1460 bytes. 1460 From b3def3767c7093e3d46cf81df0aeef7279116607 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 20 Aug 2024 16:03:06 +0200 Subject: [PATCH 09/13] worker: update docstring --- worker/rhpv3.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 4da2c7ae0..824e4486b 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -550,9 +550,10 @@ func (a *accounts) deriveAccountKey(hostKey types.PublicKey) types.PrivateKey { } // priceTable fetches a price table from the host. If a revision is provided, it -// will be used to pay for the price table. If not it will be paid for using an -// ephemeral account, in which case the given amount parameter will get updated. -// The returned price table is guaranteed to be safe to use. +// will be used to pay for the price table. The currency being returned is the +// cost of fetching the price table, depending on whether a revision was +// provided it was spent from the contract or from the ephemeral account. The +// returned price table is guaranteed to be safe to use. func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) (rhpv3.HostPriceTable, types.Currency, error) { pt, cost, err := h.priceTables.fetch(ctx, h.hk, rev) if err != nil { From f084975b407c1a314da42dbe6ef8d6ba4a5fb196 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 20 Aug 2024 16:41:40 +0200 Subject: [PATCH 10/13] worker: cleanup PR --- worker/rhpv3.go | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 824e4486b..15136b46f 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -338,7 +338,7 @@ func (h *host) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (t func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, fcid types.FileContractID) (rev types.FileContractRevision, _ error) { var amount types.Currency return rev, h.acc.WithWithdrawal(ctx, func() (types.Currency, error) { - if err := h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { + return amount, h.transportPool.withTransportV3(ctx, hostKey, siamuxAddr, func(ctx context.Context, t *transportV3) (err error) { rev, err = RPCLatestRevision(ctx, t, fcid, func(rev *types.FileContractRevision) (rhpv3.HostPriceTable, rhpv3.PaymentMethod, error) { var pt rhpv3.HostPriceTable pt, amount, err = h.priceTable(ctx, nil) @@ -350,10 +350,7 @@ func (h *host) fetchRevisionWithAccount(ctx context.Context, hostKey types.Publi return pt, &payment, nil }) return - }); err != nil { - return types.ZeroCurrency, err - } - return amount, nil + }) }) } @@ -392,19 +389,17 @@ type ( // accounts stores the balance and other metrics of accounts that the // worker maintains with a host. accounts struct { - as AccountStore - key types.PrivateKey - logger *zap.SugaredLogger + as AccountStore + key types.PrivateKey } // account contains information regarding a specific account of the // worker. account struct { - as AccountStore - id rhpv3.Account - key types.PrivateKey - host types.PublicKey - logger *zap.SugaredLogger + as AccountStore + id rhpv3.Account + key types.PrivateKey + host types.PublicKey } ) @@ -413,9 +408,8 @@ func (w *Worker) initAccounts(as AccountStore) { panic("accounts already initialized") // developer error } w.accounts = &accounts{ - as: as, - key: w.deriveSubKey("accountkey"), - logger: w.logger.Named("accounts"), + as: as, + key: w.deriveSubKey("accountkey"), } } @@ -431,11 +425,10 @@ func (w *Worker) initTransportPool() { func (a *accounts) ForHost(hk types.PublicKey) *account { accountID := rhpv3.Account(a.deriveAccountKey(hk).PublicKey()) return &account{ - as: a.as, - id: accountID, - key: a.key, - host: hk, - logger: a.logger.With("account", accountID).With("host", hk), + as: a.as, + id: accountID, + key: a.key, + host: hk, } } From f4c4dbdd9172b7f00f869df1457fac906c0f562d Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 21 Aug 2024 11:19:00 +0200 Subject: [PATCH 11/13] testing: remove assertion --- internal/test/e2e/cluster_test.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 7260808d6..b5de68d8b 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1145,8 +1145,7 @@ func TestEphemeralAccounts(t *testing.T) { tt.OK(err) // scan the host - res, err := w.RHPScan(context.Background(), h.PublicKey, h.NetAddress, 10*time.Second) - tt.OK(err) + tt.OKAll(w.RHPScan(context.Background(), h.PublicKey, h.NetAddress, 10*time.Second)) // manually form a contract with the host cs, _ := b.ConsensusState(context.Background()) @@ -1221,15 +1220,6 @@ func TestEphemeralAccounts(t *testing.T) { t.Fatalf("account shoult not have drift %v", acc.Drift) } - // fetch a price table on the fly by triggering a renew - w.RHPRenew(context.Background(), c.ID, c.WindowStart, c.HostKey, c.SiamuxAddr, res.Settings.Address, wallet.Address, types.Siacoins(10), types.Siacoins(1), types.Siacoins(15), 1<<40, h.Settings.WindowSize) - - // assert we spent exactly 1H, the cost to fetch the price table - accounts, _ = cluster.Bus.Accounts(context.Background()) - if accounts[0].Balance.Cmp(types.Siacoins(1).Sub(types.NewCurrency64(1)).Big()) != 0 { - t.Fatalf("wrong balance %v", accounts[0].Balance) - } - // update the balance to create some drift newBalance := fundAmt.Div64(2) newDrift := new(big.Int).Sub(newBalance.Big(), fundAmt.Big()) From ecfb8a9522209a7f20d83607a3777ba2eecbb552 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 21 Aug 2024 11:24:32 +0200 Subject: [PATCH 12/13] rhp: pessimistic readsector cost --- internal/rhp/v3/rhp.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/rhp/v3/rhp.go b/internal/rhp/v3/rhp.go index 77fe38272..5ae5d9972 100644 --- a/internal/rhp/v3/rhp.go +++ b/internal/rhp/v3/rhp.go @@ -205,6 +205,7 @@ func (c *Client) ReadSector(ctx context.Context, offset, length uint32, root typ return err } + amount = cost // pessimistic cost estimate in case rpc fails payment := rhpv3.PayByEphemeralAccount(accID, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, accKey) cost, refund, err := rpcReadSector(ctx, t, w, pt, &payment, offset, length, root) if err != nil { From 4668f5668422671cb4195ae71cd92d354fb3113f Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 21 Aug 2024 11:27:52 +0200 Subject: [PATCH 13/13] worker: schedule sync on max bal exceeded --- worker/host.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/worker/host.go b/worker/host.go index 471a5b863..9c65bd0c2 100644 --- a/worker/host.go +++ b/worker/host.go @@ -2,6 +2,7 @@ package worker import ( "context" + "errors" "fmt" "io" "time" @@ -244,6 +245,9 @@ func (h *host) FundAccount(ctx context.Context, desired types.Currency, rev *typ // fund the account if err := h.client.FundAccount(ctx, rev, h.hk, h.siamuxAddr, deposit, h.acc.id, pt.HostPriceTable, h.renterKey); err != nil { + if rhp3.IsBalanceMaxExceeded(err) { + err = errors.Join(err, h.acc.as.ScheduleSync(ctx, h.acc.id, h.hk)) + } return types.ZeroCurrency, fmt.Errorf("failed to fund account with %v; %w", deposit, err) }