diff --git a/loopd/daemon.go b/loopd/daemon.go index d0b3fe99a..d7b69708a 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -564,6 +564,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { Store: store, WalletKit: d.lnd.WalletKit, ChainParams: d.lnd.ChainParams, + ChainNotifier: d.lnd.ChainNotifier, + Signer: d.lnd.Signer, } staticAddressManager := staticaddr.NewAddressManager(cfg) @@ -666,8 +668,15 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() + // Lnd's GetInfo call supplies us with the current block height. + info, err := d.lnd.Client.GetInfo(d.mainCtx) + if err != nil { + d.internalErrChan <- err + return + } + log.Info("Starting static address manager...") - err = staticAddressManager.Run(d.mainCtx) + err = staticAddressManager.Run(d.mainCtx, info.BlockHeight) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } diff --git a/staticaddr/manager.go b/staticaddr/manager.go index f724ca0ab..89cfe911c 100644 --- a/staticaddr/manager.go +++ b/staticaddr/manager.go @@ -5,11 +5,14 @@ import ( "context" "fmt" "sync" + "time" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/staticaddr/script" @@ -17,9 +20,25 @@ import ( staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnwallet" ) +const ( + // DepositPollInterval is the interval in which we poll for new deposits + // to our static address. + DepositPollInterval = 10 * time.Second + + // MinConfs is the minimum number of confirmations we require for a + // deposit to be considered available for loop-ins, coop-spends and + // timeouts. + MinConfs = 3 + + // MaxConfs is unset since we don't require a max number of + // confirmations for deposits. + MaxConfs = 0 +) + // ManagerConfig holds the configuration for the address manager. type ManagerConfig struct { // AddressClient is the client that communicates with the loop server @@ -40,37 +59,121 @@ type ManagerConfig struct { // ChainParams is the chain configuration(mainnet, testnet...) this // manager uses. ChainParams *chaincfg.Params + + // ChainNotifier is the chain notifier that is used to listen for new + // blocks. + ChainNotifier lndclient.ChainNotifierClient + + // Signer is the signer client that is used to sign transactions. + Signer lndclient.SignerClient } // Manager manages the address state machines. type Manager struct { cfg *ManagerConfig - initChan chan struct{} + runCtx context.Context sync.Mutex + + // initChan signals the daemon that the address manager has completed + // its initialization. + initChan chan struct{} + + // activeDeposits contains all the active static address outputs. + activeDeposits map[wire.OutPoint]*FSM + + // initiationHeight stores the currently best known block height. + initiationHeight uint32 + + // currentHeight stores the currently best known block height. + currentHeight uint32 + + // addrParams is the static address parameters that are required for + // loop-ins, coop-spends and timeouts. + addrParams *AddressParameters + + // staticAddress contains bitcoin script utils for a static address. + staticAddress *script.StaticAddress + + // deposits contains all the deposits that have ever been made to the + // static address. This field is used to store and recover deposits. It + // also serves as basis for reconciliation of newly detected deposits by + // matching them against deposits in this map that were already seen. + deposits map[wire.OutPoint]*Deposit + + // finalizedDepositChan is a channel that receives deposits that have + // been finalized. The manager will adjust its internal state and flush + // finalized deposits from its memory. + finalizedDepositChan chan wire.OutPoint } // NewAddressManager creates a new address manager. func NewAddressManager(cfg *ManagerConfig) *Manager { return &Manager{ - cfg: cfg, - initChan: make(chan struct{}), + cfg: cfg, + initChan: make(chan struct{}), + activeDeposits: make(map[wire.OutPoint]*FSM), + deposits: make(map[wire.OutPoint]*Deposit), + finalizedDepositChan: make(chan wire.OutPoint), } } // Run runs the address manager. -func (m *Manager) Run(ctx context.Context) error { +func (m *Manager) Run(ctx context.Context, currentHeight uint32) error { log.Debugf("Starting address manager.") defer log.Debugf("Address manager stopped.") + m.runCtx = ctx + + m.Lock() + m.currentHeight, m.initiationHeight = currentHeight, currentHeight + m.Unlock() + + newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(m.runCtx) //nolint:lll + if err != nil { + return err + } + + // Recover previous deposits and static address parameters from the DB. + err = m.recover(m.runCtx) + if err != nil { + return err + } + // Communicate to the caller that the address manager has completed its // initialization. close(m.initChan) - <-ctx.Done() - - return nil + for { + select { + case height := <-newBlockChan: + m.Lock() + m.currentHeight = uint32(height) + m.Unlock() + + // Inform all active deposits about a new block arrival. + for _, fsm := range m.activeDeposits { + select { + case fsm.blockNtfnChan <- uint32(height): + + case <-m.runCtx.Done(): + return m.runCtx.Err() + } + } + case outpoint := <-m.finalizedDepositChan: + // If deposits notify us about their finalization, we + // update the manager's internal state and flush the + // finalized deposit from memory. + m.finalizeDeposit(outpoint) + + case err := <-newBlockErrChan: + return err + + case <-m.runCtx.Done(): + return m.runCtx.Err() + } + } } // NewAddress starts a new address creation flow. @@ -124,8 +227,8 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, return nil, err } + // Parse the server's pubkey that it send us. serverParams := resp.GetParams() - serverPubKey, err := btcec.ParsePubKey(serverParams.ServerKey) if err != nil { return nil, err @@ -177,26 +280,86 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, log.Infof("imported static address taproot script to lnd wallet: %v", addr) + // Retain the static address in memory. + m.staticAddress = staticAddress + + // Retain the address parameters for subroutines of the address manager. + m.addrParams = addrParams + + // Start the deposit poller to notify us about new deposits. + m.pollDeposits(m.runCtx) + return m.getTaprootAddress( clientPubKey.PubKey, serverPubKey, int64(serverParams.Expiry), ) } -func (m *Manager) getTaprootAddress(clientPubkey, - serverPubkey *btcec.PublicKey, expiry int64) (*btcutil.AddressTaproot, - error) { +// recover recovers static address parameters, previous deposits and state +// machines from the database and starts the deposit notifier. +func (m *Manager) recover(ctx context.Context) error { + log.Infof("Recovering static address parameters and deposits...") + addrParams, err := m.cfg.Store.GetAllStaticAddresses(ctx) + if err != nil { + return err + } - staticAddress, err := script.NewStaticAddress( - input.MuSig2Version100RC2, expiry, clientPubkey, serverPubkey, + if len(addrParams) == 0 { + // No static address parameters found. We can't recover. It is + // probably the first time we are running. + return nil + } + + m.addrParams = addrParams[0] + + // Recover the static address from the retained parameters. + m.staticAddress, err = script.NewStaticAddress( + input.MuSig2Version100RC2, int64(m.addrParams.Expiry), + m.addrParams.ClientPubkey, m.addrParams.ServerPubkey, ) if err != nil { - return nil, err + return err } - return btcutil.NewAddressTaproot( - schnorr.SerializePubKey(staticAddress.TaprootKey), - m.cfg.ChainParams, - ) + // Recover deposits. + deposits, err := m.cfg.Store.AllDeposits(ctx) + if err != nil { + return err + } + + for i, d := range deposits { + if d.IsFinal() { + continue + } + + log.Debugf("Recovering deposit %x", d.ID) + + // Create a state machine for a given deposit. + fsm, err := NewFSM( + m.runCtx, m.addrParams, m.staticAddress, d, m.cfg, + m.finalizedDepositChan, true, + ) + if err != nil { + return err + } + + // Send the OnRecover event to the state machine. + go func() { + err = fsm.SendEvent(OnRecover, nil) + if err != nil { + log.Errorf("Error sending OnStart event: %v", + err) + } + }() + + m.deposits[d.OutPoint] = deposits[i] + + m.activeDeposits[d.OutPoint] = fsm + } + + // Start the deposit notifier. + m.pollDeposits(ctx) + + return nil } // WaitInitComplete waits until the address manager has completed its setup. @@ -251,3 +414,214 @@ func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs, return taprootAddress, filteredUtxos, nil } + +// pollDeposits polls new deposits to our static address and notifies the +// manager's event loop about them. +func (m *Manager) pollDeposits(ctx context.Context) { + log.Debugf("waiting for new static address deposits...") + + go func() { + ticker := time.NewTicker(DepositPollInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := m.reconcileDeposits(ctx) + if err != nil { + log.Errorf("unable to reconcile "+ + "deposits: %v", err) + } + + case <-ctx.Done(): + return + } + } + }() +} + +// reconcileDeposits fetches all spends to our static address from our lnd +// wallet and matches it against the deposits in our memory that we've seen so +// far. It picks the newly identified deposits and starts a state machine per +// deposit to track its progress. +func (m *Manager) reconcileDeposits(ctx context.Context) error { + log.Tracef("Reconciling new deposits...") + + _, utxos, err := m.ListUnspentRaw(ctx, MinConfs, MaxConfs) + if err != nil { + return fmt.Errorf("unable to list new deposits: %v", err) + } + + newDeposits := m.filterNewDeposits(utxos) + if err != nil { + return fmt.Errorf("unable to filter new deposits: %v", err) + } + + if len(newDeposits) == 0 { + log.Tracef("No new deposits...") + return nil + } + + for _, utxo := range newDeposits { + deposit, err := m.createNewDeposit(ctx, utxo) + if err != nil { + return fmt.Errorf("unable to retain new deposit: %v", + err) + } + + log.Debugf("Received deposit: %v", deposit) + err = m.startDepositFsm(deposit) + if err != nil { + return fmt.Errorf("unable to start new deposit FSM: %v", + err) + } + } + + return nil +} + +// createNewDeposit transforms the wallet utxo into a deposit struct and stores +// it in our database and manager memory. +func (m *Manager) createNewDeposit(ctx context.Context, + utxo *lnwallet.Utxo) (*Deposit, error) { + + blockHeight, err := m.getBlockHeight(ctx, utxo) + if err != nil { + return nil, err + } + + // Get the sweep pk script. + addr, err := m.cfg.WalletKit.NextAddr( + ctx, lnwallet.DefaultAccountName, + walletrpc.AddressType_TAPROOT_PUBKEY, false, + ) + if err != nil { + return nil, err + } + + timeoutSweepPkScript, err := txscript.PayToAddrScript(addr) + if err != nil { + return nil, err + } + + id, err := GetRandomDepositID() + if err != nil { + return nil, err + } + deposit := &Deposit{ + ID: id, + State: Deposited, + OutPoint: utxo.OutPoint, + Value: utxo.Value, + ConfirmationHeight: int64(blockHeight), + TimeOutSweepPkScript: timeoutSweepPkScript, + } + + err = m.cfg.Store.CreateDeposit(ctx, deposit) + if err != nil { + return nil, err + } + + m.Lock() + m.deposits[deposit.OutPoint] = deposit + m.Unlock() + + return deposit, nil +} + +// getBlockHeight retrieves the block height of a given utxo. +func (m *Manager) getBlockHeight(ctx context.Context, + utxo *lnwallet.Utxo) (uint32, error) { + + notifChan, errChan, err := m.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll + ctx, &utxo.OutPoint.Hash, m.addrParams.PkScript, MinConfs, + int32(m.initiationHeight), + ) + if err != nil { + return 0, err + } + + select { + case tx := <-notifChan: + return tx.BlockHeight, nil + + case err := <-errChan: + return 0, err + + case <-ctx.Done(): + return 0, ctx.Err() + } +} + +// filterNewDeposits filters the given utxos for new deposits that we haven't +// seen before. +func (m *Manager) filterNewDeposits(utxos []*lnwallet.Utxo) []*lnwallet.Utxo { + m.Lock() + defer m.Unlock() + + var newDeposits []*lnwallet.Utxo + for _, utxo := range utxos { + _, ok := m.deposits[utxo.OutPoint] + if !ok { + newDeposits = append(newDeposits, utxo) + } + } + + return newDeposits +} + +// startDepositFsm creates a new state machine flow from the latest deposit to +// our static address. +func (m *Manager) startDepositFsm(deposit *Deposit) error { + // Create a state machine for a given deposit. + fsm, err := NewFSM( + m.runCtx, m.addrParams, m.staticAddress, deposit, m.cfg, + m.finalizedDepositChan, false, + ) + if err != nil { + return err + } + + // Send the start event to the state machine. + go func() { + err = fsm.SendEvent(OnStart, nil) + if err != nil { + log.Errorf("Error sending OnStart event: %v", err) + } + }() + + err = fsm.DefaultObserver.WaitForState(m.runCtx, time.Minute, Deposited) + if err != nil { + return err + } + + // Add the FSM to the active FSMs map. + m.Lock() + m.activeDeposits[deposit.OutPoint] = fsm + m.Unlock() + + return nil +} + +func (m *Manager) getTaprootAddress(clientPubkey, + serverPubkey *btcec.PublicKey, expiry int64) (*btcutil.AddressTaproot, + error) { + + staticAddress, err := script.NewStaticAddress( + input.MuSig2Version100RC2, expiry, clientPubkey, serverPubkey, + ) + if err != nil { + return nil, err + } + + return btcutil.NewAddressTaproot( + schnorr.SerializePubKey(staticAddress.TaprootKey), + m.cfg.ChainParams, + ) +} + +func (m *Manager) finalizeDeposit(outpoint wire.OutPoint) { + m.Lock() + delete(m.activeDeposits, outpoint) + delete(m.deposits, outpoint) + m.Unlock() +}