Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: invoke progress events during dialing #2596

Merged
merged 2 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interface-internal/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"@libp2p/interface": "^1.4.1",
"@libp2p/peer-collections": "^5.2.3",
"@multiformats/multiaddr": "^12.2.3",
"progress-events": "^1.0.0",
"uint8arraylist": "^2.4.8"
},
"devDependencies": {
Expand Down
13 changes: 11 additions & 2 deletions packages/interface-internal/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions } from '@libp2p/interface'
import type { TransportManagerDialProgressEvents } from '../transport-manager/index.js'
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, Address } from '@libp2p/interface'
import type { PeerMap } from '@libp2p/peer-collections'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export interface OpenConnectionOptions extends AbortOptions {
export type OpenConnectionProgressEvents =
TransportManagerDialProgressEvents |
ProgressEvent<'dial:already-connected'> |
ProgressEvent<'dial:already-in-dial-queue'> |
ProgressEvent<'dial:add-to-dial-queue'> |
ProgressEvent<'dial:calculate-addresses', Address[]>

export interface OpenConnectionOptions extends AbortOptions, ProgressOptions<OpenConnectionProgressEvents> {
/**
* Connection requests with a higher priority will be executed before those
* with a lower priority. (default: 50)
Expand Down
12 changes: 10 additions & 2 deletions packages/interface-internal/src/transport-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import type { Connection, Listener, Transport } from '@libp2p/interface'
import type { AbortOptions, Connection, Listener, Transport } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export type TransportManagerDialProgressEvents =
ProgressEvent<'dial:selected-transport', string>

export interface TransportManagerDialOptions extends AbortOptions, ProgressOptions<TransportManagerDialProgressEvents> {

}

export interface TransportManager {
/**
Expand All @@ -13,7 +21,7 @@ export interface TransportManager {
* a multiaddr, you may want to call openConnection on the connection manager
* instead.
*/
dial(ma: Multiaddr, options?: any): Promise<Connection>
dial(ma: Multiaddr, options?: TransportManagerDialOptions): Promise<Connection>

/**
* Return all addresses currently being listened on
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"merge-options": "^3.0.4",
"multiformats": "^13.1.0",
"p-defer": "^4.0.1",
"progress-events": "^1.0.0",
"race-event": "^1.3.0",
"race-signal": "^1.0.2",
"uint8arrays": "^5.1.0"
Expand Down
22 changes: 12 additions & 10 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { Circuit } from '@multiformats/multiaddr-matcher'
import { type ClearableSignal, anySignal } from 'any-signal'
import { CustomProgressEvent } from 'progress-events'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
Expand All @@ -19,21 +20,17 @@
} from './constants.js'
import { resolveMultiaddrs } from './utils.js'
import { DEFAULT_DIAL_PRIORITY } from './index.js'
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface'
import type { TransportManager } from '@libp2p/interface-internal'
import type { AddressSorter, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface'
import type { OpenConnectionOptions, OpenConnectionProgressEvents, TransportManager } from '@libp2p/interface-internal'
import type { DNS } from '@multiformats/dns'
import type { ProgressOptions } from 'progress-events'

export interface PendingDialTarget {
resolve(value: any): void
reject(err: Error): void
}

export interface DialOptions extends AbortOptions {
priority?: number
force?: boolean
}

interface DialQueueJobOptions extends PriorityQueueJobOptions {
interface DialQueueJobOptions extends PriorityQueueJobOptions, ProgressOptions<OpenConnectionProgressEvents> {
peerId?: PeerId
multiaddrs: Set<string>
}
Expand Down Expand Up @@ -134,7 +131,7 @@
* The dial to the first address that is successfully able to upgrade a
* connection will be used, all other dials will be aborted when that happens.
*/
async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: DialOptions = {}): Promise<Connection> {
async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise<Connection> {
const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr)

// make sure we don't have an existing connection to any of the addresses we
Expand All @@ -155,6 +152,7 @@

if (existingConnection != null) {
this.log('already connected to %a', existingConnection.remoteAddr)
options.onProgress?.(new CustomProgressEvent('dial:already-connected'))

Check warning on line 155 in packages/libp2p/src/connection-manager/dial-queue.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/dial-queue.ts#L155

Added line #L155 was not covered by tests
return existingConnection
}

Expand Down Expand Up @@ -189,6 +187,7 @@
existingDial.options.multiaddrs.add(multiaddr.toString())
}

options.onProgress?.(new CustomProgressEvent('dial:already-in-dial-queue'))

Check warning on line 190 in packages/libp2p/src/connection-manager/dial-queue.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/dial-queue.ts#L190

Added line #L190 was not covered by tests
return existingDial.join(options)
}

Expand All @@ -198,6 +197,7 @@

this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString()))

options.onProgress?.(new CustomProgressEvent('dial:add-to-dial-queue'))
return this.queue.add(async (options) => {
// create abort conditions - need to do this before `calculateMultiaddrs` as
// we may be about to resolve a dns addr which can time out
Expand All @@ -212,6 +212,8 @@
signal
})

options?.onProgress?.(new CustomProgressEvent<Address[]>('dial:calculate-addresses', addrsToDial))

addrsToDial.map(({ multiaddr }) => multiaddr.toString()).forEach(addr => {
options?.multiaddrs.add(addr)
})
Expand Down Expand Up @@ -299,7 +301,7 @@
}

// eslint-disable-next-line complexity
private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set<string> = new Set<string>(), options: DialOptions = {}): Promise<Address[]> {
private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set<string> = new Set<string>(), options: OpenConnectionOptions = {}): Promise<Address[]> {
const addrs: Address[] = [...multiaddrs].map(ma => ({
multiaddr: multiaddr(ma),
isCertified: false
Expand Down
2 changes: 2 additions & 0 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { CustomProgressEvent } from 'progress-events'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
Expand Down Expand Up @@ -509,6 +510,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
if (existingConnection != null) {
this.log('had an existing non-transient connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial:already-connected'))
return existingConnection
}
}
Expand Down
9 changes: 6 additions & 3 deletions packages/libp2p/src/transport-manager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError, FaultTolerance } from '@libp2p/interface'
import { trackedMap } from '@libp2p/utils/tracked-map'
import { CustomProgressEvent } from 'progress-events'
import { codes } from './errors.js'
import type { Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, TypedEventTarget, Metrics, Startable, Listener, Transport, Upgrader } from '@libp2p/interface'
import type { AddressManager, TransportManager } from '@libp2p/interface-internal'
import type { Libp2pEvents, ComponentLogger, Logger, Connection, TypedEventTarget, Metrics, Startable, Listener, Transport, Upgrader } from '@libp2p/interface'
import type { AddressManager, TransportManager, TransportManagerDialOptions } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

export interface TransportManagerInit {
Expand Down Expand Up @@ -107,13 +108,15 @@ export class DefaultTransportManager implements TransportManager, Startable {
/**
* Dials the given Multiaddr over it's supported transport
*/
async dial (ma: Multiaddr, options?: AbortOptions): Promise<Connection> {
async dial (ma: Multiaddr, options?: TransportManagerDialOptions): Promise<Connection> {
const transport = this.dialTransportForMultiaddr(ma)

if (transport == null) {
throw new CodeError(`No transport available for address ${String(ma)}`, codes.ERR_TRANSPORT_UNAVAILABLE)
}

options?.onProgress?.(new CustomProgressEvent<string>('dial:selected-transport', transport[Symbol.toStringTag]))

try {
return await transport.dial(ma, {
...options,
Expand Down
10 changes: 5 additions & 5 deletions packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ describe('dialing (direct, WebSockets)', () => {
})

sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
expect(options.signal).to.exist()
expect(options.signal.aborted).to.equal(false)
expect(options?.signal).to.exist()
expect(options?.signal?.aborted).to.equal(false)
expect(addr.toString()).to.eql(remoteAddr.toString())
await delay(60)
expect(options.signal.aborted).to.equal(true)
expect(options?.signal?.aborted).to.equal(true)
throw new AbortError()
})

Expand Down Expand Up @@ -235,10 +235,10 @@ describe('dialing (direct, WebSockets)', () => {
sinon.stub(localTM, 'dial').callsFake(async (_, options) => {
const deferredDial = pDefer<Connection>()
const onAbort = (): void => {
options.signal.removeEventListener('abort', onAbort)
options?.signal?.removeEventListener('abort', onAbort)
deferredDial.reject(new AbortError())
}
options.signal.addEventListener('abort', onAbort)
options?.signal?.addEventListener('abort', onAbort)
return deferredDial.promise
})

Expand Down
Loading