From 27c4de717e2317c560a72d3a50a6d4c9abd05658 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 09:29:34 -0300 Subject: [PATCH 1/4] server: Refactor channel info into own function This will enable querying the server for this information without having to also decode it somewhere else. --- server/chanscore.go | 10 +- server/chanscore_test.go | 2 +- server/server.go | 259 ++++++++++++++++++++++++++++----------- 3 files changed, 191 insertions(+), 80 deletions(-) diff --git a/server/chanscore.go b/server/chanscore.go index 09fa69a..3617930 100644 --- a/server/chanscore.go +++ b/server/chanscore.go @@ -2,7 +2,7 @@ package server import "time" -// chanActivityScore holds "how much activity" a channel received through some +// ChanActivityScore holds "how much activity" a channel received through some // period of time. // // The basic equation for determining the "activity" score for a channel is @@ -12,17 +12,17 @@ import "time" // The interpretation for this equation is that the activity score is the // percentage of the channel capacity sent through the channel during its // entire lifetime. -type chanActivityScore float64 +type ChanActivityScore float64 // toPercent returns the activity score as a percentage. -func (s chanActivityScore) toPercent() float64 { +func (s ChanActivityScore) ToPercent() float64 { return float64(s) * 100 } // channelActivity returns the "activity" score for a channel, which measures // the total amount of atoms sent through the channel during its lifetime. 
func channelActivity(totalSentAtoms, channelSizeAtoms int64, - lifetime time.Duration) chanActivityScore { + lifetime time.Duration) ChanActivityScore { if lifetime <= 0 { panic("lifetime cannot be <= 0") @@ -37,5 +37,5 @@ func channelActivity(totalSentAtoms, channelSizeAtoms int64, hours = 1 } - return chanActivityScore(float64(totalSentAtoms) / float64(channelSizeAtoms) / hours) + return ChanActivityScore(float64(totalSentAtoms) / float64(channelSizeAtoms) / hours) } diff --git a/server/chanscore_test.go b/server/chanscore_test.go index 4f9c72e..1bf9aba 100644 --- a/server/chanscore_test.go +++ b/server/chanscore_test.go @@ -14,7 +14,7 @@ func TestChanActivityScore(t *testing.T) { sent int64 size int64 life time.Duration - want chanActivityScore + want ChanActivityScore }{{ name: "1 atom sent through 1 DCR chan within 1 hour", sent: 1, diff --git a/server/server.go b/server/server.go index 5de1cd7..228d29f 100644 --- a/server/server.go +++ b/server/server.go @@ -448,134 +448,245 @@ func (s *Server) closeChannel(ctx context.Context, nodeID rpc.NodeID, channelPoi } } -// manageChannels manages opened channels where the local node is the initiator. -func (s *Server) manageChannels(ctx context.Context) error { +// ManagedChannel is information about channels managed by the LP. +type ManagedChannel struct { + // Lifetime of the channel since it has opened. + Lifetime time.Duration + ChanPoint string + Sid lnwire.ShortChannelID + RemotePubkey string + Capacity dcrutil.Amount + LocalBalance dcrutil.Amount + RemoteBalance dcrutil.Amount + TotalAtomsSent dcrutil.Amount + + // HasMinLifetime is true when this channel has the minimum lifetime + // required to be eligible for closing. + HasMinLifetime bool + + // Score of the channel (activity % within its lifetime). + Score ChanActivityScore + + // WillClose whether this channel is likely to be closed in the next + // round of management. 
+ WillClose bool +} + +// UnmanagedChannel is information about channels NOT managed by the LP. +type UnmanagedChannel struct { + ChanPoint string + Sid lnwire.ShortChannelID + Capacity dcrutil.Amount + LocalBalance dcrutil.Amount + RemoteBalance dcrutil.Amount +} + +// ManagementInfo is the full set of information taken into account when +// managing channels. +type ManagementInfo struct { + // WalletBalance is the amount available (confirmed + unconfirmed) in + // the on-chain wallet. + WalletBalance dcrutil.Amount + + // LimboBalance is the amount that is pending to be reclaimed in + // already closed channels. + LimboBalance dcrutil.Amount + + // TotalBalance available to open channels (both confirmed and pending + // to be reclaimed). + TotalBalance dcrutil.Amount + + // MinWalletBalance minimum amount of funds that must be available to + // open new channels. When the total balance is lower than this amount, + // channels will start to be closed. + MinWalletBalance dcrutil.Amount + + // BlockHeight is the current block height of the wallet. + BlockHeight uint32 + + // Channels is the list of channels managed by the LP server. + Channels []ManagedChannel + + // UnmanagedChannels are channels that are NOT eligible for closing by + // the LP server. + UnmanagedChannels []UnmanagedChannel + + // NeedsManagement is true when the channels need to be managed to + // reclaim some funds. + NeedsManagement bool + + // Reclaimed is the (approximate) amount that will be reclaimed after + // closing some channels. This does not take into account various + // transaction fees that will need to be paid. + Reclaimed dcrutil.Amount +} + +// FetchManagedChannels returns the list of channels managed by the server. +func (s *Server) FetchManagedChannels(ctx context.Context) (res ManagementInfo, err error) { // Fetch the current wallet balance and see if we actually need to // manage the channels. 
bal, err := s.lc.WalletBalance(ctx, &lnrpc.WalletBalanceRequest{}) if err != nil { - return err + return res, err } - totalBalance := dcrutil.Amount(bal.TotalBalance) + res.WalletBalance = dcrutil.Amount(bal.TotalBalance) // Also add in the amount that is already pending from previously // closed channels. - var limboBalance dcrutil.Amount pendingChans, err := s.lc.PendingChannels(ctx, &lnrpc.PendingChannelsRequest{}) if err != nil { - return err + return res, err } for _, c := range pendingChans.PendingForceClosingChannels { - limboBalance += dcrutil.Amount(c.LimboBalance) - } - totalBalance += limboBalance - - if totalBalance >= s.cfg.MinWalletBalance { - s.log.Debugf("Wallet has more coins available (total "+ - "%.8f, limbo %.8f) than minimum (%.8f) needed "+ - "to manage channels", - dcrutil.Amount(bal.TotalBalance).ToCoin(), - limboBalance.ToCoin(), - s.cfg.MinWalletBalance.ToCoin()) - return nil + res.LimboBalance += dcrutil.Amount(c.LimboBalance) } + res.TotalBalance = res.LimboBalance + res.WalletBalance + res.MinWalletBalance = s.cfg.MinWalletBalance + res.NeedsManagement = res.TotalBalance < res.MinWalletBalance - // Time to check the channels. Fetch list of channels. + // Fetch list of channels. chanList, err := s.lc.ListChannels(ctx, &lnrpc.ListChannelsRequest{}) if err != nil { - return err + return res, err } - s.log.Debugf("Managing %d channels looking for %s", - len(chanList.Channels), s.cfg.MinWalletBalance-totalBalance) - // Fetch the current block time to figure out channel lifetime. info, err := s.lc.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { - return err + return res, err } - bh := info.BlockHeight - - var nonInit, beforeMinDur, closing int + res.BlockHeight = info.BlockHeight // Determine the "activity" score for each channel. Key is chanID. 
- chans := make([]*lnrpc.Channel, 0, len(chanList.Channels)) - activity := make(map[uint64]chanActivityScore, len(chanList.Channels)) + res.Channels = make([]ManagedChannel, 0, len(chanList.Channels)) + res.UnmanagedChannels = make([]UnmanagedChannel, 0, len(chanList.Channels)) for _, c := range chanList.Channels { cid := lnwire.NewShortChanIDFromInt(c.ChanId) + cp := c.ChannelPoint // Ignore channels where we are not the initiator. if !c.Initiator { - s.log.Tracef("Ignoring inbound channel %s", c.ChannelPoint) - nonInit += 1 + res.UnmanagedChannels = append(res.UnmanagedChannels, + UnmanagedChannel{ + ChanPoint: cp, + Sid: cid, + Capacity: dcrutil.Amount(c.Capacity), + LocalBalance: dcrutil.Amount(c.LocalBalance), + RemoteBalance: dcrutil.Amount(c.RemoteBalance), + }) continue } // Ignore channels that haven't been online for long enough. - lifetime := time.Duration(bh-cid.BlockHeight) * s.chainParams.TargetTimePerBlock - if lifetime < s.cfg.MinChanLifetime { - s.log.Tracef("Ignoring channel %s due to lifetime "+ - "%s < min duration %s", c.ChannelPoint, - lifetime, s.cfg.MinChanLifetime) - beforeMinDur += 1 - continue - } - - // Sanity check. - if c.Capacity == 0 { - s.log.Warnf("Channel without capacity: %s", c.ChannelPoint) - continue - } + lifetime := time.Duration(res.BlockHeight-cid.BlockHeight) * s.chainParams.TargetTimePerBlock + HasMinLifetime := lifetime >= s.cfg.MinChanLifetime // Calc activity. 
score := channelActivity(c.TotalAtomsSent, c.Capacity, lifetime) - activity[c.ChanId] = score - chans = append(chans, c) - - s.log.Tracef("Activity score for chan %s (sent: %d, "+ - "cap: %d, lt: %s): %f", c.ChannelPoint, - c.TotalAtomsSent, c.Capacity, - lifetime, score.toPercent()) + res.Channels = append(res.Channels, ManagedChannel{ + ChanPoint: cp, + Sid: cid, + RemotePubkey: c.RemotePubkey, + Lifetime: lifetime, + HasMinLifetime: HasMinLifetime, + Capacity: dcrutil.Amount(c.Capacity), + LocalBalance: dcrutil.Amount(c.LocalBalance), + RemoteBalance: dcrutil.Amount(c.RemoteBalance), + TotalAtomsSent: dcrutil.Amount(c.TotalAtomsSent), + Score: score, + }) } // Sort by activity score. - sort.Slice(chans, func(i, j int) bool { - scoreI := activity[chans[i].ChanId] - scoreJ := activity[chans[j].ChanId] + sort.Slice(res.Channels, func(i, j int) bool { + scoreI := res.Channels[i].Score + scoreJ := res.Channels[j].Score return scoreI < scoreJ }) + // If there's no need to manage the channels, we're done. + if !res.NeedsManagement { + return res, nil + } + // Close low activity channels until we reach the minimum amount of // atoms in the wallet. - var reclaimed dcrutil.Amount - - for _, c := range chans { + for i := range res.Channels { // Time to close channel! - cid := lnwire.NewShortChanIDFromInt(c.ChanId) - lifetime := time.Duration(bh-cid.BlockHeight) * s.chainParams.TargetTimePerBlock - s.log.Infof("Closing channel %s with %s due to "+ - "low activity (sent %.8f, lifetime %s, capacity %.8f)", - c.ChannelPoint, c.RemotePubkey, - dcrutil.Amount(c.TotalAtomsSent).ToCoin(), - lifetime, dcrutil.Amount(c.Capacity).ToCoin()) - closing += 1 - var nodeID rpc.NodeID - nodeID.FromString(c.RemotePubkey) - go s.closeChannel(ctx, nodeID, c.ChannelPoint) + c := &res.Channels[i] + + // Ignore channels that are not old enough yet to be closed. + if !c.HasMinLifetime { + continue + } + + // Close this channel. 
+ c.WillClose = true // c.LocalBalance is not _exactly_ correct because there will // be fees to close the channel and reclaim the funds back, but // those are negligible compared to the channel sizes. - reclaimed += dcrutil.Amount(c.LocalBalance) - if totalBalance+reclaimed >= s.cfg.MinWalletBalance { + res.Reclaimed += c.LocalBalance + if res.TotalBalance+res.Reclaimed >= s.cfg.MinWalletBalance { break } } - remained := len(chans) - closing + return res, nil +} + +// manageChannels manages opened channels where the local node is the initiator. +func (s *Server) manageChannels(ctx context.Context) error { + // Fetch channel management info. + info, err := s.FetchManagedChannels(ctx) + if err != nil { + return err + } + + if !info.NeedsManagement { + s.log.Debugf("Wallet has more coins available (total "+ + "%.8f, limbo %.8f) than minimum (%.8f) needed "+ + "to manage channels", + info.WalletBalance.ToCoin(), + info.LimboBalance.ToCoin(), + info.MinWalletBalance.ToCoin()) + return nil + } + + s.log.Debugf("Managing %d channels (%d unmanaged) looking for %s", + len(info.Channels), len(info.UnmanagedChannels), + info.MinWalletBalance-info.TotalBalance) + + for _, c := range info.UnmanagedChannels { + s.log.Tracef("Ignoring inbound channel %s", c.ChanPoint) + } + + var closing, beforeMinDur int + for _, c := range info.Channels { + if !c.HasMinLifetime { + beforeMinDur++ + } + + // Time to close channel! + if !c.WillClose { + continue + } + + s.log.Infof("Closing channel %s with %s due to "+ + "low activity (sent %.8f, lifetime %s, capacity %.8f)", + c.ChanPoint, c.RemotePubkey, + c.TotalAtomsSent.ToCoin(), + c.Lifetime, c.Capacity.ToCoin()) + closing++ + var nodeID rpc.NodeID + nodeID.FromString(c.RemotePubkey) + go s.closeChannel(ctx, nodeID, c.ChanPoint) + } + + remained := len(info.Channels) - closing s.log.Infof("Managed channels. 
Non-initiator: %d, before min duration: %d "+ - "closing: %d, remained: %d, reclaimed: %s", nonInit, beforeMinDur, - closing, remained, reclaimed) + "closing: %d, remained: %d, reclaimed: %s", len(info.UnmanagedChannels), + beforeMinDur, closing, remained, info.Reclaimed) return nil } From f69f650d2fd5f9075e8c24eca148f13ece16f09c Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 09:39:19 -0300 Subject: [PATCH 2/4] main: Add /info endpoint This returns information about the managed channels. --- main.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/main.go b/main.go index 1c361ec..3083eef 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "net" @@ -20,9 +21,27 @@ func indexHandler(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, "%s\n%s\n", appName, version.String()) } +func newInfoHandler(server *server.Server) func(w http.ResponseWriter, req *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + info, err := server.FetchManagedChannels(req.Context()) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "error fetching managed channels: %v\n", err) + log.Errorf("Unable to fetch managed channels: %v", err) + return + } + + err = json.NewEncoder(w).Encode(info) + if err != nil { + log.Errorf("Unable to encode info: %v", err) + } + } +} + func handler(s *server.Server) http.Handler { router := mux.NewRouter().StrictSlash(true) router.Methods("GET").Path("/").Name("index").HandlerFunc(indexHandler) + router.Methods("GET").Path("/info").Name("info").HandlerFunc(newInfoHandler(s)) server.NewV1Handler(s, router) logRouterConfig(router) return router From 7fa94cd0d553fc182b637a934713b7da4823713a Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 09:59:43 -0300 Subject: [PATCH 3/4] main: Protect /info from external access Only accesses from localhost (considering reverse-proxied 
requests) are allowed. --- main.go | 6 ++++++ util.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/main.go b/main.go index 3083eef..a2c5f45 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,12 @@ func indexHandler(w http.ResponseWriter, req *http.Request) { func newInfoHandler(server *server.Server) func(w http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) { + if !isReqFromLocalhost(req) { + w.WriteHeader(http.StatusForbidden) + log.Warnf("Forbidden request for info from %s", requestAddr(req)) + return + } + info, err := server.FetchManagedChannels(req.Context()) if err != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/util.go b/util.go index c83f696..d4993f0 100644 --- a/util.go +++ b/util.go @@ -1,11 +1,59 @@ package main import ( + "net" + "net/http" "strings" "github.com/gorilla/mux" ) +func requestAddr(req *http.Request) string { + res := req.RemoteAddr + for _, header := range []string{"X-Forwarded-For", "X-Real-IP"} { + if values, ok := req.Header[header]; ok { + for _, value := range values { + res += strings.TrimSpace(value) + } + } + } + + return res +} + +// isReqFromLocalhost checks if the request came from localhost. It checks the +// X-Forwarded-For and X-Real-IP headers first and then falls back to the +// RemoteAddr if necessary. +func isReqFromLocalhost(req *http.Request) bool { + // Function to check if an IP from a given string is a loopback address. + // It splits the host and port if necessary and checks the IP. 
+ isLoopback := func(addr string) bool { + ip := net.ParseIP(addr) + return ip != nil && ip.IsLoopback() + } + + // Check X-Forwarded-For and X-Real-IP headers for the original client IP + for _, header := range []string{"X-Forwarded-For", "X-Real-IP"} { + if values, ok := req.Header[header]; ok { + for _, value := range values { + for _, ipStr := range strings.Split(value, ",") { + ipStr = strings.TrimSpace(ipStr) + if isLoopback(ipStr) { + return true + } + } + } + } + } + + // If the headers are not present, fall back to the RemoteAddr. + host, _, err := net.SplitHostPort(req.RemoteAddr) + if err != nil { + host = req.RemoteAddr + } + return isLoopback(host) +} + func logRouterConfig(r *mux.Router) { err := r.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { pathTemplate, err := route.GetPathTemplate() From 9cf4061b252375a62e8aa32816ea88acbe048630 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 11:31:50 -0300 Subject: [PATCH 4/4] server: Fix reconnection to dcrlnd This fixes the server's Run method to properly attempt reconnections to the dcrlnd node in case of errors. Previously, two things were wrong in this method: - The context used in some calls was ctx instead of gctx, which caused some of the subsystems to not exit correctly on failures and thus mask the underlying connection error. - There was no re-attempt at failed operations, when the reason for failure was a connection error (as opposed to a graceful termination of the server). This commit fixes both issues by using the correct context everywhere and ensuring all subsystems re-attempt their functions until the context is canceled, with an appropriate delay. 
--- server/server.go | 104 ++++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 41 deletions(-) diff --git a/server/server.go b/server/server.go index 228d29f..e6f07c3 100644 --- a/server/server.go +++ b/server/server.go @@ -354,54 +354,74 @@ func (s *Server) openChannel(ctx context.Context, winv waitingInvoice) { // listenToInvoices reacts to invoice events. func (s *Server) listenToInvoices(ctx context.Context) error { - stream, err := s.lc.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{}) - if err != nil { - return err + + delay := func() { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + } } - for { - inv, err := stream.Recv() +nextConn: + for ctx.Err() == nil { + stream, err := s.lc.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{}) if err != nil { - return err + s.log.Errorf("Unable to subscribe to invoices: %v", err) + delay() + continue nextConn } - switch { - case inv.State == lnrpc.Invoice_CANCELED: - fpath := filepath.Join(s.root, invoicesDir, - hex.EncodeToString(inv.RHash)) - if err := s.removeFile(fpath); err != nil { - return err - } - - case inv.State == lnrpc.Invoice_SETTLED: - fpath := filepath.Join(s.root, invoicesDir, - hex.EncodeToString(inv.RHash)) - var winv waitingInvoice - err := s.readJsonFile(fpath, &winv) - if errors.Is(err, errNotExists) { - // Payment for something that isn't channel - // creation. 
- continue - } + for { + inv, err := stream.Recv() if err != nil { - return err + s.log.Errorf("Unable to receive next invoice update: %v", err) + delay() + continue nextConn } - wantAtoms := int64(s.amountForNewChan(winv.ChannelSize)) - if inv.AmtPaidAtoms < wantAtoms { - s.log.Warnf("Received payment for invoice %x "+ - "lower than required (%d < %d)", - inv.AmtPaidAtoms < wantAtoms) - continue - } + switch { + case inv.State == lnrpc.Invoice_CANCELED: + fpath := filepath.Join(s.root, invoicesDir, + hex.EncodeToString(inv.RHash)) + if err := s.removeFile(fpath); err != nil { + return err + } + + case inv.State == lnrpc.Invoice_SETTLED: + fpath := filepath.Join(s.root, invoicesDir, + hex.EncodeToString(inv.RHash)) + var winv waitingInvoice + err := s.readJsonFile(fpath, &winv) + if errors.Is(err, errNotExists) { + // Payment for something that isn't channel + // creation. + continue + } + if err != nil { + // Fatal failure. + return err + } - // Create channel. - if err := s.removeFile(fpath); err != nil { - return err + wantAtoms := int64(s.amountForNewChan(winv.ChannelSize)) + if inv.AmtPaidAtoms < wantAtoms { + s.log.Warnf("Received payment for invoice %x "+ + "lower than required (%d < %d)", + inv.AmtPaidAtoms < wantAtoms) + continue + } + + // Create channel. + if err := s.removeFile(fpath); err != nil { + // Fatal failure. + return err + } + go s.openChannel(ctx, winv) } - go s.openChannel(ctx, winv) + } } + + return ctx.Err() } // closeChannel closes the given channel. @@ -527,7 +547,7 @@ func (s *Server) FetchManagedChannels(ctx context.Context) (res ManagementInfo, // manage the channels. 
	bal, err := s.lc.WalletBalance(ctx, &lnrpc.WalletBalanceRequest{}) if err != nil { - return res, err + return res, fmt.Errorf("unable to fetch wallet balance: %v", err) } res.WalletBalance = dcrutil.Amount(bal.TotalBalance) @@ -697,7 +717,8 @@ func (s *Server) runManageChannels(ctx context.Context) error { case <-time.After(s.cfg.CloseCheckInterval): err := s.manageChannels(ctx) if err != nil { - return err + s.log.Errorf("Unable to manage channels in "+ + "this interval: %v", err) } case <-ctx.Done(): return ctx.Err() @@ -717,8 +738,8 @@ func (s *Server) Run(ctx context.Context) error { g.Go(func() error { for { select { - case <-ctx.Done(): - return ctx.Err() + case <-gctx.Done(): + return gctx.Err() case <-time.After(time.Minute): } @@ -740,12 +761,13 @@ func (s *Server) Run(ctx context.Context) error { }) // Close low activity channels. - g.Go(func() error { return s.runManageChannels(ctx) }) + g.Go(func() error { return s.runManageChannels(gctx) }) // Shutdown conn once an error occurrs. This unblocks any outstanding // calls. g.Go(func() error { <-gctx.Done() + s.log.Infof("Closing connection to dcrlnd") if err := s.conn.Close(); err != nil { s.log.Warnf("Error while closing conn: %v", err) }