Skip to content

Commit

Permalink
testing: fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 28, 2024
1 parent 014ffc9 commit 5cd6577
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 45 deletions.
11 changes: 9 additions & 2 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ func (ap *Autopilot) Run() error {
var forceScan bool
var launchAccountRefillsOnce sync.Once
for {
// check for shutdown right before starting a new iteration
select {
case <-ap.shutdownCtx.Done():
return nil
default:
}

ap.logger.Info("autopilot iteration starting")
tickerFired := make(chan struct{})
ap.workers.withWorker(func(w Worker) {
Expand All @@ -219,7 +226,7 @@ func (ap *Autopilot) Run() error {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before consensus was synced")
ap.logger.Info("autopilot stopped before consensus was synced")
return
} else if blocked {
if scanning, _ := ap.s.Status(); !scanning {
Expand All @@ -233,7 +240,7 @@ func (ap *Autopilot) Run() error {
close(tickerFired)
return
}
ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus")
ap.logger.Info("autopilot stopped before it was able to confirm it was configured in the bus")
return
}

Expand Down
2 changes: 1 addition & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
cm := chain.NewManager(store, state)

// create wallet
w, err := NewSingleAddressWallet(seed, cm, sqlStore, sqlStore, logger.Named("wallet").Sugar(), cwallet.WithReservationDuration(cfg.UsedUTXOExpiry))
w, err := NewSingleAddressWallet(seed, cm.TipState().BlockInterval(), cm, sqlStore, sqlStore, logger.Named("wallet").Sugar(), cwallet.WithReservationDuration(cfg.UsedUTXOExpiry))
if err != nil {
return nil, nil, nil, err
}
Expand Down
57 changes: 25 additions & 32 deletions internal/node/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,65 +11,58 @@ import (
"go.uber.org/zap"
)

// TODO: feels quite hacky

type metricRecorder interface {
RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error
}

type singleAddressWallet struct {
*wallet.SingleAddressWallet

cm *chain.Manager
mr metricRecorder
logger *zap.SugaredLogger
blockInterval time.Duration
cm *chain.Manager
mr metricRecorder
logger *zap.SugaredLogger
}

func NewSingleAddressWallet(seed types.PrivateKey, cm *chain.Manager, store wallet.SingleAddressStore, mr metricRecorder, l *zap.SugaredLogger, opts ...wallet.Option) (*singleAddressWallet, error) {
func NewSingleAddressWallet(seed types.PrivateKey, blockInterval time.Duration, cm *chain.Manager, store wallet.SingleAddressStore, mr metricRecorder, l *zap.SugaredLogger, opts ...wallet.Option) (*singleAddressWallet, error) {
w, err := wallet.NewSingleAddressWallet(seed, cm, store, opts...)
if err != nil {
return nil, err
}

return &singleAddressWallet{w, cm, mr, l}, nil
return &singleAddressWallet{w, blockInterval, cm, mr, l}, nil
}

func (w *singleAddressWallet) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool) error {
// escape early if we're not synced
if !w.isSynced() {
if time.Since(cau.Block.Timestamp) >= 2*w.blockInterval {
return nil
}

// fetch balance
balance, err := w.Balance()
if err != nil {
w.logger.Errorf("failed to fetch wallet balance, err: %v", err)
return nil
}
// record metric in a goroutine
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
w.recordMetric(ctx)
cancel()
}()

// apply sane timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return nil
}

// record wallet metric
err = w.mr.RecordWalletMetric(ctx, api.WalletMetric{
func (w *singleAddressWallet) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error { return nil }

func (w *singleAddressWallet) recordMetric(ctx context.Context) {
if balance, err := w.Balance(); err != nil {
w.logger.Errorf("failed to fetch wallet balance, err: %v", err)
return
} else if err := w.mr.RecordWalletMetric(ctx, api.WalletMetric{
Timestamp: api.TimeNow(),
Confirmed: balance.Confirmed,
Unconfirmed: balance.Unconfirmed,
Spendable: balance.Spendable,
})
if err != nil {
}); err != nil {
w.logger.Errorf("failed to record wallet metric, err: %v", err)
return nil
}

return nil
}

func (w *singleAddressWallet) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error { return nil }

func (w *singleAddressWallet) isSynced() bool {
var synced bool
if block, ok := w.cm.Block(w.cm.Tip().ID); ok && time.Since(block.Timestamp) < 2*w.cm.TipState().BlockInterval() {
synced = true
}
return synced
}
25 changes: 15 additions & 10 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2239,46 +2239,51 @@ func TestWalletSendUnconfirmed(t *testing.T) {
}

func TestWalletFormUnconfirmed(t *testing.T) {
// New cluster with autopilot disabled
// create cluster without autopilot
cfg := clusterOptsDefault
cfg.skipSettingAutopilot = true
cluster := newTestCluster(t, cfg)
defer cluster.Shutdown()

// convenience variables
b := cluster.Bus
tt := cluster.tt

// Add a host.
// add a host (non-blocking)
cluster.AddHosts(1)

// Send the full balance back to the wallet to make sure it's all
// unconfirmed.
// send all money to ourselves, making sure it's unconfirmed
feeReserve := types.Siacoins(1).Div64(100)
wr, err := b.Wallet(context.Background())
tt.OK(err)
tt.OK(b.SendSiacoins(context.Background(), []types.SiacoinOutput{
{
Address: wr.Address,
Value: wr.Confirmed.Sub(types.Siacoins(1).Div64(100)), // leave some for the fee
Value: wr.Confirmed.Sub(feeReserve), // leave some for the fee
},
}, false))

// There should be hardly any money in the wallet.
// check wallet only has the reserve in the confirmed balance
wr, err = b.Wallet(context.Background())
tt.OK(err)
if wr.Confirmed.Sub(wr.Unconfirmed).Cmp(types.Siacoins(1).Div64(100)) > 0 {
if wr.Confirmed.Sub(wr.Unconfirmed).Cmp(feeReserve) > 0 {
t.Fatal("wallet should have hardly any confirmed balance")
}
t.Logf("%+v", wr)
t.Log("Confirmed", wr.Confirmed)
t.Log("Unconfirmed", wr.Unconfirmed)

// There shouldn't be any contracts at this point.
// there shouldn't be any contracts yet
contracts, err := b.Contracts(context.Background(), api.ContractsOpts{})
tt.OK(err)
if len(contracts) != 0 {
t.Fatal("expected 0 contracts", len(contracts))
}

// Enable autopilot by setting it.
// enable the autopilot by configuring it
cluster.UpdateAutopilotConfig(context.Background(), testAutopilotConfig)

// Wait for a contract to form.
// wait for a contract to form
contractsFormed := cluster.WaitForContracts()
if len(contractsFormed) != 1 {
t.Fatal("expected 1 contract", len(contracts))
Expand Down

0 comments on commit 5cd6577

Please sign in to comment.