From 877fe1dc1daf6826b60ac5011af2915c47864d90 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Wed, 13 Mar 2024 19:33:50 +0530 Subject: [PATCH] feat: peer-exchange uses error codes (#1907) * setup a generic protocol result type (DRY) * metadata: use generic * lightpush: use generic * peer-exchange: use error codes + generic + update tests * add issue link to skipped test * tests: improve while loop readability --- packages/core/src/lib/light_push/index.ts | 25 +---- .../src/peer-exchange/waku_peer_exchange.ts | 33 ++++-- .../waku_peer_exchange_discovery.ts | 19 +++- packages/interfaces/src/metadata.ts | 14 +-- packages/interfaces/src/peer_exchange.ts | 6 +- packages/interfaces/src/protocols.ts | 28 ++++- .../tests/tests/peer-exchange/query.spec.ts | 105 ++++++++++-------- 7 files changed, 132 insertions(+), 98 deletions(-) diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 3e5dcd6be0..62d922c805 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -6,7 +6,8 @@ import { IMessage, Libp2p, ProtocolCreateOptions, - ProtocolError + ProtocolError, + ProtocolResult } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; import { isMessageSizeUnderCap } from "@waku/utils"; @@ -25,25 +26,9 @@ const log = new Logger("light-push"); export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"; export { PushResponse }; -type PreparePushMessageResult = - | { - query: PushRpc; - error: null; - } - | { - query: null; - error: ProtocolError; - }; - -type CoreSendResult = - | { - success: null; - failure: Failure; - } - | { - success: PeerId; - failure: null; - }; +type PreparePushMessageResult = ProtocolResult<"query", PushRpc>; + +type CoreSendResult = ProtocolResult<"success", PeerId, "failure", Failure>; /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 9471b72271..babb6bf688 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -1,10 +1,11 @@ import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { EnrDecoder } from "@waku/enr"; -import type { +import { IPeerExchange, Libp2pComponents, PeerExchangeQueryParams, - PeerInfo, + PeerExchangeResult, + ProtocolError, PubsubTopic } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; @@ -34,18 +35,18 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { /** * Make a peer exchange query to a peer */ - async query( - params: PeerExchangeQueryParams - ): Promise { + async query(params: PeerExchangeQueryParams): Promise { const { numPeers } = params; - const rpcQuery = PeerExchangeRPC.createRequest({ numPeers: BigInt(numPeers) }); const peer = await this.peerStore.get(params.peerId); if (!peer) { - throw new Error(`Peer ${params.peerId.toString()} not found`); + return { + peerInfos: null, + error: ProtocolError.NO_PEER_AVAILABLE + }; } const stream = await this.getStream(peer); @@ -65,15 +66,17 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { }); const { response } = PeerExchangeRPC.decode(bytes); - if (!response) { log.error( "PeerExchangeRPC message did not contains a `response` field" ); - return; + return { + peerInfos: null, + error: ProtocolError.EMPTY_PAYLOAD + }; } - return Promise.all( + const peerInfos = await Promise.all( response.peerInfos .map((peerInfo) => peerInfo.enr) .filter(isDefined) @@ -81,9 +84,17 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { return { ENR: await EnrDecoder.fromRLP(enr) }; }) ); + + return { + peerInfos, + error: null + }; } catch (err) { log.error("Failed to decode push reply", err); - return; + return { + peerInfos: null, + error: ProtocolError.DECODE_FAILED + }; } } } diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts index dc49339823..bbf3ca6ff6 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts @@ -7,7 +7,12 @@ import type { PeerId, PeerInfo } from "@libp2p/interface"; -import { Libp2pComponents, PubsubTopic, Tags } from "@waku/interfaces"; +import { + Libp2pComponents, + PeerExchangeResult, + PubsubTopic, + Tags +} from "@waku/interfaces"; import { encodeRelayShard, Logger } from "@waku/utils"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; @@ -160,15 +165,15 @@ export class PeerExchangeDiscovery }, queryInterval * currentAttempt); }; - private async query(peerId: PeerId): Promise { - const peerInfos = await this.peerExchange.query({ + private async query(peerId: PeerId): Promise { + const { error, peerInfos } = await this.peerExchange.query({ numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, peerId }); - if (!peerInfos) { - log.error("Peer exchange query failed, no peer info returned"); - return; + if (error) { + log.error("Peer exchange query failed", error); + return { error, peerInfos: null }; } for (const _peerInfo of peerInfos) { @@ -214,6 +219,8 @@ export class PeerExchangeDiscovery }) ); } + + return { error: null, peerInfos }; } private abortQueriesForPeer(peerIdStr: string): void { diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index 492d3b4168..47d561b936 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,21 +1,13 @@ import type { PeerId } from "@libp2p/interface"; -import type { ShardInfo } from "./enr.js"; +import { type ShardInfo } from "./enr.js"; import type { IBaseProtocolCore, - ProtocolError, + ProtocolResult, ShardingParams } from "./protocols.js"; -export type QueryResult = - | { - shardInfo: ShardInfo; - error: null; - } - | { - shardInfo: null; - error: ProtocolError; - }; +export type QueryResult = ProtocolResult<"shardInfo", ShardInfo>; // IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol export interface IMetadata extends Omit { diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts index fc64965cbe..9f9c624ab5 100644 --- a/packages/interfaces/src/peer_exchange.ts +++ b/packages/interfaces/src/peer_exchange.ts @@ -3,12 +3,14 @@ import type { PeerStore } from "@libp2p/interface"; import type { ConnectionManager } from "@libp2p/interface-internal"; import { IEnr } from "./enr.js"; -import { IBaseProtocolCore } from "./protocols.js"; +import { IBaseProtocolCore, ProtocolResult } from "./protocols.js"; export interface IPeerExchange extends IBaseProtocolCore { - query(params: PeerExchangeQueryParams): Promise; + query(params: PeerExchangeQueryParams): Promise; } +export type PeerExchangeResult = ProtocolResult<"peerInfos", PeerInfo[]>; + export interface PeerExchangeQueryParams { numPeers: number; peerId: PeerId; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index b1a161058b..012a7834af 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -101,6 +101,27 @@ export type Callback = ( msg: T ) => void | Promise; +// SK = success key name +// SV = success value type +// EK = error key name (default: "error") +// EV = error value type (default: ProtocolError) +export type ProtocolResult< + SK extends string, + SV, + EK extends string = "error", + EV = ProtocolError +> = + | ({ + [key in SK]: SV; + } & { + [key in EK]: null; + }) + | ({ + [key in SK]: null; + } & { + [key in EK]: EV; + }); + export enum ProtocolError { /** Could not determine the origin of the fault. Best to check connectivity and try again */ GENERIC_FAIL = "Generic error", @@ -146,7 +167,12 @@ export enum ProtocolError { * is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE` * or `DECODE_FAILED` can be used. */ - REMOTE_PEER_REJECTED = "Remote peer rejected" + REMOTE_PEER_REJECTED = "Remote peer rejected", + /** + * The protocol request timed out without a response. This may be due to a connection issue. + * Mitigation can be: retrying after a given time period + */ + REQUEST_TIMEOUT = "Request timeout" } export interface Failure { diff --git a/packages/tests/tests/peer-exchange/query.spec.ts b/packages/tests/tests/peer-exchange/query.spec.ts index 72e3027e4a..9ebb832af5 100644 --- a/packages/tests/tests/peer-exchange/query.spec.ts +++ b/packages/tests/tests/peer-exchange/query.spec.ts @@ -7,8 +7,8 @@ import { WakuPeerExchange, wakuPeerExchangeDiscovery } from "@waku/discovery"; -import type { LightNode, PeerInfo } from "@waku/interfaces"; -import { createLightNode, Libp2pComponents } from "@waku/sdk"; +import type { LightNode, PeerExchangeResult } from "@waku/interfaces"; +import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk"; import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; @@ -38,7 +38,7 @@ describe("Peer Exchange Query", function () { let components: Libp2pComponents; let peerExchange: WakuPeerExchange; let numPeersToRequest: number; - let peerInfos: PeerInfo[]; + let queryResult: PeerExchangeResult; beforeEachCustom( this, @@ -85,57 +85,77 @@ describe("Peer Exchange Query", function () { peerExchange = new WakuPeerExchange(components, pubsubTopic); numPeersToRequest = 2; - // querying the connected peer - peerInfos = []; const startTime = Date.now(); - while (!peerInfos || peerInfos.length != numPeersToRequest) { - if (Date.now() - startTime > 100000) { + + while (true) { + if (Date.now() - startTime > 100_000) { log.error("Timeout reached, exiting the loop."); break; } - await delay(2000); - try { - peerInfos = await Promise.race([ + queryResult = await Promise.race([ peerExchange.query({ peerId: nwaku3PeerId, numPeers: numPeersToRequest - }) as Promise, - new Promise((resolve) => - setTimeout(() => resolve([]), 5000) + }), + new Promise((resolve) => + setTimeout( + () => + resolve({ + peerInfos: null, + error: ProtocolError.REQUEST_TIMEOUT + }), + 5000 + ) ) ]); - - if (peerInfos.length === 0) { - log.warn("Query timed out, retrying..."); + const hasErrors = queryResult?.error !== null; + const hasPeerInfos = + queryResult?.peerInfos && + queryResult.peerInfos.length === numPeersToRequest; + if (hasErrors) { + if (queryResult.error === ProtocolError.REQUEST_TIMEOUT) { + log.warn("Query timed out, retrying..."); + } else { + log.error("Error encountered, retrying...", queryResult.error); + } + continue; + } + if (!hasPeerInfos) { + log.warn( + "Peer info not available or does not match the requested number of peers, retrying..." + ); continue; } + break; } catch (error) { - log.warn("Error encountered, retrying..."); + log.warn("Error encountered, retrying...", error); } } }, - 120000 + 120_000 ); afterEachCustom(this, async () => { await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); }); - // slow and flaky in CI + // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 it.skip("connected peers and dial", async function () { - expect(peerInfos[0].ENR).to.not.be.null; - expect(peerInfos[0].ENR?.peerInfo?.multiaddrs).to.not.be.null; + expect(queryResult.error).to.be.null; + + expect(queryResult.peerInfos?.[0].ENR).to.not.be.null; + expect(queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs).to.not.be.null; - const peerWsMA = peerInfos[0].ENR?.peerInfo?.multiaddrs[2]; + const peerWsMA = queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs[2]; const localPeerWsMAAsString = peerWsMA ?.toString() .replace(/\/ip4\/[\d.]+\//, "/ip4/127.0.0.1/"); const localPeerWsMA = multiaddr(localPeerWsMAAsString); let foundNodePeerId: PeerId | undefined = undefined; - const doesPeerIdExistInResponse = peerInfos.some(({ ENR }) => { + const doesPeerIdExistInResponse = queryResult.peerInfos?.some(({ ENR }) => { foundNodePeerId = ENR?.peerInfo?.id; return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString(); }); @@ -148,43 +168,34 @@ describe("Peer Exchange Query", function () { await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, foundNodePeerId); }); - // slow and flaky in CI + // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 it.skip("more peers than existing", async function () { - const peerInfo = await peerExchange.query({ + const result = await peerExchange.query({ peerId: nwaku3PeerId, numPeers: 5 }); - expect(peerInfo?.length).to.be.eq(numPeersToRequest); + expect(result.error).to.be.null; + expect(result.peerInfos?.length).to.be.eq(numPeersToRequest); }); - // slow and flaky in CI + // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 it.skip("less peers than existing", async function () { - const peerInfo = await peerExchange.query({ + const result = await peerExchange.query({ peerId: nwaku3PeerId, numPeers: 1 }); - expect(peerInfo?.length).to.be.eq(1); + expect(result.error).to.be.null; + expect(result.peerInfos?.length).to.be.eq(1); }); - // slow and flaky in CI + // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 it.skip("non connected peers", async function () { // querying the non connected peer - try { - await peerExchange.query({ - peerId: nwaku1PeerId, - numPeers: numPeersToRequest - }); - throw new Error("Query on not connected peer succeeded unexpectedly."); - } catch (error) { - if ( - !( - error instanceof Error && - (error.message === "Not Found" || - error.message === "Failed to get a connection to the peer") - ) - ) { - throw error; - } - } + const result = await peerExchange.query({ + peerId: nwaku1PeerId, + numPeers: numPeersToRequest + }); + expect(result.error).to.be.eq(ProtocolError.NO_PEER_AVAILABLE); + expect(result.peerInfos).to.be.null; }); });