diff --git a/lnwallet/dcrwallet/rpcsync.go b/lnwallet/dcrwallet/rpcsync.go index 8c20b73651..59349bac67 100644 --- a/lnwallet/dcrwallet/rpcsync.go +++ b/lnwallet/dcrwallet/rpcsync.go @@ -2,6 +2,8 @@ package dcrwallet import ( "context" + "sync" + "time" "github.com/decred/dcrd/chaincfg/v2" "github.com/decred/dcrd/rpcclient/v5" @@ -13,9 +15,14 @@ import ( // RPCSyncer implements the required methods for synchronizing a DcrWallet // instance using a full node dcrd backend. type RPCSyncer struct { - cancel func() rpcConfig rpcclient.ConnConfig net *chaincfg.Params + + mtx sync.Mutex + + // The following fields are protected by mtx. + + cancel func() } // NewRPCSyncer initializes a new syncer backed by a full dcrd node. It @@ -31,36 +38,54 @@ func NewRPCSyncer(rpcConfig rpcclient.ConnConfig, net *chaincfg.Params) (*RPCSyn // start the syncer backend and begin synchronizing the given wallet. func (s *RPCSyncer) start(w *DcrWallet) error { - dcrwLog.Debugf("Starting rpc syncer") - - // This context will be canceled by `w` once its Stop() method is - // called. - var ctx context.Context - ctx, s.cancel = context.WithCancel(context.Background()) - chainRpcOpts := chain.RPCOptions{ Address: s.rpcConfig.Host, User: s.rpcConfig.User, Pass: s.rpcConfig.Pass, CA: s.rpcConfig.Certificates, } - syncer := chain.NewSyncer(w.wallet, &chainRpcOpts) - syncer.SetCallbacks(&chain.Callbacks{ - Synced: w.onRPCSyncerSynced, - }) go func() { - err := syncer.Run(ctx) - dcrwLog.Debugf("RPCsyncer shutting down") - - // TODO: convert to errors.Is - if werr, is := err.(*errors.Error); is && werr.Err == context.Canceled { - // This was a graceful shutdown, so ignore the error. - dcrwLog.Debugf("RPCsyncer shutting down") - return - } + for { + // This context will be canceled by `w` once its Stop() method is + // called. + ctx, cancel := context.WithCancel(context.Background()) + s.mtx.Lock() + s.cancel = cancel + s.mtx.Unlock() - dcrwLog.Errorf("RPCSyncer error: %v", err) + syncer := chain.NewSyncer(w.wallet, &chainRpcOpts) + syncer.SetCallbacks(&chain.Callbacks{ + Synced: w.onRPCSyncerSynced, + }) + + dcrwLog.Debugf("Starting rpc syncer") + err := syncer.Run(ctx) + w.rpcSyncerFinished() + + // TODO: convert to errors.Is + if werr, is := err.(*errors.Error); is && werr.Err == context.Canceled { + // This was a graceful shutdown, so ignore the error. + dcrwLog.Debugf("RPCsyncer shutting down") + return + } + dcrwLog.Errorf("RPCSyncer error: %v", err) + + // Backoff for 5 seconds. + select { + case <-ctx.Done(): + // Graceful shutdown. + dcrwLog.Debugf("RPCsyncer shutting down") + return + case <-time.After(5 * time.Second): + } + + // Clear and call s.cancel() so we don't leak it. + s.mtx.Lock() + s.cancel = nil + s.mtx.Unlock() + cancel() + } }() return nil @@ -68,5 +93,10 @@ func (s *RPCSyncer) start(w *DcrWallet) error { func (s *RPCSyncer) stop() { dcrwLog.Debugf("RPCSyncer requested shutdown") - s.cancel() + s.mtx.Lock() + if s.cancel != nil { + s.cancel() + s.cancel = nil + } + s.mtx.Unlock() } diff --git a/lnwallet/dcrwallet/wallet.go b/lnwallet/dcrwallet/wallet.go index 854c784f5e..4b07a6f9cc 100644 --- a/lnwallet/dcrwallet/wallet.go +++ b/lnwallet/dcrwallet/wallet.go @@ -30,6 +30,20 @@ const ( scriptVersion = uint16(0) ) +const ( + // The following values are used by atomicWalletSync. Their + // interpretation is the following: + // + // - Unsynced: wallet just started and hasn't performed the first sync + // yet. + // - Synced: wallet is currently synced. + // - LostSync: wallet was synced in the past but lost the connection to + // the network and it's unknown whether it's synced or not. + syncStatusUnsynced uint32 = 0 + syncStatusSynced = 1 + syncStatusLostSync = 2 +) + // DcrWallet is an implementation of the lnwallet.WalletController interface // backed by an active instance of dcrwallet. At the time of the writing of // this documentation, this implementation requires a full dcrd node to @@ -42,9 +56,12 @@ const ( // wallet has been fully synced. type DcrWallet struct { // wallet is an active instance of dcrwallet. - wallet *base.Wallet - loader *walletloader.Loader - atomicWalletSynced uint32 // CAS (synced=1) when wallet syncing complete + wallet *base.Wallet + loader *walletloader.Loader + + // atomicWalletSync controls the current sync status of the wallet. It + // MUST be used atomically. + atomicWalletSynced uint32 // syncedChan is a channel that is closed once the wallet has initially // synced to the network. It is protected by atomicWalletSynced. @@ -107,12 +124,13 @@ func New(cfg Config) (*DcrWallet, error) { } return &DcrWallet{ - cfg: &cfg, - wallet: wallet, - loader: loader, - syncer: syncer, - syncedChan: make(chan struct{}), - netParams: cfg.NetParams, + cfg: &cfg, + wallet: wallet, + loader: loader, + syncer: syncer, + syncedChan: make(chan struct{}), + atomicWalletSynced: syncStatusUnsynced, + netParams: cfg.NetParams, }, nil } @@ -807,6 +825,12 @@ func (b *DcrWallet) InitialSyncChannel() <-chan struct{} { func (b *DcrWallet) onRPCSyncerSynced(synced bool) { dcrwLog.Debug("RPC syncer notified wallet is synced") + if atomic.CompareAndSwapUint32(&b.atomicWalletSynced, syncStatusLostSync, syncStatusSynced) { + // No need to recreate the keyring or close the initial sync + // channel, so just return. + return + } + // Now that the wallet is synced and address discovery has ended, we // can create the keyring. We can only do this here (after sync) // because address discovery might upgrade the underlying dcrwallet @@ -821,7 +845,13 @@ func (b *DcrWallet) onRPCSyncerSynced(synced bool) { } // Signal that the wallet is synced by closing the channel. - if atomic.CompareAndSwapUint32(&b.atomicWalletSynced, 0, 1) { + if atomic.CompareAndSwapUint32(&b.atomicWalletSynced, syncStatusUnsynced, syncStatusSynced) { close(b.syncedChan) } } + +func (b *DcrWallet) rpcSyncerFinished() { + // The RPC syncer stopped, so if we were previously synced we need to + // signal that we aren't anymore. + atomic.CompareAndSwapUint32(&b.atomicWalletSynced, syncStatusSynced, syncStatusLostSync) +}