diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 2896ff583f..37e1adca9c 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -1,47 +1,46 @@ -import type { PeerUpdate, Stream } from "@libp2p/interface"; -import type { Peer, PeerId } from "@libp2p/interface"; -import { Libp2p } from "@waku/interfaces"; +import type { + Connection, + Peer, + PeerId, + PeerUpdate, + Stream +} from "@libp2p/interface"; +import type { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { selectConnection } from "./utils.js"; const CONNECTION_TIMEOUT = 5_000; -const RETRY_BACKOFF_BASE = 1_000; -const MAX_RETRIES = 3; export class StreamManager { - private readonly streamPool: Map>; private readonly log: Logger; + private ongoingCreation: Set = new Set(); + private streamPool: Map> = new Map(); + public constructor( public multicodec: string, public getConnections: Libp2p["getConnections"], public addEventListener: Libp2p["addEventListener"] ) { this.log = new Logger(`stream-manager:${multicodec}`); - this.streamPool = new Map(); this.addEventListener("peer:update", this.handlePeerUpdateStreamPool); } public async getStream(peer: Peer): Promise { - const peerIdStr = peer.id.toString(); - const streamPromise = this.streamPool.get(peerIdStr); + const peerId = peer.id.toString(); - if (!streamPromise) { - return this.createStream(peer); - } + const scheduledStream = this.streamPool.get(peerId); + this.streamPool.delete(peerId); + await scheduledStream; - this.streamPool.delete(peerIdStr); - this.prepareStream(peer); + const stream = this.getStreamForCodec(peer.id); - try { - const stream = await streamPromise; - if (stream && stream.status !== "closed") { - return stream; - } - } catch (error) { - this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error); - this.log.warn("Attempting to create a new stream for the peer"); + if (stream) { + this.log.info( + `Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + return stream; } return this.createStream(peer); @@ -52,64 +51,102 @@ export class StreamManager { const connection = selectConnection(connections); if (!connection) { - throw new Error("Failed to get a connection to the peer"); + throw new Error( + `Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); } - try { - return await connection.newStream(this.multicodec); - } catch (error) { - if (retries < MAX_RETRIES) { - const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries); - await new Promise((resolve) => setTimeout(resolve, backoff)); - return this.createStream(peer, retries + 1); + let lastError: unknown; + let stream: Stream | undefined; + + for (let i = 0; i < retries + 1; i++) { + try { + this.log.info( + `Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + stream = await connection.newStream(this.multicodec); + this.log.info( + `Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + } catch (error) { + lastError = error; } + } + + if (!stream) { throw new Error( - `Failed to create a new stream for ${peer.id.toString()} -- ` + error + `Failed to create a new stream for ${peer.id.toString()} -- ` + + lastError ); } + + return stream; } - private prepareStream(peer: Peer): void { - const timeoutPromise = new Promise((resolve) => - setTimeout(resolve, CONNECTION_TIMEOUT) - ); + private async createStreamWithLock(peer: Peer): Promise { + const peerId = peer.id.toString(); - const streamPromise = Promise.race([ - this.createStream(peer), - timeoutPromise.then(() => { - throw new Error("Connection timeout"); - }) - ]).catch((error) => { - this.log.error( - `Failed to prepare a new stream for ${peer.id.toString()} -- `, - error + if (this.ongoingCreation.has(peerId)) { + this.log.info( + `Skipping creation of a stream due to lock for peerId=${peerId} multicodec=${this.multicodec}` ); - }); + return; + } - this.streamPool.set(peer.id.toString(), streamPromise); + try { + this.ongoingCreation.add(peerId); + await this.createStream(peer); + } finally { + this.ongoingCreation.delete(peerId); + } } private handlePeerUpdateStreamPool = (evt: CustomEvent): void => { const { peer } = evt.detail; - if (peer.protocols.includes(this.multicodec)) { - const isConnected = this.isConnectedTo(peer.id); + if (!peer.protocols.includes(this.multicodec)) { + return; + } - if (isConnected) { - this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`); - this.prepareStream(peer); - } else { - const peerIdStr = peer.id.toString(); - this.streamPool.delete(peerIdStr); - this.log.info( - `Removed pending stream for disconnected peer ${peerIdStr}` - ); - } + const stream = this.getStreamForCodec(peer.id); + + if (stream) { + return; } + + this.scheduleNewStream(peer); }; - private isConnectedTo(peerId: PeerId): boolean { - const connections = this.getConnections(peerId); - return connections.some((connection) => connection.status === "open"); + private scheduleNewStream(peer: Peer): void { + this.log.info( + `Scheduling creation of a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + + const timeoutPromise = new Promise((resolve) => + setTimeout(() => resolve(undefined), CONNECTION_TIMEOUT) + ); + + const streamPromise = Promise.race([ + this.createStreamWithLock(peer), + timeoutPromise + ]); + + this.streamPool.set(peer.id.toString(), streamPromise); + } + + private getStreamForCodec(peerId: PeerId): Stream | undefined { + const connection: Connection | undefined = this.getConnections(peerId).find( + (c) => c.status === "open" + ); + + if (!connection) { + return; + } + + const stream = connection.streams.find( + (s) => s.protocol === this.multicodec + ); + + return stream; } }