Skip to content

Commit

Permalink
refactor!: remove per-peer parallel dialling (#2090)
Browse files Browse the repository at this point in the history
Since we are now dialling one peer at a time as a means of smarter dialling, we
no longer need the option to control the concurrency of parallel dials across
a single peers' multiaddrs.

Closes #2090
  • Loading branch information
maschad authored and achingbrain committed Oct 31, 2023
1 parent 5178617 commit 4101b23
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 218 deletions.
7 changes: 6 additions & 1 deletion doc/migrations/v0.46-v1.0.0.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--Specify versions for migration below-->
# Migrating to [email protected].0 <!-- omit in toc -->
# Migrating to [email protected] <!-- omit in toc -->

A migration guide for refactoring your application code from libp2p `v0.46` to `v1.0.0`.

Expand All @@ -9,6 +9,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to `
- [KeyChain](#keychain)
- [Pnet](#pnet)
- [Metrics](#metrics)
- [Connection Manager](#connection-manager)

## AutoNAT

Expand Down Expand Up @@ -77,3 +78,7 @@ The following metrics were renamed:

`libp2p_dialler_pending_dials` => `libp2p_dial_queue_pending_dials`
`libp2p_dialler_in_progress_dials` => `libp2p_dial_queue_in_progress_dials`

## Connection Manager

We now only dial one peer at a time as this is a more efficient use of resources. This means that the `maParallelDialsPerPeer` option has been removed.
5 changes: 0 additions & 5 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 30e3
*/
export const MAX_PEER_ADDRS_TO_DIAL = 25

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDialsPerPeer
*/
export const MAX_PARALLEL_DIALS_PER_PEER = 1

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval
*/
Expand Down
13 changes: 2 additions & 11 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS_PER_PEER,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
Expand Down Expand Up @@ -45,7 +44,6 @@ interface DialerInit {
addressSorter?: AddressSorter
maxParallelDials?: number
maxPeerAddrsToDial?: number
maxParallelDialsPerPeer?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
}
Expand All @@ -54,7 +52,6 @@ const defaultOptions = {
addressSorter: defaultAddressSort,
maxParallelDials: MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
maxParallelDialsPerPeer: MAX_PARALLEL_DIALS_PER_PEER,
dialTimeout: DIAL_TIMEOUT,
resolvers: {
dnsaddr: dnsaddrResolver
Expand All @@ -78,7 +75,6 @@ export class DialQueue {
private readonly transportManager: TransportManager
private readonly addressSorter: AddressSorter
private readonly maxPeerAddrsToDial: number
private readonly maxParallelDialsPerPeer: number
private readonly dialTimeout: number
private readonly inProgressDialCount?: Metric
private readonly pendingDialCount?: Metric
Expand All @@ -87,7 +83,6 @@ export class DialQueue {
constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout

this.peerId = components.peerId
Expand Down Expand Up @@ -421,12 +416,8 @@ export class DialQueue {
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController())

try {
// internal peer dial queue to ensure we only dial the configured number of addresses
// per peer at the same time to prevent one peer with a lot of addresses swamping
// the dial queue
const peerDialQueue = new PQueue({
concurrency: this.maxParallelDialsPerPeer
})
// internal peer dial queue - only one dial per peer at a time
const peerDialQueue = new PQueue({ concurrency: 1 })
peerDialQueue.on('error', (err) => {
log.error('error dialling', err)
})
Expand Down
11 changes: 1 addition & 10 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -92,14 +92,6 @@ export interface ConnectionManagerInit {
*/
maxParallelDials?: number

/**
* To prevent individual peers with large amounts of multiaddrs swamping the
* dial queue, this value controls how many addresses to dial in parallel per
* peer. So for example if two peers have 10 addresses and this value is set
* at 5, we will dial 5 addresses from each at a time. (default: 1)
*/
maxParallelDialsPerPeer?: number

/**
* Maximum number of addresses allowed for a given peer - if a peer has more
* addresses than this then the dial will fail. (default: 25)
Expand Down Expand Up @@ -257,7 +249,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
addressSorter: init.addressSorter ?? defaultAddressSort,
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
maxParallelDialsPerPeer: init.maxParallelDialsPerPeer ?? MAX_PARALLEL_DIALS_PER_PEER,
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
dnsaddr: dnsaddrResolver
Expand Down
97 changes: 0 additions & 97 deletions packages/libp2p/test/connection-manager/dial-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,103 +186,6 @@ describe('dial queue', () => {
expect(reject).to.have.property('callCount', addrs.length)
})

it('should abort all dials when its signal is aborted', async () => {
const signals: Record<string, AbortSignal | undefined> = {}
const slowDial = async (): Promise<void> => {
await delay(1000)
}
const actions: Record<string, (...args: any[]) => Promise<any>> = {
'/ip4/127.0.0.1/tcp/1231': slowDial,
'/ip4/127.0.0.1/tcp/1232': slowDial,
'/ip4/127.0.0.1/tcp/1233': slowDial
}
const controller = new AbortController()

dialer = new DialQueue(components, {
maxParallelDials: 2,
maxParallelDialsPerPeer: 10
})

components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, options = {}) => {
const maStr = ma.toString()
const action = actions[maStr]

if (action != null) {
signals[maStr] = options.signal
return action()
}

throw new Error(`No action found for multiaddr ${maStr}`)
})

setTimeout(() => { controller.abort() }, 100)

await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)), {
signal: controller.signal
})).to.eventually.be.rejected
.with.property('name', 'CodeError')

expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', true)
expect(signals).to.not.have.property('/ip4/127.0.0.1/tcp/1233') // never dialled as above the maxParallelDials limit
})

it('should abort other dials when one succeeds', async () => {
const remotePeer = await createEd25519PeerId()
const connection1 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const connection2 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const connection3 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const actions: Record<string, () => Promise<Connection>> = {
'/ip4/127.0.0.1/tcp/1231': async () => {
// Slow dial
await delay(1000)

return connection1
},
'/ip4/127.0.0.1/tcp/1232': async () => {
// Fast dial
await delay(10)

return connection2
},
'/ip4/127.0.0.1/tcp/1233': async () => {
// Slow dial
await delay(1000)

return connection3
}
}
const signals: Record<string, AbortSignal> = {}

components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, opts = {}) => {
const maStr = ma.toString()
const action = actions[maStr]

if (action != null) {
signals[maStr] = opts.signal
return action()
}

throw new Error(`No action found for multiaddr ${maStr}`)
})

dialer = new DialQueue(components, {
maxParallelDials: 50,
maxParallelDialsPerPeer: 10
})

await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)))).to.eventually.equal(connection2)

// Dial attempt finished without connection
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
// Dial attempt led to connection
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
// Dial attempt finished without connection
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', true)
})

it('should ignore DNS addresses for other peers', async () => {
const remotePeer = await createEd25519PeerId()
const otherRemotePeer = await createEd25519PeerId()
Expand Down
100 changes: 36 additions & 64 deletions packages/libp2p/test/connection-manager/direct.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import { MemoryDatastore } from 'datastore-core/memory'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
Expand Down Expand Up @@ -218,84 +217,57 @@ describe('dialing (direct, TCP)', () => {
.and.to.have.property('code', ERR_TIMEOUT)
})

it('should dial to the max concurrency', async () => {
const peerId = await createEd25519PeerId()
const addrs = [
multiaddr('/ip4/0.0.0.0/tcp/8000'),
multiaddr('/ip4/0.0.0.0/tcp/8001'),
multiaddr('/ip4/0.0.0.0/tcp/8002')
]
it('should only dial to the max concurrency', async () => {
const peerId1 = await createEd25519PeerId()
const peerId2 = await createEd25519PeerId()
const peerId3 = await createEd25519PeerId()

const addr1 = multiaddr(`/ip4/127.0.0.1/tcp/1234/p2p/${peerId1}`)
const addr2 = multiaddr(`/ip4/127.0.12.4/tcp/3210/p2p/${peerId2}`)
const addr3 = multiaddr(`/ip4/123.3.11.1/tcp/2010/p2p/${peerId3}`)

const slowDial = async (): Promise<Connection> => {
await delay(100)
return mockConnection(mockMultiaddrConnection(mockDuplex(), peerId1))
}

const actions: Record<string, (...args: any[]) => Promise<any>> = {
[addr1.toString()]: slowDial,
[addr2.toString()]: slowDial,
[addr3.toString()]: async () => mockConnection(mockMultiaddrConnection(mockDuplex(), peerId3))
}

const dialer = new DialQueue(localComponents, {
maxParallelDials: 2,
maxParallelDialsPerPeer: 10
maxParallelDials: 2
})

const deferredDial = pDefer<Connection>()
const transportManagerDialStub = sinon.stub(localTM, 'dial')
transportManagerDialStub.callsFake(async () => deferredDial.promise)
transportManagerDialStub.callsFake(async ma => {
const maStr = ma.toString()
const action = actions[maStr]

// Perform 3 multiaddr dials
void dialer.dial(addrs)

// Let the call stack run
await delay(0)
if (action != null) {
return action()
}

// We should have 2 in progress, and 1 waiting
expect(transportManagerDialStub).to.have.property('callCount', 2)
throw new Error(`No action found for multiaddr ${maStr}`)

Check warning on line 253 in packages/libp2p/test/connection-manager/direct.node.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/test/connection-manager/direct.node.ts#L253

Added line #L253 was not covered by tests
})

deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId)))
// dial 3 different peers
void Promise.all([
dialer.dial(addr1),
dialer.dial(addr2),
dialer.dial(addr3)
])

// Let the call stack run
await delay(0)

// Only two dials should be executed, as the first dial will succeed
// We should have 2 in progress, and 1 waiting
expect(transportManagerDialStub).to.have.property('callCount', 2)
})

it('should append the remote peerId to multiaddrs', async () => {
const addrs = [
multiaddr('/ip4/0.0.0.0/tcp/8000'),
multiaddr('/ip4/0.0.0.0/tcp/8001'),
multiaddr('/ip4/0.0.0.0/tcp/8002'),
multiaddr('/unix/tmp/some/path.sock')
]

// Inject data into the AddressBook
await localComponents.peerStore.merge(remoteComponents.peerId, {
multiaddrs: addrs
})

const dialer = new DialQueue(localComponents, {
maxParallelDialsPerPeer: 10
})

const transportManagerDialStub = sinon.stub(localTM, 'dial')
transportManagerDialStub.callsFake(async (ma) => {
await delay(10)
return mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId))
})

// Perform dial
await dialer.dial(remoteComponents.peerId)
// stop dials
dialer.stop()

// Dialled each address
expect(transportManagerDialStub).to.have.property('callCount', 4)

for (let i = 0; i < addrs.length; i++) {
const call = transportManagerDialStub.getCall(i)
const ma = call.args[0]

// should not append peerId to path multiaddrs
if (ma.toString().startsWith('/unix')) {
expect(ma.toString()).to.not.endWith(`/p2p/${remoteComponents.peerId.toString()}`)

continue
}

expect(ma.toString()).to.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
}
})
})

Expand Down
Loading

0 comments on commit 4101b23

Please sign in to comment.