diff --git a/waku/v2/api/common/pinger.go b/waku/v2/api/common/pinger.go new file mode 100644 index 000000000..ba8c26a21 --- /dev/null +++ b/waku/v2/api/common/pinger.go @@ -0,0 +1,37 @@ +package common + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +type Pinger interface { + PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) +} + +type defaultPingImpl struct { + host host.Host +} + +func NewDefaultPinger(host host.Host) Pinger { + return &defaultPingImpl{ + host: host, + } +} + +func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + pingResultCh := ping.Ping(ctx, d.host, peerID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go index 313ee0a45..b6571dfc1 100644 --- a/waku/v2/api/history/cycle.go +++ b/waku/v2/api/history/cycle.go @@ -14,9 +14,8 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" ) @@ -55,9 +54,8 @@ type StorenodeCycle struct { logger *zap.Logger - host host.Host - storenodeConfigProvider StorenodeConfigProvider + pinger common.Pinger StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}] StorenodeChangedEmitter *Emitter[peer.ID] @@ -71,7 +69,7 @@ type StorenodeCycle struct { peers map[peer.ID]peerStatus } -func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { +func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle { return &StorenodeCycle{ StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](), StorenodeChangedEmitter: NewEmitter[peer.ID](), @@ -81,9 +79,8 @@ func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { } } -func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) { +func (m *StorenodeCycle) Start(ctx context.Context) { m.logger.Debug("starting storenode cycle") - m.host = h m.failedRequests = make(map[peer.ID]uint) m.peers = make(map[peer.ID]peerStatus) @@ -194,7 +191,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - rtt, err := m.pingPeer(ctx, peerID) + rtt, err := m.pinger.PingPeer(ctx, peerID) if err == nil { // pinging storenodes might fail, but we don't care availableStorenodesMutex.Lock() availableStorenodes[peerID] = rtt @@ -233,19 +230,6 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, return result } -func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { - pingResultCh := ping.Ping(ctx, m.host, peerID) - select { - case <-ctx.Done(): - return 0, ctx.Err() - case r := <-pingResultCh: - if r.Error != nil { - return 0, r.Error - } - return r.RTT, nil - } -} - func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 if overrideDNS {