From fc95a72192211e488d70c6117f9bbe60e2f2d457 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 2 Nov 2023 11:16:36 +0000 Subject: [PATCH] chore: fix pubsub interop tests When opening outbound streams, only make sure no outbound pubsub streams exist on the connection, not just any pubsub streams. Fixes a race condition where the remote peer can open streams before us which then prevents us opening streams. --- packages/libp2p/test/interop.ts | 5 ++++- packages/pubsub/src/index.ts | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/libp2p/test/interop.ts b/packages/libp2p/test/interop.ts index 871b47b3c9..5e34db58e6 100644 --- a/packages/libp2p/test/interop.ts +++ b/packages/libp2p/test/interop.ts @@ -129,7 +129,10 @@ async function createJsPeer (options: SpawnOptions): Promise { }, transports: [tcp(), circuitRelayTransport()], streamMuxers: [], - connectionEncryption: [noise()] + connectionEncryption: [noise()], + connectionManager: { + minConnections: 0 + } } const services: ServiceFactoryMap = { diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index ba467f6e88..dc78665470 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -214,7 +214,8 @@ export abstract class PubSubBaseProtocol = Pu log('connected %p', peerId) // if this connection is already in use for pubsub, ignore it - if (conn.streams.find(stream => stream.protocol != null && this.multicodecs.includes(stream.protocol)) != null) { + if (conn.streams.find(stream => stream.direction === 'outbound' && stream.protocol != null && this.multicodecs.includes(stream.protocol)) != null) { + log('outbound pubsub streams already present on connection from %p', peerId) return } @@ -533,8 +534,14 @@ export abstract class PubSubBaseProtocol = Pu sendRpc (peer: PeerId, rpc: PubSubRPC): void { const peerStreams = this.peers.get(peer) - if (peerStreams == null || !peerStreams.isWritable) { - log.error('Cannot send RPC to %p as there is no open stream to it available', peer) + if (peerStreams == null) { + log.error('Cannot send RPC to %p as there are no streams to it available', peer) + + return + } + + if (!peerStreams.isWritable) { + log.error('Cannot send RPC to %p as there is no outbound stream to it available', peer) return }