Skip to content

Commit

Permalink
Merge pull request #677 from hieblmi/static-addr-3
Browse files Browse the repository at this point in the history
[3/?] StaticAddr: Deposit tracking and timeout sweep
  • Loading branch information
hieblmi authored Apr 25, 2024
2 parents 45eb57a + 9fe9aa9 commit 3961d45
Show file tree
Hide file tree
Showing 32 changed files with 2,700 additions and 727 deletions.
35 changes: 9 additions & 26 deletions cmd/loop/staticaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func newStaticAddress(ctx *cli.Context) error {
return cli.ShowCommandHelp(ctx, "new")
}

client, cleanup, err := getAddressClient(ctx)
client, cleanup, err := getClient(ctx)
if err != nil {
return err
}
defer cleanup()

resp, err := client.NewAddress(
ctxb, &looprpc.NewAddressRequest{},
resp, err := client.NewStaticAddress(
ctxb, &looprpc.NewStaticAddressRequest{},
)
if err != nil {
return err
Expand Down Expand Up @@ -86,16 +86,17 @@ func listUnspent(ctx *cli.Context) error {
return cli.ShowCommandHelp(ctx, "listunspent")
}

client, cleanup, err := getAddressClient(ctx)
client, cleanup, err := getClient(ctx)
if err != nil {
return err
}
defer cleanup()

resp, err := client.ListUnspent(ctxb, &looprpc.ListUnspentRequest{
MinConfs: int32(ctx.Int("min_confs")),
MaxConfs: int32(ctx.Int("max_confs")),
})
resp, err := client.ListUnspentDeposits(
ctxb, &looprpc.ListUnspentDepositsRequest{
MinConfs: int32(ctx.Int("min_confs")),
MaxConfs: int32(ctx.Int("max_confs")),
})
if err != nil {
return err
}
Expand All @@ -104,21 +105,3 @@ func listUnspent(ctx *cli.Context) error {

return nil
}

func getAddressClient(ctx *cli.Context) (looprpc.StaticAddressClientClient,
func(), error) {

rpcServer := ctx.GlobalString("rpcserver")
tlsCertPath, macaroonPath, err := extractPathArgs(ctx)
if err != nil {
return nil, nil, err
}
conn, err := getClientConn(rpcServer, tlsCertPath, macaroonPath)
if err != nil {
return nil, nil, err
}
cleanup := func() { conn.Close() }

addressClient := looprpc.NewStaticAddressClientClient(conn)
return addressClient, cleanup, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v4 v4.18.2
github.com/jessevdk/go-flags v1.4.0
github.com/lib/pq v1.10.7
github.com/lightninglabs/aperture v0.1.21-beta.0.20230705004936-87bb996a4030
Expand Down Expand Up @@ -98,6 +97,7 @@ require (
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.2 // indirect
github.com/jackpal/gateway v1.0.5 // indirect
github.com/jackpal/go-nat-pmp v0.0.0-20170405195558-28a68d0c24ad // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
Expand Down
132 changes: 82 additions & 50 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/staticaddr"
"github.com/lightninglabs/loop/staticaddr/address"
"github.com/lightninglabs/loop/staticaddr/deposit"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd/clock"
Expand Down Expand Up @@ -68,12 +69,6 @@ type Daemon struct {
// same process.
swapClientServer

// AddressServer is the embedded RPC server that satisfies the
// static address client RPC interface. We embed this struct so the
// Daemon itself can be registered to an existing grpc.Server to run as
// a subserver in the same process.
*staticaddr.AddressServer

// ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully
// completed.
Expand Down Expand Up @@ -239,7 +234,6 @@ func (d *Daemon) startWebServers() error {
grpc.StreamInterceptor(streamInterceptor),
)
loop_looprpc.RegisterSwapClientServer(d.grpcServer, d)
loop_looprpc.RegisterStaticAddressClientServer(d.grpcServer, d)

// Register our debug server if it is compiled in.
d.registerDebugServer()
Expand Down Expand Up @@ -438,6 +432,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
swapClient.Conn,
)

// Create a static address server client.
staticAddressClient := loop_swaprpc.NewStaticAddressServerClient(
swapClient.Conn,
)

// Both the client RPC server and the swap server client should stop
// on main context cancel. So we create it early and pass it down.
d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -498,6 +497,9 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
var (
reservationManager *reservation.Manager
instantOutManager *instantout.Manager

staticAddressManager *address.Manager
depositManager *deposit.Manager
)
// Create the reservation and instantout managers.
if d.cfg.EnableExperimental {
Expand Down Expand Up @@ -534,43 +536,50 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
instantOutManager = instantout.NewInstantOutManager(
instantOutConfig,
)

// Static address manager setup.
staticAddressStore := address.NewSqlStore(baseDb)
addrCfg := &address.ManagerConfig{
AddressClient: staticAddressClient,
FetchL402: swapClient.Server.FetchL402,
Store: staticAddressStore,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
}
staticAddressManager = address.NewManager(addrCfg)

// Static address deposit manager setup.
depositStore := deposit.NewSqlStore(baseDb)
depoCfg := &deposit.ManagerConfig{
AddressClient: staticAddressClient,
AddressManager: staticAddressManager,
SwapClient: swapClient,
Store: depositStore,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
ChainNotifier: d.lnd.ChainNotifier,
Signer: d.lnd.Signer,
}
depositManager = deposit.NewManager(depoCfg)
}

// Now finally fully initialize the swap client RPC server instance.
d.swapClientServer = swapClientServer{
config: d.cfg,
network: lndclient.Network(d.cfg.Network),
impl: swapClient,
liquidityMgr: getLiquidityManager(swapClient),
lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
reservationManager: reservationManager,
instantOutManager: instantOutManager,
config: d.cfg,
network: lndclient.Network(d.cfg.Network),
impl: swapClient,
liquidityMgr: getLiquidityManager(swapClient),
lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
reservationManager: reservationManager,
instantOutManager: instantOutManager,
staticAddressManager: staticAddressManager,
depositManager: depositManager,
}

// Create a static address server client.
staticAddressClient := loop_swaprpc.NewStaticAddressServerClient(
swapClient.Conn,
)

store := staticaddr.NewSqlStore(baseDb)

cfg := &staticaddr.ManagerConfig{
AddressClient: staticAddressClient,
SwapClient: swapClient,
Store: store,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
}
staticAddressManager := staticaddr.NewAddressManager(cfg)

d.AddressServer = staticaddr.NewAddressServer(
staticAddressClient, staticAddressManager,
)

// Retrieve all currently existing swaps from the database.
swapsList, err := d.impl.FetchSwaps(d.mainCtx)
if err != nil {
Expand Down Expand Up @@ -662,20 +671,43 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}

// Start the static address manager.
d.wg.Add(1)
go func() {
defer d.wg.Done()
if staticAddressManager != nil {
d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Info("Starting static address manager...")
err = staticAddressManager.Run(d.mainCtx)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
log.Info("Starting static address manager...")
err = staticAddressManager.Run(d.mainCtx)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
log.Info("Static address manager stopped")
}()
}

log.Info("Static address manager stopped")
}()
// Start the static address deposit manager.
if depositManager != nil {
d.wg.Add(1)
go func() {
defer d.wg.Done()

staticAddressManager.WaitInitComplete()
// Lnd's GetInfo call supplies us with the current block
// height.
info, err := d.lnd.Client.GetInfo(d.mainCtx)
if err != nil {
d.internalErrChan <- err
return
}

log.Info("Starting static address deposit manager...")
err = depositManager.Run(d.mainCtx, info.BlockHeight)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
log.Info("Static address deposit manager stopped")
}()
depositManager.WaitInitComplete()
}

// Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that
Expand Down
4 changes: 2 additions & 2 deletions loopd/perms/perms.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ var RequiredPermissions = map[string][]bakery.Op{
Entity: "loop",
Action: "in",
}},
"/looprpc.StaticAddressClient/NewAddress": {{
"/looprpc.SwapClient/NewStaticAddress": {{
Entity: "swap",
Action: "read",
}, {
Entity: "loop",
Action: "in",
}},
"/looprpc.StaticAddressClient/ListUnspent": {{
"/looprpc.SwapClient/ListUnspentDeposits": {{
Entity: "swap",
Action: "read",
}, {
Expand Down
75 changes: 62 additions & 13 deletions loopd/swapclient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
clientrpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/staticaddr/address"
"github.com/lightninglabs/loop/staticaddr/deposit"
"github.com/lightninglabs/loop/swap"
looprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
Expand Down Expand Up @@ -76,19 +78,21 @@ type swapClientServer struct {
clientrpc.UnimplementedSwapClientServer
clientrpc.UnimplementedDebugServer

config *Config
network lndclient.Network
impl *loop.Client
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
mainCtx context.Context
config *Config
network lndclient.Network
impl *loop.Client
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
staticAddressManager *address.Manager
depositManager *deposit.Manager
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
mainCtx context.Context
}

// LoopOut initiates a loop out swap with the given parameters. The call returns
Expand Down Expand Up @@ -1272,6 +1276,51 @@ func rpcInstantOut(instantOut *instantout.InstantOut) *clientrpc.InstantOut {
}
}

// NewStaticAddress is the rpc endpoint for loop clients to request a new static
// address.
func (s *swapClientServer) NewStaticAddress(ctx context.Context,
_ *clientrpc.NewStaticAddressRequest) (
*clientrpc.NewStaticAddressResponse, error) {

staticAddress, err := s.staticAddressManager.NewAddress(ctx)
if err != nil {
return nil, err
}

return &clientrpc.NewStaticAddressResponse{
Address: staticAddress.String(),
}, nil
}

// ListUnspentDeposits returns a list of utxos behind the static address.
func (s *swapClientServer) ListUnspentDeposits(ctx context.Context,
req *clientrpc.ListUnspentDepositsRequest) (
*clientrpc.ListUnspentDepositsResponse, error) {

// List all unspent utxos the wallet sees, regardless of the number of
// confirmations.
staticAddress, utxos, err := s.staticAddressManager.ListUnspentRaw(
ctx, req.MinConfs, req.MaxConfs,
)
if err != nil {
return nil, err
}

// Prepare the list response.
var respUtxos []*clientrpc.Utxo
for _, u := range utxos {
utxo := &clientrpc.Utxo{
StaticAddress: staticAddress.String(),
AmountSat: int64(u.Value),
Confirmations: u.Confirmations,
Outpoint: u.OutPoint.String(),
}
respUtxos = append(respUtxos, utxo)
}

return &clientrpc.ListUnspentDepositsResponse{Utxos: respUtxos}, nil
}

func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) {
switch reason {
case liquidity.ReasonNone:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS deposits;
Loading

0 comments on commit 3961d45

Please sign in to comment.