Skip to content

Commit

Permalink
Improve ephemeral account drift (#1403)
Browse files Browse the repository at this point in the history
This PR proposes a series of small changes in an attempt to fix (or
improve) the drift we see in production. I have not been able to
reproduce (meaningful) negative drift in a test locally and found
debugging it on production quite challenging because you want to caveman
debug both sides ideally. The following changes definitely improve
things, but we'll need to keep following it up.

- [x] plug leaks where we weren't withdrawing money from the EA even
though we paid for things like PT updates
- [x] pessimistically withdraw money from the account in
`DownloadSector` (host only refunds storage revenue on failure)
- [x] sync newly created accounts (avoids funding already funded
accounts)
- [x] reset drift in production (fresh start allows for better follow
up)


I considered syncing the account balance when we fund an account. I went
back and fourth on that several times but eventually removed it because
it more or less makes `requiresSync` obsolete. The benefit though would
be that you are certain the account has 1SC in it after you fund it,
while at the same time keeping track of drift. So drift would still
build up, but at the very least you have an account with 1SC and you
never run into unexpected insufficient balance errors...
  • Loading branch information
ChrisSchinnerl authored Aug 22, 2024
2 parents b377f76 + 4668f56 commit 0751034
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 143 deletions.
56 changes: 37 additions & 19 deletions internal/bus/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,18 @@ func (a *AccountMgr) ResetDrift(id rhpv3.Account) error {
return ErrAccountNotFound
}
a.mu.Unlock()

account.mu.Lock()
driftBefore := account.Drift.String()
account.mu.Unlock()

account.resetDrift()

a.logger.Infow("account drift was reset",
zap.Stringer("account", account.ID),
zap.Stringer("host", account.HostKey),
zap.String("driftBefore", driftBefore))

return nil
}

Expand All @@ -193,29 +204,37 @@ func (a *AccountMgr) ResetDrift(id rhpv3.Account) error {
func (a *AccountMgr) SetBalance(id rhpv3.Account, hk types.PublicKey, balance *big.Int) {
acc := a.account(id, hk)

// Update balance and drift.
acc.mu.Lock()
delta := new(big.Int).Sub(balance, acc.Balance)
balanceBefore := acc.Balance.String()
driftBefore := acc.Drift.String()
defer acc.mu.Unlock()

// save previous values
prevBalance := new(big.Int).Set(acc.Balance)
prevDrift := new(big.Int).Set(acc.Drift)

// update balance
acc.Balance.Set(balance)

// update drift
drift := new(big.Int).Sub(balance, prevBalance)
if acc.CleanShutdown {
acc.Drift = acc.Drift.Add(acc.Drift, delta)
acc.Drift = acc.Drift.Add(acc.Drift, drift)
}
acc.Balance.Set(balance)

// reset fields
acc.CleanShutdown = true
acc.RequiresSync = false // resetting the balance resets the sync field
balanceAfter := acc.Balance.String()
acc.mu.Unlock()
acc.RequiresSync = false

// Log resets.
// log account changes
a.logger.Infow("account balance was reset",
"account", acc.ID,
"host", acc.HostKey.String(),
"balanceBefore", balanceBefore,
"balanceAfter", balanceAfter,
"driftBefore", driftBefore,
"driftAfter", acc.Drift.String(),
"delta", delta.String())
zap.Stringer("account", acc.ID),
zap.Stringer("host", acc.HostKey),
zap.Stringer("balanceBefore", prevBalance),
zap.Stringer("balanceAfter", balance),
zap.Stringer("driftBefore", prevDrift),
zap.Stringer("driftAfter", acc.Drift),
zap.Bool("firstDrift", acc.Drift.Cmp(big.NewInt(0)) != 0 && prevDrift.Cmp(big.NewInt(0)) == 0),
zap.Bool("cleanshutdown", acc.CleanShutdown),
zap.Stringer("drift", drift))
}

// ScheduleSync sets the requiresSync flag of an account.
Expand Down Expand Up @@ -296,7 +315,6 @@ func (a *AccountMgr) account(id rhpv3.Account, hk types.PublicKey) *account {
a.mu.Lock()
defer a.mu.Unlock()

// Create account if it doesn't exist.
acc, exists := a.byID[id]
if !exists {
acc = &account{
Expand All @@ -306,7 +324,7 @@ func (a *AccountMgr) account(id rhpv3.Account, hk types.PublicKey) *account {
HostKey: hk,
Balance: big.NewInt(0),
Drift: big.NewInt(0),
RequiresSync: false,
RequiresSync: true, // initial sync
},
locks: map[uint64]*accountLock{},
}
Expand Down
7 changes: 3 additions & 4 deletions internal/rhp/v3/rhp.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,15 @@ func (c *Client) Renew(ctx context.Context, rrr api.RHPRenewRequest, gougingChec
return
}

func (c *Client) SyncAccount(ctx context.Context, rev *types.FileContractRevision, hk types.PublicKey, siamuxAddr string, accID rhpv3.Account, pt rhpv3.SettingsID, rk types.PrivateKey) (types.Currency, error) {
var balance types.Currency
err := c.tpool.withTransport(ctx, hk, siamuxAddr, func(ctx context.Context, t *transportV3) error {
func (c *Client) SyncAccount(ctx context.Context, rev *types.FileContractRevision, hk types.PublicKey, siamuxAddr string, accID rhpv3.Account, pt rhpv3.SettingsID, rk types.PrivateKey) (balance types.Currency, _ error) {
return balance, c.tpool.withTransport(ctx, hk, siamuxAddr, func(ctx context.Context, t *transportV3) error {
payment, err := payByContract(rev, types.NewCurrency64(1), accID, rk)
if err != nil {
return err
}
balance, err = rpcAccountBalance(ctx, t, &payment, accID, pt)
return err
})
return balance, err
}

func (c *Client) PriceTable(ctx context.Context, hk types.PublicKey, siamuxAddr string, paymentFn PriceTablePaymentFunc) (pt api.HostPriceTable, err error) {
Expand Down Expand Up @@ -207,6 +205,7 @@ func (c *Client) ReadSector(ctx context.Context, offset, length uint32, root typ
return err
}

amount = cost // pessimistic cost estimate in case rpc fails
payment := rhpv3.PayByEphemeralAccount(accID, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, accKey)
cost, refund, err := rpcReadSector(ctx, t, w, pt, &payment, offset, length, root)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00014_hosts_resolvedaddresses", log)
},
},
{
ID: "00015_reset_drift",
Migrate: func(tx Tx) error {
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00015_reset_drift", log)
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
104 changes: 65 additions & 39 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,54 +1131,64 @@ func TestEphemeralAccounts(t *testing.T) {
t.SkipNow()
}

// Create cluster
cluster := newTestCluster(t, testClusterOptions{hosts: 1})
// run without autopilot
opts := clusterOptsDefault
opts.skipRunningAutopilot = true

// create cluster
cluster := newTestCluster(t, opts)
defer cluster.Shutdown()

// convenience variables
b := cluster.Bus
w := cluster.Worker
tt := cluster.tt

// Shut down the autopilot to prevent it from interfering.
cluster.ShutdownAutopilot(context.Background())
tt.OK(b.UpdateSetting(context.Background(), api.SettingRedundancy, api.RedundancySettings{
MinShards: 1,
TotalShards: 1,
}))
// add a host
hosts := cluster.AddHosts(1)
h, err := b.Host(context.Background(), hosts[0].PublicKey())
tt.OK(err)

// Wait for contract and accounts.
contract := cluster.WaitForContracts()[0]
accounts := cluster.WaitForAccounts()
// scan the host
tt.OKAll(w.RHPScan(context.Background(), h.PublicKey, h.NetAddress, 10*time.Second))

// Shut down the autopilot to prevent it from interfering with the test.
cluster.ShutdownAutopilot(context.Background())
// manually form a contract with the host
cs, _ := b.ConsensusState(context.Background())
wallet, _ := b.Wallet(context.Background())
rev, _, err := w.RHPForm(context.Background(), cs.BlockHeight+test.AutopilotConfig.Contracts.Period+test.AutopilotConfig.Contracts.RenewWindow, h.PublicKey, h.NetAddress, wallet.Address, types.Siacoins(10), types.Siacoins(1))
tt.OK(err)
c, err := b.AddContract(context.Background(), rev, rev.Revision.MissedHostPayout().Sub(types.Siacoins(1)), types.Siacoins(1), cs.BlockHeight, api.ContractStatePending)
tt.OK(err)

// Newly created accounts are !cleanShutdown. Simulate a sync to change
// that.
for _, acc := range accounts {
if acc.CleanShutdown {
t.Fatal("new account should indicate an unclean shutdown")
} else if acc.RequiresSync {
t.Fatal("new account should not require a sync")
}
if err := cluster.Bus.SetBalance(context.Background(), acc.ID, acc.HostKey, types.Siacoins(1).Big()); err != nil {
t.Fatal(err)
}
}
tt.OK(b.SetContractSet(context.Background(), test.ContractSet, []types.FileContractID{c.ID}))

// fund the account
fundAmt := types.Siacoins(1)
tt.OK(w.RHPFund(context.Background(), c.ID, c.HostKey, c.HostIP, c.SiamuxAddr, fundAmt))

// Fetch accounts again.
// fetch accounts
accounts, err := cluster.Bus.Accounts(context.Background())
tt.OK(err)

// assert account state
acc := accounts[0]
if acc.Balance.Cmp(types.Siacoins(1).Big()) < 0 {
t.Fatalf("wrong balance %v", acc.Balance)
}
if acc.ID == (rhpv3.Account{}) {
t.Fatal("account id not set")
}
host := cluster.hosts[0]
if acc.HostKey != types.PublicKey(host.PublicKey()) {
} else if acc.CleanShutdown {
t.Fatal("account should indicate an unclean shutdown")
} else if !acc.RequiresSync {
t.Fatal("account should require a sync")
} else if acc.HostKey != h.PublicKey {
t.Fatal("wrong host")
}
if !acc.CleanShutdown {
t.Fatal("account should indicate a clean shutdown")
} else if acc.Balance.Cmp(types.Siacoins(1).Big()) != 0 {
t.Fatalf("wrong balance %v", acc.Balance)
}

// Fetch account from bus directly.
// fetch account from bus directly
busAccounts, err := cluster.Bus.Accounts(context.Background())
tt.OK(err)
if len(busAccounts) != 1 {
Expand All @@ -1189,12 +1199,11 @@ func TestEphemeralAccounts(t *testing.T) {
t.Fatal("bus account doesn't match worker account")
}

// Check that the spending was recorded for the contract. The recorded
// check that the spending was recorded for the contract. The recorded
// spending should be > the fundAmt since it consists of the fundAmt plus
// fee.
fundAmt := types.Siacoins(1)
tt.Retry(10, testBusFlushInterval, func() error {
cm, err := cluster.Bus.Contract(context.Background(), contract.ID)
cm, err := cluster.Bus.Contract(context.Background(), c.ID)
tt.OK(err)

if cm.Spending.FundAccount.Cmp(fundAmt) <= 0 {
Expand All @@ -1203,7 +1212,24 @@ func TestEphemeralAccounts(t *testing.T) {
return nil
})

// Update the balance to create some drift.
// sync the account
tt.OK(w.RHPSync(context.Background(), c.ID, acc.HostKey, c.HostIP, c.SiamuxAddr))

// assert account state
accounts, err = cluster.Bus.Accounts(context.Background())
tt.OK(err)

// assert account state
acc = accounts[0]
if !acc.CleanShutdown {
t.Fatal("account should indicate a clean shutdown")
} else if acc.RequiresSync {
t.Fatal("account should not require a sync")
} else if acc.Drift.Cmp(new(big.Int)) != 0 {
t.Fatalf("account shoult not have drift %v", acc.Drift)
}

// update the balance to create some drift
newBalance := fundAmt.Div64(2)
newDrift := new(big.Int).Sub(newBalance.Big(), fundAmt.Big())
if err := cluster.Bus.SetBalance(context.Background(), busAcc.ID, acc.HostKey, newBalance.Big()); err != nil {
Expand All @@ -1217,11 +1243,11 @@ func TestEphemeralAccounts(t *testing.T) {
t.Fatalf("drift was %v but should be %v", busAcc.Drift, maxNewDrift)
}

// Reboot cluster.
// reboot cluster
cluster2 := cluster.Reboot(t)
defer cluster2.Shutdown()

// Check that accounts were loaded from the bus.
// check that accounts were loaded from the bus
accounts2, err := cluster2.Bus.Accounts(context.Background())
tt.OK(err)
for _, acc := range accounts2 {
Expand All @@ -1234,7 +1260,7 @@ func TestEphemeralAccounts(t *testing.T) {
}
}

// Reset drift again.
// reset drift again
if err := cluster2.Bus.ResetDrift(context.Background(), acc.ID); err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
ALTER TABLE hosts DROP COLUMN subnets;
ALTER TABLE hosts ADD resolved_addresses varchar(255) NOT NULL DEFAULT '';

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
UPDATE ephemeral_accounts SET drift = "0", clean_shutdown = 0, requires_sync = 1;
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
ALTER TABLE hosts DROP COLUMN subnets;
ALTER TABLE hosts ADD resolved_addresses TEXT NOT NULL DEFAULT '';

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
UPDATE ephemeral_accounts SET drift = "0", clean_shutdown = 0, requires_sync = 1;
Loading

0 comments on commit 0751034

Please sign in to comment.