From f7e92203568daf3502c1d0a5f5b44cfc44968b2e Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 19 Jun 2024 17:50:40 +0100 Subject: [PATCH 1/2] feat: invoke progress events during dialing If the user passes an `onProgress` callback to a dial request, invoke the callback with progress events during the dial. --- .../src/connection-manager/index.ts | 13 +++++++++-- .../src/transport-manager/index.ts | 12 ++++++++-- .../src/connection-manager/dial-queue.ts | 22 ++++++++++--------- .../libp2p/src/connection-manager/index.ts | 2 ++ packages/libp2p/src/transport-manager.ts | 9 +++++--- .../test/connection-manager/direct.spec.ts | 10 ++++----- 6 files changed, 46 insertions(+), 22 deletions(-) diff --git a/packages/interface-internal/src/connection-manager/index.ts b/packages/interface-internal/src/connection-manager/index.ts index 7754641f5d..6c8668d878 100644 --- a/packages/interface-internal/src/connection-manager/index.ts +++ b/packages/interface-internal/src/connection-manager/index.ts @@ -1,8 +1,17 @@ -import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions } from '@libp2p/interface' +import type { TransportManagerDialProgressEvents } from '../transport-manager/index.js' +import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, Address } from '@libp2p/interface' import type { PeerMap } from '@libp2p/peer-collections' import type { Multiaddr } from '@multiformats/multiaddr' +import type { ProgressOptions, ProgressEvent } from 'progress-events' -export interface OpenConnectionOptions extends AbortOptions { +export type OpenConnectionProgressEvents = + TransportManagerDialProgressEvents | + ProgressEvent<'dial:already-connected'> | + ProgressEvent<'dial:already-in-dial-queue'> | + ProgressEvent<'dial:add-to-dial-queue'> | + ProgressEvent<'dial:calculate-addresses', Address[]> + +export interface OpenConnectionOptions extends AbortOptions, ProgressOptions { /** * Connection requests with a higher priority will be executed before those * with a lower priority. (default: 50) diff --git a/packages/interface-internal/src/transport-manager/index.ts b/packages/interface-internal/src/transport-manager/index.ts index 734ec23939..7efcb223b3 100644 --- a/packages/interface-internal/src/transport-manager/index.ts +++ b/packages/interface-internal/src/transport-manager/index.ts @@ -1,5 +1,13 @@ -import type { Connection, Listener, Transport } from '@libp2p/interface' +import type { AbortOptions, Connection, Listener, Transport } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' +import type { ProgressOptions, ProgressEvent } from 'progress-events' + +export type TransportManagerDialProgressEvents = + ProgressEvent<'dial:selected-transport', string> + +export interface TransportManagerDialOptions extends AbortOptions, ProgressOptions { + +} export interface TransportManager { /** @@ -13,7 +21,7 @@ export interface TransportManager { * a multiaddr, you may want to call openConnection on the connection manager * instead. */ - dial(ma: Multiaddr, options?: any): Promise + dial(ma: Multiaddr, options?: TransportManagerDialOptions): Promise /** * Return all addresses currently being listened on diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index ec5b7fb1e3..3329fa98ef 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -7,6 +7,7 @@ import { type Multiaddr, type Resolver, resolvers, multiaddr } from '@multiforma import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' import { Circuit } from '@multiformats/multiaddr-matcher' import { type ClearableSignal, anySignal } from 'any-signal' +import { CustomProgressEvent } from 'progress-events' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' @@ -19,21 +20,17 @@ import { } from './constants.js' import { resolveMultiaddrs } from './utils.js' import { DEFAULT_DIAL_PRIORITY } from './index.js' -import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface' -import type { TransportManager } from '@libp2p/interface-internal' +import type { AddressSorter, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface' +import type { OpenConnectionOptions, OpenConnectionProgressEvents, TransportManager } from '@libp2p/interface-internal' import type { DNS } from '@multiformats/dns' +import type { ProgressOptions } from 'progress-events' export interface PendingDialTarget { resolve(value: any): void reject(err: Error): void } -export interface DialOptions extends AbortOptions { - priority?: number - force?: boolean -} - -interface DialQueueJobOptions extends PriorityQueueJobOptions { +interface DialQueueJobOptions extends PriorityQueueJobOptions, ProgressOptions { peerId?: PeerId multiaddrs: Set } @@ -134,7 +131,7 @@ export class DialQueue { * The dial to the first address that is successfully able to upgrade a * connection will be used, all other dials will be aborted when that happens. */ - async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: DialOptions = {}): Promise { + async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise { const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr) // make sure we don't have an existing connection to any of the addresses we @@ -155,6 +152,7 @@ export class DialQueue { if (existingConnection != null) { this.log('already connected to %a', existingConnection.remoteAddr) + options.onProgress?.(new CustomProgressEvent('dial:already-connected')) return existingConnection } @@ -189,6 +187,7 @@ export class DialQueue { existingDial.options.multiaddrs.add(multiaddr.toString()) } + options.onProgress?.(new CustomProgressEvent('dial:already-in-dial-queue')) return existingDial.join(options) } @@ -198,6 +197,7 @@ export class DialQueue { this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString())) + options.onProgress?.(new CustomProgressEvent('dial:add-to-dial-queue')) return this.queue.add(async (options) => { // create abort conditions - need to do this before `calculateMultiaddrs` as // we may be about to resolve a dns addr which can time out @@ -212,6 +212,8 @@ export class DialQueue { signal }) + options?.onProgress?.(new CustomProgressEvent('dial:calculate-addresses', addrsToDial)) + addrsToDial.map(({ multiaddr }) => multiaddr.toString()).forEach(addr => { options?.multiaddrs.add(addr) }) @@ -299,7 +301,7 @@ export class DialQueue { } // eslint-disable-next-line complexity - private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set = new Set(), options: DialOptions = {}): Promise { + private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set = new Set(), options: OpenConnectionOptions = {}): Promise { const addrs: Address[] = [...multiaddrs].map(ma => ({ multiaddr: multiaddr(ma), isCertified: false diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 7b6233b35e..f7159eecbe 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -4,6 +4,7 @@ import { defaultAddressSort } from '@libp2p/utils/address-sort' import { RateLimiter } from '@libp2p/utils/rate-limiter' import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr' import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' +import { CustomProgressEvent } from 'progress-events' import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { AutoDial } from './auto-dial.js' @@ -509,6 +510,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { if (existingConnection != null) { this.log('had an existing non-transient connection to %p', peerId) + options.onProgress?.(new CustomProgressEvent('dial:already-connected')) return existingConnection } } diff --git a/packages/libp2p/src/transport-manager.ts b/packages/libp2p/src/transport-manager.ts index 55e2b1ea6e..74a8382237 100644 --- a/packages/libp2p/src/transport-manager.ts +++ b/packages/libp2p/src/transport-manager.ts @@ -1,8 +1,9 @@ import { CodeError, FaultTolerance } from '@libp2p/interface' import { trackedMap } from '@libp2p/utils/tracked-map' +import { CustomProgressEvent } from 'progress-events' import { codes } from './errors.js' -import type { Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, TypedEventTarget, Metrics, Startable, Listener, Transport, Upgrader } from '@libp2p/interface' -import type { AddressManager, TransportManager } from '@libp2p/interface-internal' +import type { Libp2pEvents, ComponentLogger, Logger, Connection, TypedEventTarget, Metrics, Startable, Listener, Transport, Upgrader } from '@libp2p/interface' +import type { AddressManager, TransportManager, TransportManagerDialOptions } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' export interface TransportManagerInit { @@ -107,13 +108,15 @@ export class DefaultTransportManager implements TransportManager, Startable { /** * Dials the given Multiaddr over it's supported transport */ - async dial (ma: Multiaddr, options?: AbortOptions): Promise { + async dial (ma: Multiaddr, options?: TransportManagerDialOptions): Promise { const transport = this.dialTransportForMultiaddr(ma) if (transport == null) { throw new CodeError(`No transport available for address ${String(ma)}`, codes.ERR_TRANSPORT_UNAVAILABLE) } + options?.onProgress?.(new CustomProgressEvent('dial:selected-transport', transport[Symbol.toStringTag])) + try { return await transport.dial(ma, { ...options, diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index a325619abf..ef2f3941f4 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -156,11 +156,11 @@ describe('dialing (direct, WebSockets)', () => { }) sinon.stub(localTM, 'dial').callsFake(async (addr, options) => { - expect(options.signal).to.exist() - expect(options.signal.aborted).to.equal(false) + expect(options?.signal).to.exist() + expect(options?.signal?.aborted).to.equal(false) expect(addr.toString()).to.eql(remoteAddr.toString()) await delay(60) - expect(options.signal.aborted).to.equal(true) + expect(options?.signal?.aborted).to.equal(true) throw new AbortError() }) @@ -235,10 +235,10 @@ describe('dialing (direct, WebSockets)', () => { sinon.stub(localTM, 'dial').callsFake(async (_, options) => { const deferredDial = pDefer() const onAbort = (): void => { - options.signal.removeEventListener('abort', onAbort) + options?.signal?.removeEventListener('abort', onAbort) deferredDial.reject(new AbortError()) } - options.signal.addEventListener('abort', onAbort) + options?.signal?.addEventListener('abort', onAbort) return deferredDial.promise }) From a578c1fd97bd0698f8d484bc400c655e7a58eb95 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 20 Jun 2024 09:04:24 +0100 Subject: [PATCH 2/2] chore: add missing deps --- packages/interface-internal/package.json | 1 + packages/libp2p/package.json | 1 + 2 files changed, 2 insertions(+) diff --git a/packages/interface-internal/package.json b/packages/interface-internal/package.json index e942e4ca33..2cb608a998 100644 --- a/packages/interface-internal/package.json +++ b/packages/interface-internal/package.json @@ -51,6 +51,7 @@ "@libp2p/interface": "^1.4.1", "@libp2p/peer-collections": "^5.2.3", "@multiformats/multiaddr": "^12.2.3", + "progress-events": "^1.0.0", "uint8arraylist": "^2.4.8" }, "devDependencies": { diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index bdbf9f31f3..914ab11c0d 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -106,6 +106,7 @@ "merge-options": "^3.0.4", "multiformats": "^13.1.0", "p-defer": "^4.0.1", + "progress-events": "^1.0.0", "race-event": "^1.3.0", "race-signal": "^1.0.2", "uint8arrays": "^5.1.0"