From 8122de71b784fc86ca36a1e83d3ac23eb9d069e5 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 18 Jun 2024 21:44:56 -0400 Subject: [PATCH] feat: `forceUseAllPeers` to wait for all connected peers to be resoled --- packages/interfaces/src/sender.ts | 19 ++++++ packages/sdk/src/protocols/base_protocol.ts | 59 ++++++++++--------- packages/sdk/src/protocols/light_push.ts | 9 ++- .../single_node/multiple_pubsub.node.spec.ts | 10 +++- 4 files changed, 62 insertions(+), 35 deletions(-) diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 792ebfcea4..672395e84c 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -9,9 +9,28 @@ export interface ISender { ) => Promise; } +/** + * Options for using LightPush + */ export type SendOptions = { + /** + * Optional flag to enable auto-retry with exponential backoff + */ autoRetry?: boolean; + /** + * Optional flag to force using all available peers + */ + forceUseAllPeers?: boolean; + /** + * Optional maximum number of attempts for exponential backoff + */ maxAttempts?: number; + /** + * Optional initial delay in milliseconds for exponential backoff + */ initialDelay?: number; + /** + * Optional maximum delay in milliseconds for exponential backoff + */ maxDelay?: number; }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index acfe6ac5ff..9688859cea 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -73,51 +73,52 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { /** * Checks if there are peers to send a message to. - * If there are connected peers, returns `true`. - * If `autoRetry` is `false`, returns `false`. - * If `autoRetry` is `true`, tries to find new peers from the ConnectionManager. - * If no peers are found after retries, returns `false`. - * If peers are found, returns `true`. - * @param autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) + * If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`. + * If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager. + * If `autoRetry` is `false`, returns `false` if no peers are found. + * If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff. + * Returns `true` if peers are found, `false` otherwise. + * @param options Optional options object + * @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) + * @param options.forceUseAllPeers Optional flag to force using all available peers (default: false) + * @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10) + * @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3) + * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) */ protected hasPeers = async ( options: Partial = {} ): Promise => { const { - autoRetry, - initialDelay: _initialDelay, - maxAttempts: _maxAttempts, - maxDelay: _maxDelay + autoRetry = false, + forceUseAllPeers = false, + initialDelay = 10, + maxAttempts = 3, + maxDelay = 100 } = options; - if (this.connectedPeers.length > 0) return true; - if (!autoRetry) return false; - let success = await this.maintainPeers(); - let attempts = 0; - - const initialDelay = _initialDelay ?? 10; - const maxAttempts = _maxAttempts ?? 3; - const maxDelay = _maxDelay ?? 100; + if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; - while (!success && attempts < maxAttempts) { + let attempts = 0; + while (attempts < maxAttempts) { attempts++; + if (await this.maintainPeers()) { + if (this.peers.length < this.numPeersToUse) { + this.log.warn( + `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` + ); + } + return true; + } + if (!autoRetry) return false; const delayMs = Math.min( initialDelay * Math.pow(2, attempts - 1), maxDelay ); await delay(delayMs); - success = await this.maintainPeers(); } - if (this.peers.length === 0) { - this.log.error("Failed to find peers to send message to"); - return false; - } else if (this.peers.length < this.numPeersToUse) { - this.log.warn( - `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` - ); - } - return true; + this.log.error("Failed to find peers to send message to"); + return false; }; /** diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index a2d36df10d..ae52e06e49 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -35,10 +35,13 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { async send( encoder: IEncoder, message: IMessage, - options: SendOptions = { - autoRetry: true - } + _options?: SendOptions ): Promise { + const options = { + autoRetry: true, + ..._options + } as SendOptions; + const successes: PeerId[] = []; const failures: Failure[] = []; diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index f00bc8804b..b971f9b12c 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -432,9 +432,13 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () const { failures: f1 } = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - const { failures: f2 } = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); + const { failures: f2 } = await waku.lightPush.send( + customEncoder2, + { + payload: utf8ToBytes("M2") + }, + { forceUseAllPeers: true } + ); expect(f1).to.be.empty; expect(f2).to.be.empty;