From 7a6247cb7081e8b9b1d48c24040aae63144457aa Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:34:59 +0530 Subject: [PATCH] feat(lightpush): introduce ReliabilityMonitor and allow `send` retries (#2130) * chore: restructure reliabiltiy monitors * feat: setup sender monitor * chore: update tests * chore: minor fixes * chore: comment for doc --- packages/sdk/src/index.ts | 4 +- .../protocols/filter/subscription_manager.ts | 7 +- .../{light_push.ts => lightpush/index.ts} | 31 +++++--- .../protocols/{store.ts => store/index.ts} | 2 +- packages/sdk/src/reliability_monitor/index.ts | 70 +++++++++++++++++++ .../receiver.ts} | 47 ------------- .../sdk/src/reliability_monitor/sender.ts | 57 +++++++++++++++ packages/sdk/src/waku.ts | 8 +-- .../tests/light-push/peer_management.spec.ts | 17 +++-- 9 files changed, 172 insertions(+), 71 deletions(-) rename packages/sdk/src/protocols/{light_push.ts => lightpush/index.ts} (73%) rename packages/sdk/src/protocols/{store.ts => store/index.ts} (99%) create mode 100644 packages/sdk/src/reliability_monitor/index.ts rename packages/sdk/src/{protocols/filter/reliability_monitor.ts => reliability_monitor/receiver.ts} (82%) create mode 100644 packages/sdk/src/reliability_monitor/sender.ts diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 3b0aa54dda..2cbcfce937 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -14,9 +14,9 @@ export { defaultLibp2p, createLibp2pAndUpdateOptions } from "./create/index.js"; -export { wakuLightPush } from "./protocols/light_push.js"; +export { wakuLightPush } from "./protocols/lightpush/index.js"; export { wakuFilter } from "./protocols/filter/index.js"; -export { wakuStore } from "./protocols/store.js"; +export { wakuStore } from "./protocols/store/index.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 6d3d50f13b..51a9e16e8d 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -20,11 +20,10 @@ import { import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; +import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; +import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; + import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; -import { - ReceiverReliabilityMonitor, - ReliabilityMonitorManager -} from "./reliability_monitor.js"; const log = new Logger("sdk:filter:subscription_manager"); diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/lightpush/index.ts similarity index 73% rename from packages/sdk/src/protocols/light_push.ts rename to packages/sdk/src/protocols/lightpush/index.ts index 4b303118bd..f3be55d54a 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/lightpush/index.ts @@ -13,13 +13,17 @@ import { } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; +import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; +import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js"; +import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { public readonly protocol: LightPushCore; + private readonly reliabilityMonitor: SenderReliabilityMonitor; + public constructor( connectionManager: ConnectionManager, libp2p: Libp2p, @@ -33,6 +37,10 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } ); + this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor( + this.renewPeer.bind(this) + ); + this.protocol = this.core as LightPushCore; } @@ -89,16 +97,23 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { successes.push(success); } if (failure) { + failures.push(failure); if (failure.peerId) { - try { - await this.renewPeer(failure.peerId); - log.info("Renewed peer", failure.peerId.toString()); - } catch (error) { - log.error("Failed to renew peer", error); + const peer = this.connectedPeers.find((connectedPeer) => + connectedPeer.id.equals(failure.peerId) + ); + if (peer) { + log.info(` + Failed to send message to peer ${failure.peerId}. + Retrying the message with the same peer in the background. + If this fails, the peer will be renewed. + `); + void this.reliabilityMonitor.attemptRetriesOrRenew( + failure.peerId, + () => this.protocol.send(encoder, message, peer) + ); } } - - failures.push(failure); } } else { log.error("Failed unexpectedly while sending:", result.reason); diff --git a/packages/sdk/src/protocols/store.ts b/packages/sdk/src/protocols/store/index.ts similarity index 99% rename from packages/sdk/src/protocols/store.ts rename to packages/sdk/src/protocols/store/index.ts index c851fbefe4..3aba99b87c 100644 --- a/packages/sdk/src/protocols/store.ts +++ b/packages/sdk/src/protocols/store/index.ts @@ -10,7 +10,7 @@ import { import { messageHash } from "@waku/message-hash"; import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; +import { BaseProtocolSDK } from "../base_protocol.js"; const DEFAULT_NUM_PEERS = 1; diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts new file mode 100644 index 0000000000..1e420dd921 --- /dev/null +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -0,0 +1,70 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { + ContentTopic, + CoreProtocolResult, + PubsubTopic +} from "@waku/interfaces"; + +import { ReceiverReliabilityMonitor } from "./receiver.js"; +import { SenderReliabilityMonitor } from "./sender.js"; + +export class ReliabilityMonitorManager { + private static receiverMonitors: Map< + PubsubTopic, + ReceiverReliabilityMonitor + > = new Map(); + private static senderMonitor: SenderReliabilityMonitor | undefined; + + public static createReceiverMonitor( + pubsubTopic: PubsubTopic, + getPeers: () => Peer[], + renewPeer: (peerId: PeerId) => Promise, + getContentTopics: () => ContentTopic[], + protocolSubscribe: ( + pubsubTopic: PubsubTopic, + peer: Peer, + contentTopics: ContentTopic[] + ) => Promise + ): ReceiverReliabilityMonitor { + if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { + return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; + } + + const monitor = new ReceiverReliabilityMonitor( + pubsubTopic, + getPeers, + renewPeer, + getContentTopics, + protocolSubscribe + ); + ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); + return monitor; + } + + public static createSenderMonitor( + renewPeer: (peerId: PeerId) => Promise + ): SenderReliabilityMonitor { + if (!ReliabilityMonitorManager.senderMonitor) { + ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( + renewPeer + ); + } + return ReliabilityMonitorManager.senderMonitor; + } + + private constructor() {} + + public static stop(pubsubTopic: PubsubTopic): void { + this.receiverMonitors.delete(pubsubTopic); + this.senderMonitor = undefined; + } + + public static stopAll(): void { + for (const [pubsubTopic, monitor] of this.receiverMonitors) { + monitor.setMaxMissedMessagesThreshold(undefined); + monitor.setMaxPingFailures(undefined); + this.receiverMonitors.delete(pubsubTopic); + this.senderMonitor = undefined; + } + } +} diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/reliability_monitor/receiver.ts similarity index 82% rename from packages/sdk/src/protocols/filter/reliability_monitor.ts rename to packages/sdk/src/reliability_monitor/receiver.ts index 84d386f716..440e35829c 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -21,53 +21,6 @@ const log = new Logger("sdk:receiver:reliability_monitor"); const DEFAULT_MAX_PINGS = 3; -export class ReliabilityMonitorManager { - private static receiverMonitors: Map< - PubsubTopic, - ReceiverReliabilityMonitor - > = new Map(); - - public static createReceiverMonitor( - pubsubTopic: PubsubTopic, - getPeers: () => Peer[], - renewPeer: (peerId: PeerId) => Promise, - getContentTopics: () => ContentTopic[], - protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise - ): ReceiverReliabilityMonitor { - if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { - return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; - } - - const monitor = new ReceiverReliabilityMonitor( - pubsubTopic, - getPeers, - renewPeer, - getContentTopics, - protocolSubscribe - ); - ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); - return monitor; - } - - private constructor() {} - - public static destroy(pubsubTopic: PubsubTopic): void { - this.receiverMonitors.delete(pubsubTopic); - } - - public static destroyAll(): void { - for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxMissedMessagesThreshold(undefined); - monitor.setMaxPingFailures(undefined); - this.receiverMonitors.delete(pubsubTopic); - } - } -} - export class ReceiverReliabilityMonitor { private receivedMessagesHashes: ReceivedMessageHashes; private missedMessagesByPeer: Map = new Map(); diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts new file mode 100644 index 0000000000..0ffe9a1659 --- /dev/null +++ b/packages/sdk/src/reliability_monitor/sender.ts @@ -0,0 +1,57 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +const log = new Logger("sdk:sender:reliability_monitor"); + +const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3; + +export class SenderReliabilityMonitor { + private attempts: Map = new Map(); + private readonly maxAttemptsBeforeRenewal = + DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; + + public constructor(private renewPeer: (peerId: PeerId) => Promise) {} + + public async attemptRetriesOrRenew( + peerId: PeerId, + protocolSend: () => Promise + ): Promise { + const peerIdStr = peerId.toString(); + const currentAttempts = this.attempts.get(peerIdStr) || 0; + this.attempts.set(peerIdStr, currentAttempts + 1); + + if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) { + try { + const result = await protocolSend(); + if (result.success) { + log.info(`Successfully sent message after retry to ${peerIdStr}`); + this.attempts.delete(peerIdStr); + } else { + log.error( + `Failed to send message after retry to ${peerIdStr}: ${result.failure}` + ); + await this.attemptRetriesOrRenew(peerId, protocolSend); + } + } catch (error) { + log.error( + `Failed to send message after retry to ${peerIdStr}: ${error}` + ); + await this.attemptRetriesOrRenew(peerId, protocolSend); + } + } else { + try { + const newPeer = await this.renewPeer(peerId); + log.info( + `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); + + this.attempts.delete(peerIdStr); + this.attempts.set(newPeer.id.toString(), 0); + await protocolSend(); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); + } + } + } +} diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index 57c9c16d2a..a5314a99f8 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter/index.js"; -import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js"; -import { wakuLightPush } from "./protocols/light_push.js"; -import { wakuStore } from "./protocols/store.js"; +import { wakuLightPush } from "./protocols/lightpush/index.js"; +import { wakuStore } from "./protocols/store/index.js"; +import { ReliabilityMonitorManager } from "./reliability_monitor/index.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -196,7 +196,7 @@ export class WakuNode implements Waku { } public async stop(): Promise { - ReliabilityMonitorManager.destroyAll(); + ReliabilityMonitorManager.stopAll(); this.connectionManager.stop(); await this.libp2p.stop(); } diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 1c7fa59d44..a275f4e970 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -1,5 +1,6 @@ import { LightNode } from "@waku/interfaces"; import { createEncoder, utf8ToBytes } from "@waku/sdk"; +import { delay } from "@waku/utils"; import { expect } from "chai"; import { describe } from "mocha"; @@ -78,18 +79,24 @@ describe("Waku Light Push: Peer Management: E2E", function () { expect(response2.failures).to.have.length(1); expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect); - // send another lightpush request -- renewal should have triggerred and new peer should be used instead of the disconnected one + // send another lightpush request + // reattempts to send should be triggerred + // then renewal should happen + // so one failure should exist const response3 = await waku.lightPush.send(encoder, { payload: utf8ToBytes("Hello_World") }); + // wait for reattempts to finish as they are async and not awaited + await delay(500); + + // doing -1 because the peer that was disconnected is not in the successes expect(response3.successes.length).to.be.equal( - waku.lightPush.numPeersToUse + waku.lightPush.numPeersToUse - 1 ); + // and exists in failure instead + expect(response3.failures).to.have.length(1); expect(response3.successes).to.not.include(peerToDisconnect); - if (response3.failures) { - expect(response3.failures.length).to.equal(0); - } }); });