diff --git a/CHANGELOG.md b/CHANGELOG.md index f655860..736a222 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes: ### Added +- Added endpoints to show and purge connected peers [#194](https://github.com/ipfs/rainbow/pull/194) + ### Changed ### Removed diff --git a/README.md b/README.md index 36e9398..dc2d274 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,21 @@ possible to dynamically modify the logging at runtime. - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/level?subsystem=&level=` will set the logging level for a subsystem - `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/ls` will return a comma separated list of available logging subsystems +## Purging Peer Connections + +Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge. + +If `RAINBOW_DHT_SHARED_HOST=false` this endpoint will not show peers connected to DHT host, and only list ones used for Bitswap. + +- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=` purges connection and info for peer identifid by peer_id +- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=all` purges connections and info for all peers +- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/peers` returns a list of currently connected peers + +Example cURL commmand to show connected peers and purge peer connection: + + curl http://127.0.0.1:8091/mgr/peers + curl http://127.0.0.1:8091/mgr/purge?peer=QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC + ## Tracing See [docs/tracing.md](docs/tracing.md). diff --git a/handlers.go b/handlers.go index 1afcd6e..57a989b 100644 --- a/handlers.go +++ b/handlers.go @@ -13,6 +13,8 @@ import ( "github.com/ipfs/boxo/blockstore" leveldb "github.com/ipfs/go-ds-leveldb" "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" _ "embed" _ "net/http/pprof" @@ -72,7 +74,7 @@ func addLogHandlers(mux *http.ServeMux) { }) } -func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) { +func gcHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() @@ -92,6 +94,119 @@ func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) { } } +func purgePeerHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + q := r.URL.Query() + peerIDStr := q.Get("peer") + if peerIDStr == "" { + http.Error(w, "missing peer id", http.StatusBadRequest) + return + } + + if peerIDStr == "all" { + purgeCount, err := purgeAllConnections(p2pHost) + if err != nil { + goLog.Errorw("Error closing all libp2p connections", "err", err) + http.Error(w, "error closing connections", http.StatusInternalServerError) + return + } + goLog.Infow("Purged connections", "count", purgeCount) + + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + fmt.Fprintln(w, "Peer connections purged:", purgeCount) + return + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = purgeConnection(p2pHost, peerID) + if err != nil { + goLog.Errorw("Error closing libp2p connection", "err", err, "peer", peerID) + http.Error(w, "error closing connection", http.StatusInternalServerError) + return + } + goLog.Infow("Purged connection", "peer", peerID) + + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + fmt.Fprintln(w, "Purged connection to peer", peerID) + } +} + +func purgeConnection(p2pHost host.Host, peerID peer.ID) error { + peerStore := p2pHost.Peerstore() + if peerStore != nil { + peerStore.RemovePeer(peerID) + peerStore.ClearAddrs(peerID) + } + return p2pHost.Network().ClosePeer(peerID) +} + +func purgeAllConnections(p2pHost host.Host) (int, error) { + net := p2pHost.Network() + peers := net.Peers() + + peerStore := p2pHost.Peerstore() + if peerStore != nil { + for _, peerID := range peers { + peerStore.RemovePeer(peerID) + peerStore.ClearAddrs(peerID) + } + } + + var errCount, purgeCount int + for _, peerID := range peers { + err := net.ClosePeer(peerID) + if err != nil { + goLog.Errorw("Closing libp2p connection", "err", err, "peer", peerID) + errCount++ + } else { + purgeCount++ + } + } + + if errCount != 0 { + return 0, fmt.Errorf("error closing connections to %d peers", errCount) + } + + return purgeCount, nil +} + +func showPeersHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + peers := p2pHost.Network().Peers() + body := struct { + Count int64 + Peers []string + }{ + Count: int64(len(peers)), + } + + if len(peers) != 0 { + peerStrs := make([]string, len(peers)) + for i, peerID := range peers { + peerStrs[i] = peerID.String() + } + body.Peers = peerStrs + } + + enc := json.NewEncoder(w) + if err := enc.Encode(body); err != nil { + goLog.Errorw("cannot write response", "err", err) + http.Error(w, "", http.StatusInternalServerError) + } + } +} + func withConnect(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // ServeMux does not support requests with CONNECT method, diff --git a/main.go b/main.go index be37c14..d373dfc 100644 --- a/main.go +++ b/main.go @@ -589,7 +589,9 @@ share the same seed as long as the indexes are different. otel.SetTextMapPropagator(autoprop.NewTextMapPropagator()) apiMux := makeMetricsAndDebuggingHandler() - apiMux.HandleFunc("/mgr/gc", GCHandler(gnd)) + apiMux.HandleFunc("/mgr/gc", gcHandler(gnd)) + apiMux.HandleFunc("/mgr/purge", purgePeerHandler(gnd.host)) + apiMux.HandleFunc("/mgr/peers", showPeersHandler(gnd.host)) addLogHandlers(apiMux) apiSrv := &http.Server{