From 5cd6577fb8a98ec99eab74ebd5d34c212b7d6d9a Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 28 Feb 2024 16:26:41 +0100 Subject: [PATCH] testing: fix deadlock --- autopilot/autopilot.go | 11 ++++-- internal/node/node.go | 2 +- internal/node/wallet.go | 57 ++++++++++++++------------------ internal/testing/cluster_test.go | 25 ++++++++------ 4 files changed, 50 insertions(+), 45 deletions(-) diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 7822dc356..1aa9ecade 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -201,6 +201,13 @@ func (ap *Autopilot) Run() error { var forceScan bool var launchAccountRefillsOnce sync.Once for { + // check for shutdown right before starting a new iteration + select { + case <-ap.shutdownCtx.Done(): + return nil + default: + } + ap.logger.Info("autopilot iteration starting") tickerFired := make(chan struct{}) ap.workers.withWorker(func(w Worker) { @@ -219,7 +226,7 @@ func (ap *Autopilot) Run() error { close(tickerFired) return } - ap.logger.Error("autopilot stopped before consensus was synced") + ap.logger.Info("autopilot stopped before consensus was synced") return } else if blocked { if scanning, _ := ap.s.Status(); !scanning { @@ -233,7 +240,7 @@ func (ap *Autopilot) Run() error { close(tickerFired) return } - ap.logger.Error("autopilot stopped before it was able to confirm it was configured in the bus") + ap.logger.Info("autopilot stopped before it was able to confirm it was configured in the bus") return } diff --git a/internal/node/node.go b/internal/node/node.go index e76b89467..ddcdf03af 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -122,7 +122,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger cm := chain.NewManager(store, state) // create wallet - w, err := NewSingleAddressWallet(seed, cm, sqlStore, sqlStore, logger.Named("wallet").Sugar(), cwallet.WithReservationDuration(cfg.UsedUTXOExpiry)) + w, err := NewSingleAddressWallet(seed, cm.TipState().BlockInterval(), cm, sqlStore, sqlStore, logger.Named("wallet").Sugar(), cwallet.WithReservationDuration(cfg.UsedUTXOExpiry)) if err != nil { return nil, nil, nil, err } diff --git a/internal/node/wallet.go b/internal/node/wallet.go index 804d28734..0443aabd9 100644 --- a/internal/node/wallet.go +++ b/internal/node/wallet.go @@ -11,6 +11,8 @@ import ( "go.uber.org/zap" ) +// TODO: feels quite hacky + type metricRecorder interface { RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error } @@ -18,58 +20,49 @@ type metricRecorder interface { type singleAddressWallet struct { *wallet.SingleAddressWallet - cm *chain.Manager - mr metricRecorder - logger *zap.SugaredLogger + blockInterval time.Duration + cm *chain.Manager + mr metricRecorder + logger *zap.SugaredLogger } -func NewSingleAddressWallet(seed types.PrivateKey, cm *chain.Manager, store wallet.SingleAddressStore, mr metricRecorder, l *zap.SugaredLogger, opts ...wallet.Option) (*singleAddressWallet, error) { +func NewSingleAddressWallet(seed types.PrivateKey, blockInterval time.Duration, cm *chain.Manager, store wallet.SingleAddressStore, mr metricRecorder, l *zap.SugaredLogger, opts ...wallet.Option) (*singleAddressWallet, error) { w, err := wallet.NewSingleAddressWallet(seed, cm, store, opts...) if err != nil { return nil, err } - return &singleAddressWallet{w, cm, mr, l}, nil + return &singleAddressWallet{w, blockInterval, cm, mr, l}, nil } func (w *singleAddressWallet) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool) error { // escape early if we're not synced - if !w.isSynced() { + if time.Since(cau.Block.Timestamp) >= 2*w.blockInterval { return nil } - // fetch balance - balance, err := w.Balance() - if err != nil { - w.logger.Errorf("failed to fetch wallet balance, err: %v", err) - return nil - } + // record metric in a goroutine + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + w.recordMetric(ctx) + cancel() + }() - // apply sane timeout - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() + return nil +} - // record wallet metric - err = w.mr.RecordWalletMetric(ctx, api.WalletMetric{ +func (w *singleAddressWallet) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error { return nil } + +func (w *singleAddressWallet) recordMetric(ctx context.Context) { + if balance, err := w.Balance(); err != nil { + w.logger.Errorf("failed to fetch wallet balance, err: %v", err) + return + } else if err := w.mr.RecordWalletMetric(ctx, api.WalletMetric{ Timestamp: api.TimeNow(), Confirmed: balance.Confirmed, Unconfirmed: balance.Unconfirmed, Spendable: balance.Spendable, - }) - if err != nil { + }); err != nil { w.logger.Errorf("failed to record wallet metric, err: %v", err) - return nil - } - - return nil -} - -func (w *singleAddressWallet) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error { return nil } - -func (w *singleAddressWallet) isSynced() bool { - var synced bool - if block, ok := w.cm.Block(w.cm.Tip().ID); ok && time.Since(block.Timestamp) < 2*w.cm.TipState().BlockInterval() { - synced = true } - return synced } diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index c2d8a4edc..ce0a6d717 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -2239,46 +2239,51 @@ func TestWalletSendUnconfirmed(t *testing.T) { } func TestWalletFormUnconfirmed(t *testing.T) { - // New cluster with autopilot disabled + // create cluster without autopilot cfg := clusterOptsDefault cfg.skipSettingAutopilot = true cluster := newTestCluster(t, cfg) defer cluster.Shutdown() + + // convenience variables b := cluster.Bus tt := cluster.tt - // Add a host. + // add a host (non-blocking) cluster.AddHosts(1) - // Send the full balance back to the wallet to make sure it's all - // unconfirmed. + // send all money to ourselves, making sure it's unconfirmed + feeReserve := types.Siacoins(1).Div64(100) wr, err := b.Wallet(context.Background()) tt.OK(err) tt.OK(b.SendSiacoins(context.Background(), []types.SiacoinOutput{ { Address: wr.Address, - Value: wr.Confirmed.Sub(types.Siacoins(1).Div64(100)), // leave some for the fee + Value: wr.Confirmed.Sub(feeReserve), // leave some for the fee }, }, false)) - // There should be hardly any money in the wallet. + // check wallet only has the reserve in the confirmed balance wr, err = b.Wallet(context.Background()) tt.OK(err) - if wr.Confirmed.Sub(wr.Unconfirmed).Cmp(types.Siacoins(1).Div64(100)) > 0 { + if wr.Confirmed.Sub(wr.Unconfirmed).Cmp(feeReserve) > 0 { t.Fatal("wallet should have hardly any confirmed balance") } + t.Logf("%+v", wr) + t.Log("Confirmed", wr.Confirmed) + t.Log("Unconfirmed", wr.Unconfirmed) - // There shouldn't be any contracts at this point. + // there shouldn't be any contracts yet contracts, err := b.Contracts(context.Background(), api.ContractsOpts{}) tt.OK(err) if len(contracts) != 0 { t.Fatal("expected 0 contracts", len(contracts)) } - // Enable autopilot by setting it. + // enable the autopilot by configuring it cluster.UpdateAutopilotConfig(context.Background(), testAutopilotConfig) - // Wait for a contract to form. + // wait for a contract to form contractsFormed := cluster.WaitForContracts() if len(contractsFormed) != 1 { t.Fatal("expected 1 contract", len(contracts))