Skip to content

Commit

Permalink
use EventBus replace of notify
Browse files Browse the repository at this point in the history
  • Loading branch information
Welkin committed Sep 19, 2023
1 parent 6027e97 commit dc690e4
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
15 changes: 15 additions & 0 deletions op-node/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p/core/event"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -92,6 +93,20 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics)
subscribe, err := n.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
if err != nil {
return fmt.Errorf("failed to subscribe peer connectedness changed event: %w", err)
}
go func() {
for evt := range subscribe.Out() {
evto := evt.(event.EvtPeerConnectednessChanged)
if evto.Connectedness == network.Connected {
n.syncCl.AddPeer(evto.Peer)
} else if evto.Connectedness == network.NotConnected {
n.syncCl.RemovePeer(evto.Peer)
}
}
}()
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
Expand Down
36 changes: 23 additions & 13 deletions op-node/p2p/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"github.com/libp2p/go-libp2p/core/event"
"math/big"
"sync"
"testing"
Expand Down Expand Up @@ -344,20 +345,21 @@ func TestEdgeCaseWhenOnePeerHasMultiConn(t *testing.T) {
require.True(t, !ok, "peerB should be remove from syncClient")

hostA.Network().StopNotify(oldNotify)
//use new logic for fixing bug
hostA.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
log.Info("connect peer", "peer", conn.RemotePeer(), "connId", conn.ID(), "addr", conn.RemoteMultiaddr())
syncCl.AddPeer(conn.RemotePeer())
},
DisconnectedF: func(nw network.Network, conn network.Conn) {
log.Info("disconnect peer", "peer", conn.RemotePeer(), "connId", conn.ID(), "addr", conn.RemoteMultiaddr())
// only when no connection is available, we can remove the peer
if nw.Connectedness(conn.RemotePeer()) == network.NotConnected {
syncCl.RemovePeer(conn.RemotePeer())
subscribe, err := hostA.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err, "subscribe peerConnectednessChanged fail")
go func() {
for evt := range subscribe.Out() {
evto := evt.(event.EvtPeerConnectednessChanged)
if evto.Connectedness == network.Connected {
log.Info("event: connect peer", "peer", evto.Peer)
syncCl.AddPeer(evto.Peer)
} else if evto.Connectedness == network.NotConnected {
log.Info("event: disconnect peer", "peer", evto.Peer)
syncCl.RemovePeer(evto.Peer)
}
},
})
}
}()

syncCl.AddPeer(hostB.ID())
_, peerBExist := syncCl.peers[hostB.ID()]
require.True(t, peerBExist, "peerB should exist in syncClient")
Expand All @@ -371,4 +373,12 @@ func TestEdgeCaseWhenOnePeerHasMultiConn(t *testing.T) {
_, peerBExist2 := syncCl.peers[hostB.ID()]
require.True(t, peerBExist2, "peerB should still exist in syncClient")

err = hostA.Network().ClosePeer(hostB.ID())
require.NoError(t, err, "close peer fail")

//wait for async removing process done
time.Sleep(100 * time.Millisecond)
_, peerBExist3 := syncCl.peers[hostB.ID()]
require.True(t, !peerBExist3, "peerB should not exist in syncClient")

}

0 comments on commit dc690e4

Please sign in to comment.