Skip to content

Commit

Permalink
feat(lightpush): introduce ReliabilityMonitor and allow send retries (
Browse files Browse the repository at this point in the history
#2130)

* chore: restructure reliability monitors

* feat: setup sender monitor

* chore: update tests

* chore: minor fixes

* chore: comment for doc
  • Loading branch information
danisharora099 authored Sep 17, 2024
1 parent 7ad1d32 commit 7a6247c
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 71 deletions.
4 changes: 2 additions & 2 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store.js";
export { wakuStore } from "./protocols/store/index.js";

export * as waku from "@waku/core";
export * as utils from "@waku/utils";
Expand Down
7 changes: 3 additions & 4 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import {
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";

import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";

import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
ReceiverReliabilityMonitor,
ReliabilityMonitorManager
} from "./reliability_monitor.js";

const log = new Logger("sdk:filter:subscription_manager");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import {
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

import { BaseProtocolSDK } from "./base_protocol.js";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js";
import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
public readonly protocol: LightPushCore;

private readonly reliabilityMonitor: SenderReliabilityMonitor;

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
Expand All @@ -33,6 +37,10 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
);

this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor(
this.renewPeer.bind(this)
);

this.protocol = this.core as LightPushCore;
}

Expand Down Expand Up @@ -89,16 +97,23 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
try {
await this.renewPeer(failure.peerId);
log.info("Renewed peer", failure.peerId.toString());
} catch (error) {
log.error("Failed to renew peer", error);
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}

failures.push(failure);
}
} else {
log.error("Failed unexpectedly while sending:", result.reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";

import { BaseProtocolSDK } from "./base_protocol.js";
import { BaseProtocolSDK } from "../base_protocol.js";

const DEFAULT_NUM_PEERS = 1;

Expand Down
70 changes: 70 additions & 0 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import type { Peer, PeerId } from "@libp2p/interface";
import {
ContentTopic,
CoreProtocolResult,
PubsubTopic
} from "@waku/interfaces";

import { ReceiverReliabilityMonitor } from "./receiver.js";
import { SenderReliabilityMonitor } from "./sender.js";

/**
 * Static registry of reliability monitors.
 *
 * Holds at most one {@link ReceiverReliabilityMonitor} per pubsub topic and a
 * single shared {@link SenderReliabilityMonitor}. The class is never
 * instantiated; all state lives in static fields.
 */
export class ReliabilityMonitorManager {
  private static receiverMonitors: Map<
    PubsubTopic,
    ReceiverReliabilityMonitor
  > = new Map();
  private static senderMonitor: SenderReliabilityMonitor | undefined;

  /**
   * Returns the receiver monitor registered for `pubsubTopic`, creating and
   * registering one from the supplied callbacks if none exists yet.
   *
   * When a monitor already exists for the topic, the callbacks passed to this
   * call are ignored and the existing instance is returned.
   *
   * @param pubsubTopic - Topic used as the registry key.
   * @param getPeers - Forwarded to the monitor constructor.
   * @param renewPeer - Forwarded to the monitor constructor.
   * @param getContentTopics - Forwarded to the monitor constructor.
   * @param protocolSubscribe - Forwarded to the monitor constructor.
   * @returns The existing or newly created monitor for the topic.
   */
  public static createReceiverMonitor(
    pubsubTopic: PubsubTopic,
    getPeers: () => Peer[],
    renewPeer: (peerId: PeerId) => Promise<Peer>,
    getContentTopics: () => ContentTopic[],
    protocolSubscribe: (
      pubsubTopic: PubsubTopic,
      peer: Peer,
      contentTopics: ContentTopic[]
    ) => Promise<CoreProtocolResult>
  ): ReceiverReliabilityMonitor {
    // Single lookup with a guard instead of has() followed by get()!.
    const existing =
      ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic);
    if (existing) {
      return existing;
    }

    const monitor = new ReceiverReliabilityMonitor(
      pubsubTopic,
      getPeers,
      renewPeer,
      getContentTopics,
      protocolSubscribe
    );
    ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
    return monitor;
  }

  /**
   * Returns the shared sender monitor, creating it on first use.
   *
   * @param renewPeer - Used only when the monitor is created; ignored if a
   *   sender monitor already exists.
   * @returns The shared sender monitor.
   */
  public static createSenderMonitor(
    renewPeer: (peerId: PeerId) => Promise<Peer>
  ): SenderReliabilityMonitor {
    if (!ReliabilityMonitorManager.senderMonitor) {
      ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
        renewPeer
      );
    }
    return ReliabilityMonitorManager.senderMonitor;
  }

  private constructor() {}

  /**
   * Unregisters the receiver monitor for `pubsubTopic` and drops the shared
   * sender monitor.
   *
   * NOTE(review): stopping a single topic also discards the sender monitor,
   * which is not keyed by topic — confirm this is intended.
   */
  public static stop(pubsubTopic: PubsubTopic): void {
    this.receiverMonitors.delete(pubsubTopic);
    this.senderMonitor = undefined;
  }

  /**
   * Resets the thresholds of every receiver monitor, unregisters them all and
   * drops the shared sender monitor.
   */
  public static stopAll(): void {
    for (const [pubsubTopic, monitor] of this.receiverMonitors) {
      monitor.setMaxMissedMessagesThreshold(undefined);
      monitor.setMaxPingFailures(undefined);
      this.receiverMonitors.delete(pubsubTopic);
    }
    // Must run even when no receiver monitor is registered, otherwise a
    // sender-only setup would keep the old sender monitor after stopAll().
    this.senderMonitor = undefined;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,6 @@ const log = new Logger("sdk:receiver:reliability_monitor");

const DEFAULT_MAX_PINGS = 3;

/**
 * Static registry holding one {@link ReceiverReliabilityMonitor} per pubsub
 * topic. Never instantiated; all state is kept in a static map.
 */
export class ReliabilityMonitorManager {
  private static receiverMonitors: Map<
    PubsubTopic,
    ReceiverReliabilityMonitor
  > = new Map();

  /**
   * Looks up the monitor registered for `pubsubTopic`; if there is none, builds
   * one from the given callbacks, registers it and returns it.
   */
  public static createReceiverMonitor(
    pubsubTopic: PubsubTopic,
    getPeers: () => Peer[],
    renewPeer: (peerId: PeerId) => Promise<Peer>,
    getContentTopics: () => ContentTopic[],
    protocolSubscribe: (
      pubsubTopic: PubsubTopic,
      peer: Peer,
      contentTopics: ContentTopic[]
    ) => Promise<CoreProtocolResult>
  ): ReceiverReliabilityMonitor {
    const registry = ReliabilityMonitorManager.receiverMonitors;

    const registered = registry.get(pubsubTopic);
    if (registered !== undefined) {
      return registered;
    }

    const created = new ReceiverReliabilityMonitor(
      pubsubTopic,
      getPeers,
      renewPeer,
      getContentTopics,
      protocolSubscribe
    );
    registry.set(pubsubTopic, created);
    return created;
  }

  private constructor() {}

  /** Removes the monitor registered for `pubsubTopic`, if any. */
  public static destroy(pubsubTopic: PubsubTopic): void {
    this.receiverMonitors.delete(pubsubTopic);
  }

  /**
   * Clears both thresholds on every registered monitor and empties the
   * registry.
   */
  public static destroyAll(): void {
    this.receiverMonitors.forEach((monitor, topic, registry) => {
      monitor.setMaxMissedMessagesThreshold(undefined);
      monitor.setMaxPingFailures(undefined);
      registry.delete(topic);
    });
  }
}

export class ReceiverReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();
Expand Down
57 changes: 57 additions & 0 deletions packages/sdk/src/reliability_monitor/sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces";
import { Logger } from "@waku/utils";

const log = new Logger("sdk:sender:reliability_monitor");

const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3;

/**
 * Retries failed sends against a peer and renews the peer once too many
 * attempts have accumulated.
 *
 * Attempts are counted per peer id string. A successful retry or a successful
 * renewal clears the counter for that peer.
 */
export class SenderReliabilityMonitor {
  private attempts: Map<PeerIdStr, number> = new Map();
  private readonly maxAttemptsBeforeRenewal =
    DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;

  /**
   * @param renewPeer - Callback that replaces the given peer and resolves with
   *   the new one.
   */
  public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}

  /**
   * Records one more attempt for `peerId`, then either retries the send or,
   * once the attempt count reaches `maxAttemptsBeforeRenewal`, renews the peer
   * and sends once more.
   *
   * A retry that fails (by returning a result without `success`, or by
   * throwing) triggers another call of this method, so a single invocation
   * runs until a retry succeeds or the renewal branch has been taken.
   * The method logs failures instead of throwing them.
   *
   * @param peerId - Peer the original send failed for.
   * @param protocolSend - Re-executes the send; called with no arguments.
   */
  public async attemptRetriesOrRenew(
    peerId: PeerId,
    protocolSend: () => Promise<CoreProtocolResult>
  ): Promise<void> {
    const peerIdStr = peerId.toString();
    // `??` rather than `||`: only a missing entry should default to zero.
    const attemptNumber = (this.attempts.get(peerIdStr) ?? 0) + 1;
    this.attempts.set(peerIdStr, attemptNumber);

    if (attemptNumber >= this.maxAttemptsBeforeRenewal) {
      await this.renewAndResend(peerId, peerIdStr, protocolSend);
      return;
    }

    try {
      const result = await protocolSend();
      if (result.success) {
        log.info(`Successfully sent message after retry to ${peerIdStr}`);
        this.attempts.delete(peerIdStr);
        return;
      }
      // Pass the failure as a separate argument so it is not flattened to
      // "[object Object]" by string interpolation.
      log.error(
        `Failed to send message after retry to ${peerIdStr}:`,
        result.failure
      );
    } catch (error) {
      log.error(`Failed to send message after retry to ${peerIdStr}:`, error);
    }

    await this.attemptRetriesOrRenew(peerId, protocolSend);
  }

  /**
   * Renews `peerId`, moves the attempt bookkeeping to the new peer and sends
   * once more. Renewal failures and resend failures are logged separately.
   */
  private async renewAndResend(
    peerId: PeerId,
    peerIdStr: PeerIdStr,
    protocolSend: () => Promise<CoreProtocolResult>
  ): Promise<void> {
    let newPeerIdStr: PeerIdStr;
    try {
      const newPeer = await this.renewPeer(peerId);
      newPeerIdStr = newPeer.id.toString();
    } catch (error) {
      log.error(`Failed to renew peer ${peerIdStr}:`, error);
      return;
    }

    log.info(`Renewed peer ${peerIdStr} to ${newPeerIdStr}`);
    this.attempts.delete(peerIdStr);
    this.attempts.set(newPeerIdStr, 0);

    // NOTE(review): protocolSend takes no arguments, so this resend targets
    // whichever peer the caller's closure captured, which may not be the
    // renewed peer — confirm against the caller.
    try {
      const result = await protocolSend();
      if (!result.success) {
        log.error(
          `Failed to send message after renewing peer ${peerIdStr}:`,
          result.failure
        );
      }
    } catch (error) {
      // Reported on its own so a send failure is not mistaken for a failed
      // renewal.
      log.error(
        `Failed to send message after renewing peer ${peerIdStr}:`,
        error
      );
    }
  }
}
8 changes: 4 additions & 4 deletions packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { wakuFilter } from "./protocols/filter/index.js";
import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";
import { wakuLightPush } from "./protocols/lightpush/index.js";
import { wakuStore } from "./protocols/store/index.js";
import { ReliabilityMonitorManager } from "./reliability_monitor/index.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -196,7 +196,7 @@ export class WakuNode implements Waku {
}

/**
 * Shuts the node down: tears down the reliability monitors, stops the
 * connection manager, then awaits libp2p shutdown.
 */
public async stop(): Promise<void> {
// NOTE(review): `destroyAll` and `stopAll` look like the old and new name of
// the same call shown side by side — confirm that only `stopAll` remains.
ReliabilityMonitorManager.destroyAll();
ReliabilityMonitorManager.stopAll();
this.connectionManager.stop();
await this.libp2p.stop();
}
Expand Down
17 changes: 12 additions & 5 deletions packages/tests/tests/light-push/peer_management.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LightNode } from "@waku/interfaces";
import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { describe } from "mocha";

Expand Down Expand Up @@ -78,18 +79,24 @@ describe("Waku Light Push: Peer Management: E2E", function () {
expect(response2.failures).to.have.length(1);
expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect);

// send another lightpush request -- renewal should have triggerred and new peer should be used instead of the disconnected one
// send another lightpush request
// reattempts to send should be triggered
// then renewal should happen
// so one failure should exist
const response3 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});

// wait for reattempts to finish as they are async and not awaited
await delay(500);

// doing -1 because the peer that was disconnected is not in the successes
expect(response3.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
waku.lightPush.numPeersToUse - 1
);
// and exists in failure instead
expect(response3.failures).to.have.length(1);

expect(response3.successes).to.not.include(peerToDisconnect);
if (response3.failures) {
expect(response3.failures.length).to.equal(0);
}
});
});

0 comments on commit 7a6247c

Please sign in to comment.