Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3/?] StaticAddr: Deposit tracking and timeout sweep #677

Merged
merged 9 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 9 additions & 26 deletions cmd/loop/staticaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func newStaticAddress(ctx *cli.Context) error {
return cli.ShowCommandHelp(ctx, "new")
}

client, cleanup, err := getAddressClient(ctx)
client, cleanup, err := getClient(ctx)
if err != nil {
return err
}
defer cleanup()

resp, err := client.NewAddress(
ctxb, &looprpc.NewAddressRequest{},
resp, err := client.NewStaticAddress(
ctxb, &looprpc.NewStaticAddressRequest{},
)
if err != nil {
return err
Expand Down Expand Up @@ -86,16 +86,17 @@ func listUnspent(ctx *cli.Context) error {
return cli.ShowCommandHelp(ctx, "listunspent")
}

client, cleanup, err := getAddressClient(ctx)
client, cleanup, err := getClient(ctx)
if err != nil {
return err
}
defer cleanup()

resp, err := client.ListUnspent(ctxb, &looprpc.ListUnspentRequest{
MinConfs: int32(ctx.Int("min_confs")),
MaxConfs: int32(ctx.Int("max_confs")),
})
resp, err := client.ListUnspentDeposits(
ctxb, &looprpc.ListUnspentDepositsRequest{
MinConfs: int32(ctx.Int("min_confs")),
MaxConfs: int32(ctx.Int("max_confs")),
})
if err != nil {
return err
}
Expand All @@ -104,21 +105,3 @@ func listUnspent(ctx *cli.Context) error {

return nil
}

func getAddressClient(ctx *cli.Context) (looprpc.StaticAddressClientClient,
func(), error) {

rpcServer := ctx.GlobalString("rpcserver")
tlsCertPath, macaroonPath, err := extractPathArgs(ctx)
if err != nil {
return nil, nil, err
}
conn, err := getClientConn(rpcServer, tlsCertPath, macaroonPath)
if err != nil {
return nil, nil, err
}
cleanup := func() { conn.Close() }

addressClient := looprpc.NewStaticAddressClientClient(conn)
return addressClient, cleanup, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v4 v4.18.2
github.com/jessevdk/go-flags v1.4.0
github.com/lib/pq v1.10.7
github.com/lightninglabs/aperture v0.1.21-beta.0.20230705004936-87bb996a4030
Expand Down Expand Up @@ -98,6 +97,7 @@ require (
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.2 // indirect
github.com/jackpal/gateway v1.0.5 // indirect
github.com/jackpal/go-nat-pmp v0.0.0-20170405195558-28a68d0c24ad // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
Expand Down
132 changes: 82 additions & 50 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/staticaddr"
"github.com/lightninglabs/loop/staticaddr/address"
"github.com/lightninglabs/loop/staticaddr/deposit"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd/clock"
Expand Down Expand Up @@ -68,12 +69,6 @@ type Daemon struct {
// same process.
swapClientServer

// AddressServer is the embedded RPC server that satisfies the
// static address client RPC interface. We embed this struct so the
// Daemon itself can be registered to an existing grpc.Server to run as
// a subserver in the same process.
*staticaddr.AddressServer

// ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully
// completed.
Expand Down Expand Up @@ -239,7 +234,6 @@ func (d *Daemon) startWebServers() error {
grpc.StreamInterceptor(streamInterceptor),
)
loop_looprpc.RegisterSwapClientServer(d.grpcServer, d)
loop_looprpc.RegisterStaticAddressClientServer(d.grpcServer, d)

// Register our debug server if it is compiled in.
d.registerDebugServer()
Expand Down Expand Up @@ -438,6 +432,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
swapClient.Conn,
)

// Create a static address server client.
staticAddressClient := loop_swaprpc.NewStaticAddressServerClient(
swapClient.Conn,
)

// Both the client RPC server and the swap server client should stop
// on main context cancel. So we create it early and pass it down.
d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -498,6 +497,9 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
var (
reservationManager *reservation.Manager
instantOutManager *instantout.Manager

staticAddressManager *address.Manager
depositManager *deposit.Manager
)
// Create the reservation and instantout managers.
if d.cfg.EnableExperimental {
Expand Down Expand Up @@ -534,43 +536,50 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
instantOutManager = instantout.NewInstantOutManager(
instantOutConfig,
)

// Static address manager setup.
staticAddressStore := address.NewSqlStore(baseDb)
addrCfg := &address.ManagerConfig{
AddressClient: staticAddressClient,
FetchL402: swapClient.Server.FetchL402,
Store: staticAddressStore,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
}
staticAddressManager = address.NewManager(addrCfg)

// Static address deposit manager setup.
depositStore := deposit.NewSqlStore(baseDb)
depoCfg := &deposit.ManagerConfig{
AddressClient: staticAddressClient,
AddressManager: staticAddressManager,
SwapClient: swapClient,
Store: depositStore,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
ChainNotifier: d.lnd.ChainNotifier,
Signer: d.lnd.Signer,
}
depositManager = deposit.NewManager(depoCfg)
}

// Now finally fully initialize the swap client RPC server instance.
d.swapClientServer = swapClientServer{
config: d.cfg,
network: lndclient.Network(d.cfg.Network),
impl: swapClient,
liquidityMgr: getLiquidityManager(swapClient),
lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
reservationManager: reservationManager,
instantOutManager: instantOutManager,
config: d.cfg,
network: lndclient.Network(d.cfg.Network),
impl: swapClient,
liquidityMgr: getLiquidityManager(swapClient),
lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
reservationManager: reservationManager,
instantOutManager: instantOutManager,
staticAddressManager: staticAddressManager,
depositManager: depositManager,
}

// Create a static address server client.
staticAddressClient := loop_swaprpc.NewStaticAddressServerClient(
swapClient.Conn,
)

store := staticaddr.NewSqlStore(baseDb)

cfg := &staticaddr.ManagerConfig{
AddressClient: staticAddressClient,
SwapClient: swapClient,
Store: store,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
}
staticAddressManager := staticaddr.NewAddressManager(cfg)

d.AddressServer = staticaddr.NewAddressServer(
staticAddressClient, staticAddressManager,
)

// Retrieve all currently existing swaps from the database.
swapsList, err := d.impl.FetchSwaps(d.mainCtx)
if err != nil {
Expand Down Expand Up @@ -662,20 +671,43 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}

// Start the static address manager.
d.wg.Add(1)
go func() {
defer d.wg.Done()
if staticAddressManager != nil {
d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Info("Starting static address manager...")
err = staticAddressManager.Run(d.mainCtx)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
log.Info("Starting static address manager...")
err = staticAddressManager.Run(d.mainCtx)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
log.Info("Static address manager stopped")
}()
}

log.Info("Static address manager stopped")
}()
// Start the static address deposit manager.
if depositManager != nil {
d.wg.Add(1)
go func() {
defer d.wg.Done()

staticAddressManager.WaitInitComplete()
// Lnd's GetInfo call supplies us with the current block
// height.
info, err := d.lnd.Client.GetInfo(d.mainCtx)
if err != nil {
d.internalErrChan <- err
return
}

log.Info("Starting static address deposit manager...")
err = depositManager.Run(d.mainCtx, info.BlockHeight)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
log.Info("Static address deposit manager stopped")
}()
depositManager.WaitInitComplete()
}

// Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that
Expand Down
4 changes: 2 additions & 2 deletions loopd/perms/perms.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ var RequiredPermissions = map[string][]bakery.Op{
Entity: "loop",
Action: "in",
}},
"/looprpc.StaticAddressClient/NewAddress": {{
"/looprpc.SwapClient/NewStaticAddress": {{
Entity: "swap",
Action: "read",
}, {
Entity: "loop",
Action: "in",
}},
"/looprpc.StaticAddressClient/ListUnspent": {{
"/looprpc.SwapClient/ListUnspentDeposits": {{
Entity: "swap",
Action: "read",
}, {
Expand Down
75 changes: 62 additions & 13 deletions loopd/swapclient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
clientrpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/staticaddr/address"
"github.com/lightninglabs/loop/staticaddr/deposit"
"github.com/lightninglabs/loop/swap"
looprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
Expand Down Expand Up @@ -76,19 +78,21 @@ type swapClientServer struct {
clientrpc.UnimplementedSwapClientServer
clientrpc.UnimplementedDebugServer

config *Config
network lndclient.Network
impl *loop.Client
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
mainCtx context.Context
config *Config
network lndclient.Network
impl *loop.Client
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
staticAddressManager *address.Manager
depositManager *deposit.Manager
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
mainCtx context.Context
}

// LoopOut initiates a loop out swap with the given parameters. The call returns
Expand Down Expand Up @@ -1272,6 +1276,51 @@ func rpcInstantOut(instantOut *instantout.InstantOut) *clientrpc.InstantOut {
}
}

// NewStaticAddress is the rpc endpoint for loop clients to request a new static
// address.
func (s *swapClientServer) NewStaticAddress(ctx context.Context,
_ *clientrpc.NewStaticAddressRequest) (
*clientrpc.NewStaticAddressResponse, error) {

staticAddress, err := s.staticAddressManager.NewAddress(ctx)
if err != nil {
return nil, err
}

return &clientrpc.NewStaticAddressResponse{
Address: staticAddress.String(),
}, nil
}

// ListUnspentDeposits returns a list of utxos behind the static address.
func (s *swapClientServer) ListUnspentDeposits(ctx context.Context,
req *clientrpc.ListUnspentDepositsRequest) (
*clientrpc.ListUnspentDepositsResponse, error) {

// List all unspent utxos the wallet sees, regardless of the number of
// confirmations.
staticAddress, utxos, err := s.staticAddressManager.ListUnspentRaw(
ctx, req.MinConfs, req.MaxConfs,
)
if err != nil {
return nil, err
}

// Prepare the list response.
var respUtxos []*clientrpc.Utxo
for _, u := range utxos {
utxo := &clientrpc.Utxo{
StaticAddress: staticAddress.String(),
AmountSat: int64(u.Value),
Confirmations: u.Confirmations,
Outpoint: u.OutPoint.String(),
}
respUtxos = append(respUtxos, utxo)
}

return &clientrpc.ListUnspentDepositsResponse{Utxos: respUtxos}, nil
}

func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) {
switch reason {
case liquidity.ReasonNone:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS deposits;
Loading
Loading