Skip to content

Commit

Permalink
fix stream manager
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Sep 27, 2024
1 parent dfdea84 commit 00c3261
Showing 1 changed file with 98 additions and 61 deletions.
159 changes: 98 additions & 61 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,46 @@
import type { PeerUpdate, Stream } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import type {
Connection,
Peer,
PeerId,
PeerUpdate,
Stream
} from "@libp2p/interface";
import type { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { selectConnection } from "./utils.js";

const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;
const MAX_RETRIES = 3;

export class StreamManager {
private readonly streamPool: Map<string, Promise<Stream | void>>;
private readonly log: Logger;

private ongoingCreation: Set<string> = new Set();
private streamPool: Map<string, Promise<void>> = new Map();

public constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
public addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.streamPool = new Map();
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
}

public async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
const streamPromise = this.streamPool.get(peerIdStr);
const peerId = peer.id.toString();

if (!streamPromise) {
return this.createStream(peer);
}
const scheduledStream = this.streamPool.get(peerId);
this.streamPool.delete(peerId);
await scheduledStream;

this.streamPool.delete(peerIdStr);
this.prepareStream(peer);
const stream = this.getStreamForCodec(peer.id);

try {
const stream = await streamPromise;
if (stream && stream.status !== "closed") {
return stream;
}
} catch (error) {
this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error);
this.log.warn("Attempting to create a new stream for the peer");
if (stream) {
this.log.info(
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
return stream;
}

return this.createStream(peer);
Expand All @@ -52,64 +51,102 @@ export class StreamManager {
const connection = selectConnection(connections);

if (!connection) {
throw new Error("Failed to get a connection to the peer");
throw new Error(
`Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
}

try {
return await connection.newStream(this.multicodec);
} catch (error) {
if (retries < MAX_RETRIES) {
const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries);
await new Promise((resolve) => setTimeout(resolve, backoff));
return this.createStream(peer, retries + 1);
let lastError: unknown;
let stream: Stream | undefined;

for (let i = 0; i < retries + 1; i++) {
try {
this.log.info(
`Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
stream = await connection.newStream(this.multicodec);
this.log.info(
`Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
} catch (error) {
lastError = error;
}
}

if (!stream) {
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` + error
`Failed to create a new stream for ${peer.id.toString()} -- ` +
lastError
);
}

return stream;
}

private prepareStream(peer: Peer): void {
const timeoutPromise = new Promise<void>((resolve) =>
setTimeout(resolve, CONNECTION_TIMEOUT)
);
private async createStreamWithLock(peer: Peer): Promise<void> {
const peerId = peer.id.toString();

const streamPromise = Promise.race([
this.createStream(peer),
timeoutPromise.then(() => {
throw new Error("Connection timeout");
})
]).catch((error) => {
this.log.error(
`Failed to prepare a new stream for ${peer.id.toString()} -- `,
error
if (this.ongoingCreation.has(peerId)) {
this.log.info(
`Skipping creation of a stream due to lock for peerId=${peerId} multicodec=${this.multicodec}`
);
});
return;
}

this.streamPool.set(peer.id.toString(), streamPromise);
try {
this.ongoingCreation.add(peerId);
await this.createStream(peer);
} finally {
this.ongoingCreation.delete(peerId);
}
}

private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const { peer } = evt.detail;

if (peer.protocols.includes(this.multicodec)) {
const isConnected = this.isConnectedTo(peer.id);
if (!peer.protocols.includes(this.multicodec)) {
return;
}

if (isConnected) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareStream(peer);
} else {
const peerIdStr = peer.id.toString();
this.streamPool.delete(peerIdStr);
this.log.info(
`Removed pending stream for disconnected peer ${peerIdStr}`
);
}
const stream = this.getStreamForCodec(peer.id);

if (stream) {
return;
}

this.scheduleNewStream(peer);
};

private isConnectedTo(peerId: PeerId): boolean {
const connections = this.getConnections(peerId);
return connections.some((connection) => connection.status === "open");
private scheduleNewStream(peer: Peer): void {
this.log.info(
`Scheduling creation of a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);

const timeoutPromise = new Promise<undefined>((resolve) =>
setTimeout(() => resolve(undefined), CONNECTION_TIMEOUT)
);

const streamPromise = Promise.race([
this.createStreamWithLock(peer),
timeoutPromise
]);

this.streamPool.set(peer.id.toString(), streamPromise);
}

private getStreamForCodec(peerId: PeerId): Stream | undefined {
const connection: Connection | undefined = this.getConnections(peerId).find(
(c) => c.status === "open"
);

if (!connection) {
return;
}

const stream = connection.streams.find(
(s) => s.protocol === this.multicodec
);

return stream;
}
}

0 comments on commit 00c3261

Please sign in to comment.