Skip to content

Commit

Permalink
refactor!: rename event emitter class
Browse files Browse the repository at this point in the history
We add types to a EventEmitter class which extends the plain js EventTarget.

The name is chosen to give familiarity to node developers but it's
not generally a good idea.

This refactor:

1. Exports a new TypedEventTarget interface which adds types to EventTarget
2. Renames EventEmitter to TypedEventEmitter to draw a distinction between them

In the future all consuming code should rely on the TypedEventTarget interface
while implemmenting code uses TypedEventEmitter as an implementation of
a TypedEventEmitter.

By depending on the interface and not the implementation consuming code is
isolated from problems that arise when two versions of `@libp2p/interface`
is in the dependency tree and the event subsystems would otherwise be
compatible due to type overlap.

This maifests as an error message about incompatible implementations of
the private `#listeners` field in EventEmitter.
  • Loading branch information
achingbrain committed Oct 25, 2023
1 parent f9d1c07 commit 7be6dfd
Show file tree
Hide file tree
Showing 81 changed files with 218 additions and 200 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CodeError } from '@libp2p/interface/errors'
import { CustomEvent } from '@libp2p/interface/events'
import { isPeerId, type PeerId } from '@libp2p/interface/peer-id'
import { PeerMap } from '@libp2p/peer-collections'
import { peerIdFromString } from '@libp2p/peer-id'
import { isMultiaddr, type Multiaddr } from '@multiformats/multiaddr'
import { connectionPair } from './connection.js'
import type { Libp2pEvents, PendingDial } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
import type { EventEmitter } from '@libp2p/interface/events'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { PubSub } from '@libp2p/interface/pubsub'
import type { Startable } from '@libp2p/interface/startable'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
Expand All @@ -16,7 +17,7 @@ export interface MockNetworkComponents {
peerId: PeerId
registrar: Registrar
connectionManager: ConnectionManager
events: EventEmitter<Libp2pEvents>
events: TypedEventTarget<Libp2pEvents>
pubsub?: PubSub
}

Expand Down Expand Up @@ -51,7 +52,7 @@ export const mockNetwork = new MockNetwork()
export interface MockConnectionManagerComponents {
peerId: PeerId
registrar: Registrar
events: EventEmitter<Libp2pEvents>
events: TypedEventTarget<Libp2pEvents>
}

class MockConnectionManager implements ConnectionManager, Startable {
Expand Down Expand Up @@ -130,21 +131,21 @@ class MockConnectionManager implements ConnectionManager, Startable {
this.connections.push(aToB)
;(componentsB.connectionManager as MockConnectionManager).connections.push(bToA)

this.components.events.safeDispatchEvent('connection:open', {
this.components.events.dispatchEvent(new CustomEvent('connection:open', {
detail: aToB
})
}))

for (const protocol of this.components.registrar.getProtocols()) {
for (const topology of this.components.registrar.getTopologies(protocol)) {
topology.onConnect?.(componentsB.peerId, aToB)
}
}

this.components.events.safeDispatchEvent('peer:connect', { detail: componentsB.peerId })
this.components.events.dispatchEvent(new CustomEvent('peer:connect', { detail: componentsB.peerId }))

componentsB.events.safeDispatchEvent('connection:open', {
componentsB.events.dispatchEvent(new CustomEvent('connection:open', {
detail: bToA
})
}))

for (const protocol of componentsB.registrar.getProtocols()) {
for (const topology of componentsB.registrar.getTopologies(protocol)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { peerDiscovery } from '@libp2p/interface/peer-discovery'
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { multiaddr } from '@multiformats/multiaddr'
Expand All @@ -12,7 +12,7 @@ interface MockDiscoveryInit {
/**
* Emits 'peer' events on discovery.
*/
export class MockDiscovery extends EventEmitter<PeerDiscoveryEvents> implements PeerDiscovery {
export class MockDiscovery extends TypedEventEmitter<PeerDiscoveryEvents> implements PeerDiscovery {
public readonly options: MockDiscoveryInit
private _isRunning: boolean
private _timer: any
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-compliance-tests/src/mocks/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { mockConnection } from './connection.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
import type { EventEmitter } from '@libp2p/interface/events'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface/transport'
import type { Registrar } from '@libp2p/interface-internal/registrar'

export interface MockUpgraderInit {
registrar?: Registrar
events?: EventEmitter<Libp2pEvents>
events?: TypedEventTarget<Libp2pEvents>
}

class MockUpgrader implements Upgrader {
private readonly registrar?: Registrar
private readonly events?: EventEmitter<Libp2pEvents>
private readonly events?: TypedEventTarget<Libp2pEvents>

constructor (init: MockUpgraderInit) {
this.registrar = init.registrar
Expand Down
4 changes: 2 additions & 2 deletions packages/interface-compliance-tests/src/pubsub/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { pEvent } from 'p-event'
import pWaitFor from 'p-wait-for'
Expand All @@ -19,7 +19,7 @@ export async function createComponents (): Promise<MockNetworkComponents> {
const components: any = {
peerId: await createEd25519PeerId(),
registrar: mockRegistrar(),
events: new EventEmitter()
events: new TypedEventEmitter()
}
components.connectionManager = mockConnectionManager(components)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AbortError } from '@libp2p/interface/errors'
import { EventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { expect } from 'aegir/chai'
import all from 'it-all'
import drain from 'it-drain'
Expand Down Expand Up @@ -27,7 +27,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
registrar = mockRegistrar()
upgrader = mockUpgrader({
registrar,
events: new EventEmitter()
events: new TypedEventEmitter()
});

({ addrs, transport, connector } = await common.setup())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint max-nested-callbacks: ["error", 8] */
import { CustomEvent, EventEmitter } from '@libp2p/interface/events'
import { CustomEvent, TypedEventEmitter } from '@libp2p/interface/events'
import { expect } from 'aegir/chai'
import drain from 'it-drain'
import { pipe } from 'it-pipe'
Expand Down Expand Up @@ -27,7 +27,7 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
registrar = mockRegistrar()
upgrader = mockUpgrader({
registrar,
events: new EventEmitter()
events: new TypedEventEmitter()
});

({ transport, addrs } = await common.setup())
Expand Down
21 changes: 20 additions & 1 deletion packages/interface/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,23 @@ interface Listener {
* https://github.com/microsoft/TypeScript/issues/299
* etc
*/
export class EventEmitter<EventMap extends Record<string, any>> extends EventTarget {
export interface TypedEventTarget <EventMap extends Record<string, any>> extends EventTarget {
addEventListener<K extends keyof EventMap>(type: K, listener: EventHandler<EventMap[K]> | null, options?: boolean | AddEventListenerOptions): void

listenerCount (type: string): number

removeEventListener<K extends keyof EventMap>(type: K, listener?: EventHandler<EventMap[K]> | null, options?: boolean | EventListenerOptions): void

removeEventListener (type: string, listener?: EventHandler<Event>, options?: boolean | EventListenerOptions): void

safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail>): boolean
}

/**
* An implementation of a typed event target
* etc
*/
export class TypedEventEmitter<EventMap extends Record<string, any>> extends EventTarget implements TypedEventTarget<EventMap> {
#listeners = new Map<any, Listener[]>()

listenerCount (type: string): number {
Expand Down Expand Up @@ -98,3 +114,6 @@ class CustomEventPolyfill<T = any> extends Event {
}

export const CustomEvent = globalThis.CustomEvent ?? CustomEventPolyfill

// TODO: remove this in v1
export { TypedEventEmitter as EventEmitter }
4 changes: 2 additions & 2 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import type { Connection, NewStreamOptions, Stream } from './connection/index.js'
import type { ContentRouting } from './content-routing/index.js'
import type { EventEmitter } from './events.js'
import type { TypedEventTarget } from './events.js'
import type { KeyChain } from './keychain/index.js'
import type { Metrics } from './metrics/index.js'
import type { PeerId } from './peer-id/index.js'
Expand Down Expand Up @@ -303,7 +303,7 @@ export interface PendingDial {
/**
* Libp2p nodes implement this interface.
*/
export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, EventEmitter<Libp2pEvents<T>> {
export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, TypedEventTarget<Libp2pEvents<T>> {
/**
* The PeerId is a unique identifier for a node on the network.
*
Expand Down
4 changes: 2 additions & 2 deletions packages/interface/src/peer-discovery/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EventEmitter } from '../events.js'
import type { TypedEventTarget } from '../events.js'
import type { PeerInfo } from '../peer-info/index.js'

/**
Expand Down Expand Up @@ -26,4 +26,4 @@ export interface PeerDiscoveryEvents {
'peer': CustomEvent<PeerInfo>
}

export interface PeerDiscovery extends EventEmitter<PeerDiscoveryEvents> {}
export interface PeerDiscovery extends TypedEventTarget<PeerDiscoveryEvents> {}
6 changes: 3 additions & 3 deletions packages/interface/src/pubsub/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Stream } from '../connection/index.js'
import type { EventEmitter } from '../events.js'
import type { TypedEventTarget } from '../events.js'
import type { PeerId } from '../peer-id/index.js'
import type { Pushable } from 'it-pushable'
import type { Uint8ArrayList } from 'uint8arraylist'
Expand Down Expand Up @@ -65,7 +65,7 @@ export interface PubSubRPC {
messages: PubSubRPCMessage[]
}

export interface PeerStreams extends EventEmitter<PeerStreamEvents> {
export interface PeerStreams extends TypedEventTarget<PeerStreamEvents> {
id: PeerId
protocol: string
outboundStream?: Pushable<Uint8ArrayList>
Expand Down Expand Up @@ -152,7 +152,7 @@ export interface TopicValidatorFn {
(peer: PeerId, message: Message): TopicValidatorResult | Promise<TopicValidatorResult>
}

export interface PubSub<Events extends Record<string, any> = PubSubEvents> extends EventEmitter<Events> {
export interface PubSub<Events extends Record<string, any> = PubSubEvents> extends TypedEventTarget<Events> {
/**
* The global signature policy controls whether or not we sill send and receive
* signed or unsigned messages.
Expand Down
4 changes: 2 additions & 2 deletions packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Connection, MultiaddrConnection } from '../connection/index.js'
import type { EventEmitter } from '../events.js'
import type { TypedEventTarget } from '../events.js'
import type { AbortOptions } from '../index.js'
import type { StreamMuxerFactory } from '../stream-muxer/index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand All @@ -11,7 +11,7 @@ export interface ListenerEvents {
'close': CustomEvent
}

export interface Listener extends EventEmitter<ListenerEvents> {
export interface Listener extends TypedEventTarget<ListenerEvents> {
/**
* Start a listener
*/
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/dual-kad-dht.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type ContentRouting, contentRouting } from '@libp2p/interface/content-routing'
import { CodeError } from '@libp2p/interface/errors'
import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { type PeerDiscovery, peerDiscovery, type PeerDiscoveryEvents } from '@libp2p/interface/peer-discovery'
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
import { logger } from '@libp2p/logger'
Expand Down Expand Up @@ -121,7 +121,7 @@ function multiaddrIsPublic (multiaddr: Multiaddr): boolean {
* A DHT implementation modelled after Kademlia with S/Kademlia modifications.
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
*/
export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> implements DualKadDHT, PeerDiscovery {
export class DefaultDualKadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements DualKadDHT, PeerDiscovery {
public readonly wan: DefaultKadDHT
public readonly lan: DefaultKadDHT
public readonly components: KadDHTComponents
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DefaultDualKadDHT } from './dual-kad-dht.js'
import type { ProvidersInit } from './providers.js'
import type { Libp2pEvents, AbortOptions } from '@libp2p/interface'
import type { EventEmitter } from '@libp2p/interface/events'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerInfo } from '@libp2p/interface/peer-info'
Expand Down Expand Up @@ -314,7 +314,7 @@ export interface KadDHTComponents {
metrics?: Metrics
connectionManager: ConnectionManager
datastore: Datastore
events: EventEmitter<Libp2pEvents>
events: TypedEventTarget<Libp2pEvents>
}

export function kadDHT (init?: KadDHTInit): (components: KadDHTComponents) => DualKadDHT {
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CustomEvent, EventEmitter } from '@libp2p/interface/events'
import { CustomEvent, TypedEventEmitter } from '@libp2p/interface/events'
import { type Logger, logger } from '@libp2p/logger'
import pDefer from 'p-defer'
import { PROTOCOL_DHT, PROTOCOL_PREFIX, LAN_PREFIX } from './constants.js'
Expand Down Expand Up @@ -39,7 +39,7 @@ export interface SingleKadDHTInit extends KadDHTInit {
* A DHT implementation modelled after Kademlia with S/Kademlia modifications.
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
*/
export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements KadDHT {
export class DefaultKadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements KadDHT {
public protocol: string
public routingTable: RoutingTable
public providers: Providers
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CodeError } from '@libp2p/interface/errors'
import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { abortableDuplex } from 'abortable-iterator'
import drain from 'it-drain'
Expand Down Expand Up @@ -35,7 +35,7 @@ interface NetworkEvents {
/**
* Handle network operations for the dht
*/
export class Network extends EventEmitter<NetworkEvents> implements Startable {
export class Network extends TypedEventEmitter<NetworkEvents> implements Startable {
private readonly log: Logger
private readonly protocol: string
private running: boolean
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { setMaxListeners } from 'events'
import { AbortError } from '@libp2p/interface/errors'
import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerSet } from '@libp2p/peer-collections'
import { anySignal } from 'any-signal'
Expand Down Expand Up @@ -143,7 +143,7 @@ export class QueryManager implements Startable {

// query a subset of peers up to `kBucketSize / 2` in length
const startTime = Date.now()
const cleanUp = new EventEmitter<CleanUpEvents>()
const cleanUp = new TypedEventEmitter<CleanUpEvents>()

try {
if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) {
Expand Down
6 changes: 3 additions & 3 deletions packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { queryErrorEvent } from './events.js'
import type { CleanUpEvents } from './manager.js'
import type { QueryEvent, QueryOptions } from '../index.js'
import type { QueryFunc } from '../query/types.js'
import type { EventEmitter } from '@libp2p/interface/events'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Logger } from '@libp2p/logger'
import type { PeerSet } from '@libp2p/peer-collections'
Expand Down Expand Up @@ -60,7 +60,7 @@ export interface QueryPathOptions extends QueryOptions {
/**
* will emit a 'cleanup' event if the caller exits the for..await of early
*/
cleanUp: EventEmitter<CleanUpEvents>
cleanUp: TypedEventTarget<CleanUpEvents>

/**
* A timeout for queryFunc in ms
Expand Down Expand Up @@ -185,7 +185,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
yield * toGenerator(queue, signal, cleanUp, log)
}

async function * toGenerator (queue: Queue, signal: AbortSignal, cleanUp: EventEmitter<CleanUpEvents>, log: Logger): AsyncGenerator<QueryEvent, void, undefined> {
async function * toGenerator (queue: Queue, signal: AbortSignal, cleanUp: TypedEventTarget<CleanUpEvents>, log: Logger): AsyncGenerator<QueryEvent, void, undefined> {
let deferred = defer()
let running = true
const results: QueryEvent[] = []
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/routing-table/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerSet } from '@libp2p/peer-collections'
import Queue from 'p-queue'
Expand Down Expand Up @@ -43,7 +43,7 @@ export interface RoutingTableEvents {
* A wrapper around `k-bucket`, to provide easy store and
* retrieval for peers.
*/
export class RoutingTable extends EventEmitter<RoutingTableEvents> implements Startable {
export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implements Startable {
public kBucketSize: number
public kb?: KBucket
public pingQueue: Queue
Expand Down
6 changes: 2 additions & 4 deletions packages/kad-dht/src/routing-table/k-bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
*/

import { EventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'

function arrayEquals (array1: Uint8Array, array2: Uint8Array): boolean {
Expand Down Expand Up @@ -122,10 +122,8 @@ export interface Bucket {
/**
* Implementation of a Kademlia DHT k-bucket used for storing
* contact (peer node) information.
*
* @extends EventEmitter
*/
export class KBucket extends EventEmitter<KBucketEvents> {
export class KBucket extends TypedEventEmitter<KBucketEvents> {
public localNodeId: Uint8Array
public root: Bucket
private readonly numberOfNodesPerKBucket: number
Expand Down
Loading

0 comments on commit 7be6dfd

Please sign in to comment.