From 4101b23083662ada08e15dba0e79d1340f50225a Mon Sep 17 00:00:00 2001 From: chad Date: Tue, 17 Oct 2023 23:31:41 -0500 Subject: [PATCH] refactor!: remove per-peer parallel dialling (#2090) Since we are now dialling one peer at a time as a means of smarter dialling, we no longer need the option to control the concurrency of parallel dials across a single peers' multiaddrs. Closes https://github.com/libp2p/js-libp2p/issues/2090 --- doc/migrations/v0.46-v1.0.0.md | 7 +- .../connection-manager/constants.defaults.ts | 5 - .../src/connection-manager/dial-queue.ts | 13 +-- .../libp2p/src/connection-manager/index.ts | 11 +- .../connection-manager/dial-queue.spec.ts | 97 ----------------- .../test/connection-manager/direct.node.ts | 100 +++++++----------- .../test/connection-manager/direct.spec.ts | 31 +----- 7 files changed, 46 insertions(+), 218 deletions(-) diff --git a/doc/migrations/v0.46-v1.0.0.md b/doc/migrations/v0.46-v1.0.0.md index e6b04bddee..656d9f2e28 100644 --- a/doc/migrations/v0.46-v1.0.0.md +++ b/doc/migrations/v0.46-v1.0.0.md @@ -1,5 +1,5 @@ -# Migrating to libp2p@1.0.0 +# Migrating to libp2p@1.0 A migration guide for refactoring your application code from libp2p `v0.46` to `v1.0.0`. @@ -9,6 +9,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to ` - [KeyChain](#keychain) - [Pnet](#pnet) - [Metrics](#metrics) +- [Connection Manager](#connection-manager) ## AutoNAT @@ -77,3 +78,7 @@ The following metrics were renamed: `libp2p_dialler_pending_dials` => `libp2p_dial_queue_pending_dials` `libp2p_dialler_in_progress_dials` => `libp2p_dial_queue_in_progress_dials` + +## Connection Manager + +We now only dial one peer at a time as this is a more efficient use of resources. This means that the `maParallelDialsPerPeer` option has been removed. diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index 13f1f18957..2eb5044e77 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -13,11 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 30e3 */ export const MAX_PEER_ADDRS_TO_DIAL = 25 -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDialsPerPeer - */ -export const MAX_PARALLEL_DIALS_PER_PEER = 1 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval */ diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index f56cc6f651..5ee1032c34 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -12,7 +12,6 @@ import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { DIAL_TIMEOUT, - MAX_PARALLEL_DIALS_PER_PEER, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, LAST_DIAL_FAILURE_KEY @@ -45,7 +44,6 @@ interface DialerInit { addressSorter?: AddressSorter maxParallelDials?: number maxPeerAddrsToDial?: number - maxParallelDialsPerPeer?: number dialTimeout?: number resolvers?: Record } @@ -54,7 +52,6 @@ const defaultOptions = { addressSorter: defaultAddressSort, maxParallelDials: MAX_PARALLEL_DIALS, maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL, - maxParallelDialsPerPeer: MAX_PARALLEL_DIALS_PER_PEER, dialTimeout: DIAL_TIMEOUT, resolvers: { dnsaddr: dnsaddrResolver @@ -78,7 +75,6 @@ export class DialQueue { private readonly transportManager: TransportManager private readonly addressSorter: AddressSorter private readonly maxPeerAddrsToDial: number - private readonly maxParallelDialsPerPeer: number private readonly dialTimeout: number private readonly inProgressDialCount?: Metric private readonly pendingDialCount?: Metric @@ -87,7 +83,6 @@ export class DialQueue { constructor (components: DialQueueComponents, init: DialerInit = {}) { this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial - this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout this.peerId = components.peerId @@ -421,12 +416,8 @@ export class DialQueue { const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController()) try { - // internal peer dial queue to ensure we only dial the configured number of addresses - // per peer at the same time to prevent one peer with a lot of addresses swamping - // the dial queue - const peerDialQueue = new PQueue({ - concurrency: this.maxParallelDialsPerPeer - }) + // internal peer dial queue - only one dial per peer at a time + const peerDialQueue = new PQueue({ concurrency: 1 }) peerDialQueue.on('error', (err) => { log.error('error dialling', err) }) diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index c13e49a53a..fd14ded1c2 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -10,7 +10,7 @@ import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { AutoDial } from './auto-dial.js' import { ConnectionPruner } from './connection-pruner.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' +import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' import { DialQueue } from './dial-queue.js' import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions } from '@libp2p/interface' import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection' @@ -92,14 +92,6 @@ export interface ConnectionManagerInit { */ maxParallelDials?: number - /** - * To prevent individual peers with large amounts of multiaddrs swamping the - * dial queue, this value controls how many addresses to dial in parallel per - * peer. So for example if two peers have 10 addresses and this value is set - * at 5, we will dial 5 addresses from each at a time. (default: 1) - */ - maxParallelDialsPerPeer?: number - /** * Maximum number of addresses allowed for a given peer - if a peer has more * addresses than this then the dial will fail. (default: 25) @@ -257,7 +249,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { addressSorter: init.addressSorter ?? defaultAddressSort, maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS, maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL, - maxParallelDialsPerPeer: init.maxParallelDialsPerPeer ?? MAX_PARALLEL_DIALS_PER_PEER, dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT, resolvers: init.resolvers ?? { dnsaddr: dnsaddrResolver diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index 2505ac2950..4aa036089d 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -186,103 +186,6 @@ describe('dial queue', () => { expect(reject).to.have.property('callCount', addrs.length) }) - it('should abort all dials when its signal is aborted', async () => { - const signals: Record = {} - const slowDial = async (): Promise => { - await delay(1000) - } - const actions: Record Promise> = { - '/ip4/127.0.0.1/tcp/1231': slowDial, - '/ip4/127.0.0.1/tcp/1232': slowDial, - '/ip4/127.0.0.1/tcp/1233': slowDial - } - const controller = new AbortController() - - dialer = new DialQueue(components, { - maxParallelDials: 2, - maxParallelDialsPerPeer: 10 - }) - - components.transportManager.transportForMultiaddr.returns(stubInterface()) - components.transportManager.dial.callsFake(async (ma, options = {}) => { - const maStr = ma.toString() - const action = actions[maStr] - - if (action != null) { - signals[maStr] = options.signal - return action() - } - - throw new Error(`No action found for multiaddr ${maStr}`) - }) - - setTimeout(() => { controller.abort() }, 100) - - await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)), { - signal: controller.signal - })).to.eventually.be.rejected - .with.property('name', 'CodeError') - - expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true) - expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', true) - expect(signals).to.not.have.property('/ip4/127.0.0.1/tcp/1233') // never dialled as above the maxParallelDials limit - }) - - it('should abort other dials when one succeeds', async () => { - const remotePeer = await createEd25519PeerId() - const connection1 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer)) - const connection2 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer)) - const connection3 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer)) - const actions: Record Promise> = { - '/ip4/127.0.0.1/tcp/1231': async () => { - // Slow dial - await delay(1000) - - return connection1 - }, - '/ip4/127.0.0.1/tcp/1232': async () => { - // Fast dial - await delay(10) - - return connection2 - }, - '/ip4/127.0.0.1/tcp/1233': async () => { - // Slow dial - await delay(1000) - - return connection3 - } - } - const signals: Record = {} - - components.transportManager.transportForMultiaddr.returns(stubInterface()) - components.transportManager.dial.callsFake(async (ma, opts = {}) => { - const maStr = ma.toString() - const action = actions[maStr] - - if (action != null) { - signals[maStr] = opts.signal - return action() - } - - throw new Error(`No action found for multiaddr ${maStr}`) - }) - - dialer = new DialQueue(components, { - maxParallelDials: 50, - maxParallelDialsPerPeer: 10 - }) - - await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)))).to.eventually.equal(connection2) - - // Dial attempt finished without connection - expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true) - // Dial attempt led to connection - expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false) - // Dial attempt finished without connection - expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', true) - }) - it('should ignore DNS addresses for other peers', async () => { const remotePeer = await createEd25519PeerId() const otherRemotePeer = await createEd25519PeerId() diff --git a/packages/libp2p/test/connection-manager/direct.node.ts b/packages/libp2p/test/connection-manager/direct.node.ts index 793731a574..2dd4bde84a 100644 --- a/packages/libp2p/test/connection-manager/direct.node.ts +++ b/packages/libp2p/test/connection-manager/direct.node.ts @@ -20,7 +20,6 @@ import { MemoryDatastore } from 'datastore-core/memory' import delay from 'delay' import { pipe } from 'it-pipe' import { pushable } from 'it-pushable' -import pDefer from 'p-defer' import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' @@ -218,84 +217,57 @@ describe('dialing (direct, TCP)', () => { .and.to.have.property('code', ERR_TIMEOUT) }) - it('should dial to the max concurrency', async () => { - const peerId = await createEd25519PeerId() - const addrs = [ - multiaddr('/ip4/0.0.0.0/tcp/8000'), - multiaddr('/ip4/0.0.0.0/tcp/8001'), - multiaddr('/ip4/0.0.0.0/tcp/8002') - ] + it('should only dial to the max concurrency', async () => { + const peerId1 = await createEd25519PeerId() + const peerId2 = await createEd25519PeerId() + const peerId3 = await createEd25519PeerId() + + const addr1 = multiaddr(`/ip4/127.0.0.1/tcp/1234/p2p/${peerId1}`) + const addr2 = multiaddr(`/ip4/127.0.12.4/tcp/3210/p2p/${peerId2}`) + const addr3 = multiaddr(`/ip4/123.3.11.1/tcp/2010/p2p/${peerId3}`) + + const slowDial = async (): Promise => { + await delay(100) + return mockConnection(mockMultiaddrConnection(mockDuplex(), peerId1)) + } + + const actions: Record Promise> = { + [addr1.toString()]: slowDial, + [addr2.toString()]: slowDial, + [addr3.toString()]: async () => mockConnection(mockMultiaddrConnection(mockDuplex(), peerId3)) + } const dialer = new DialQueue(localComponents, { - maxParallelDials: 2, - maxParallelDialsPerPeer: 10 + maxParallelDials: 2 }) - const deferredDial = pDefer() const transportManagerDialStub = sinon.stub(localTM, 'dial') - transportManagerDialStub.callsFake(async () => deferredDial.promise) + transportManagerDialStub.callsFake(async ma => { + const maStr = ma.toString() + const action = actions[maStr] - // Perform 3 multiaddr dials - void dialer.dial(addrs) - - // Let the call stack run - await delay(0) + if (action != null) { + return action() + } - // We should have 2 in progress, and 1 waiting - expect(transportManagerDialStub).to.have.property('callCount', 2) + throw new Error(`No action found for multiaddr ${maStr}`) + }) - deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId))) + // dial 3 different peers + void Promise.all([ + dialer.dial(addr1), + dialer.dial(addr2), + dialer.dial(addr3) + ]) // Let the call stack run await delay(0) - // Only two dials should be executed, as the first dial will succeed + // We should have 2 in progress, and 1 waiting expect(transportManagerDialStub).to.have.property('callCount', 2) - }) - it('should append the remote peerId to multiaddrs', async () => { - const addrs = [ - multiaddr('/ip4/0.0.0.0/tcp/8000'), - multiaddr('/ip4/0.0.0.0/tcp/8001'), - multiaddr('/ip4/0.0.0.0/tcp/8002'), - multiaddr('/unix/tmp/some/path.sock') - ] - - // Inject data into the AddressBook - await localComponents.peerStore.merge(remoteComponents.peerId, { - multiaddrs: addrs - }) - - const dialer = new DialQueue(localComponents, { - maxParallelDialsPerPeer: 10 - }) - - const transportManagerDialStub = sinon.stub(localTM, 'dial') - transportManagerDialStub.callsFake(async (ma) => { - await delay(10) - return mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId)) - }) - - // Perform dial - await dialer.dial(remoteComponents.peerId) + // stop dials dialer.stop() - - // Dialled each address - expect(transportManagerDialStub).to.have.property('callCount', 4) - - for (let i = 0; i < addrs.length; i++) { - const call = transportManagerDialStub.getCall(i) - const ma = call.args[0] - - // should not append peerId to path multiaddrs - if (ma.toString().startsWith('/unix')) { - expect(ma.toString()).to.not.endWith(`/p2p/${remoteComponents.peerId.toString()}`) - - continue - } - - expect(ma.toString()).to.endWith(`/p2p/${remoteComponents.peerId.toString()}`) - } }) }) diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index 7f9d9d1ab4..7ee9976517 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -198,8 +198,7 @@ describe('dialing (direct, WebSockets)', () => { connectionManager = new DefaultConnectionManager(localComponents, { addressSorter: addressesSorttSpy, - maxParallelDials: 3, - maxParallelDialsPerPeer: 3 + maxParallelDials: 3 }) await connectionManager.start() @@ -216,8 +215,6 @@ describe('dialing (direct, WebSockets)', () => { .sort(defaultAddressSort) expect(localTMDialStub.getCall(0).args[0].equals(sortedAddresses[0].multiaddr)) - expect(localTMDialStub.getCall(1).args[0].equals(sortedAddresses[1].multiaddr)) - expect(localTMDialStub.getCall(2).args[0].equals(sortedAddresses[2].multiaddr)) }) it('shutting down should abort pending dials', async () => { @@ -261,32 +258,6 @@ describe('dialing (direct, WebSockets)', () => { } }) - it('should dial all multiaddrs for a passed peer id', async () => { - const addrs = [ - multiaddr(`/ip4/0.0.0.0/tcp/8000/ws/p2p/${remoteComponents.peerId.toString()}`), - multiaddr(`/ip4/0.0.0.0/tcp/8001/ws/p2p/${remoteComponents.peerId.toString()}`), - multiaddr(`/ip4/0.0.0.0/tcp/8002/ws/p2p/${remoteComponents.peerId.toString()}`) - ] - - // Inject data into the AddressBook - await localComponents.peerStore.merge(remoteComponents.peerId, { - multiaddrs: addrs - }) - - connectionManager = new DefaultConnectionManager(localComponents, { - maxParallelDialsPerPeer: 10 - }) - await connectionManager.start() - - const transactionManagerDialStub = sinon.stub(localTM, 'dial') - transactionManagerDialStub.callsFake(async (ma) => mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId))) - - // Perform dial - await connectionManager.openConnection(remoteComponents.peerId) - - expect(transactionManagerDialStub).to.have.property('callCount', 3) - }) - it('should dial only the multiaddr that is passed', async () => { const addrs = [ multiaddr(`/ip4/0.0.0.0/tcp/8000/ws/p2p/${remoteComponents.peerId.toString()}`),