diff --git a/.coveralls.yml b/.coveralls.yml new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md index dccfb94..73bde91 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,12 @@ This library bundles different components for lower-level peer-to-peer connectio - Light Ethereum Subprotocol (LES/2) The library is based on [ethereumjs/node-devp2p](https://github.com/ethereumjs/node-devp2p) as well -as other sub-libraries (``node-*`` named) (all outdated). +as other sub-libraries (`node-*` named) (all outdated). ## Run/Build -This library has to be compiled with babel to a ``Node 6`` friendly source format. -For triggering a (first) build to create the ``lib/`` directory run: +This library has to be compiled with babel to a `Node 6` friendly source format. +For triggering a (first) build to create the `lib/` directory run: ``` npm run build @@ -32,7 +32,7 @@ node -r babel-register [YOUR_SCRIPT_TO_RUN.js] ## Usage/Examples -All components of this library are implemented as Node ``EventEmitter`` objects +All components of this library are implemented as Node `EventEmitter` objects and make heavy use of the Node.js network stack. You can react on events from the network like this: @@ -45,11 +45,11 @@ dpt.on('peer:added', (peer) => { Basic example to connect to some bootstrap nodes and get basic peer info: - - [simple](examples/simple.js) +- [simple](examples/simple.js) Communicate with peers to read new transaction and block information: - - [peer-communication](examples/peer-communication.js) +- [peer-communication](examples/peer-communication.js) Run an example with: @@ -59,9 +59,62 @@ node -r babel-register ./examples/peer-communication.js ## Distributed Peer Table (DPT) / Node Discovery -Maintain/manage a list of peers, see [./src/dpt/](./src/dpt/), also +Maintain/manage a list of peers, see [./src/dpt/](./src/dpt/), also includes node discovery ([./src/dpt/server.js](./src/dpt/server.js)) + +## Branches +- [master](https://github.com/ethereumjs/ethereumjs-devp2p) +- [discovery-v5](https://github.com/tcsiwula/ethereumjs-devp2p/tree/discovery-v5) (wip) +- [discv4-enr-extension-support](https://github.com/tcsiwula/ethereumjs-devp2p/tree/discv4-enr-extension-support) (wip) +- [les](https://github.com/ethereumjs/ethereumjs-devp2p) (todo) + + +### Node discovery v5 support (dscv5) + +Node discovery v5 (dscv5) support is turned off by default. + +It is currently in development on this branch: +[github.com/tcsiwula/ethereumjs-devp2p/tree/discovery-v5](https://github.com/tcsiwula/ethereumjs-devp2p/tree/discovery-v5). + +#### Running node discovery v5 + +Try the experimental discv5 peer-communication.js example. +Run the following: + +``` +npm install +npm run build +npm run v5 +``` + +or: + +``` +npm install +npm run build +node -r babel-register ./examples/peer-communication.js 5 +``` + +#### Running node discovery v4 + +Try the stable discv4 peer-communication.js example. +Run the following: + +``` +npm install +npm run build +npm run v4 +``` + +or: + +``` +npm install +npm run build +node -r babel-register ./examples/peer-communication.js 4 +``` + ### Usage Create your peer table: @@ -76,7 +129,7 @@ const dpt = new DPT(Buffer.from(PRIVATE_KEY, 'hex'), { }) ``` -Add some bootstrap nodes (or some custom nodes with ``dpt.addPeer()``): +Add some bootstrap nodes (or some custom nodes with `dpt.addPeer()`): ``` dpt.bootstrap(bootnode).catch((err) => console.error('Something went wrong!')) @@ -84,42 +137,48 @@ dpt.bootstrap(bootnode).catch((err) => console.error('Something went wrong!')) ### API - #### `DPT` (extends `EventEmitter`) -Distributed Peer Table. Manages a Kademlia DHT K-bucket (``Kbucket``) for storing peer information -and a ``BanList`` for keeping a list of bad peers. ``Server`` implements the node discovery (``ping``, -``pong``, ``findNeighbours``). + +Distributed Peer Table. Manages a Kademlia DHT K-bucket (`Kbucket`) for storing peer information +and a `BanList` for keeping a list of bad peers. `Server` implements the node discovery (`ping`, +`pong`, `findNeighbours`). ##### `new DPT(privateKey, options)` + Creates new DPT object + - `privateKey` - Key for message encoding/signing. -- `options.refreshInterval` - Interval in ms for refreshing (calling ``findNeighbours``) the peer list (default: ``60s``). -- `options.createSocket` - A datagram (dgram) ``createSocket`` function, passed to ``Server`` (default: ``dgram.createSocket.bind(null, 'udp4')``). -- `options.timeout` - Timeout in ms for server ``ping``, passed to ``Server`` (default: ``10s``). -- `options.endpoint` - Endpoint information to send with the server ``ping``, passed to ``Server`` (default: ``{ address: '0.0.0.0', udpPort: null, tcpPort: null }``). +- `options.refreshInterval` - Interval in ms for refreshing (calling `findNeighbours`) the peer list (default: `60s`). +- `options.createSocket` - A datagram (dgram) `createSocket` function, passed to `Server` (default: `dgram.createSocket.bind(null, 'udp4')`). +- `options.timeout` - Timeout in ms for server `ping`, passed to `Server` (default: `10s`). +- `options.endpoint` - Endpoint information to send with the server `ping`, passed to `Server` (default: `{ address: '0.0.0.0', udpPort: null, tcpPort: null }`). + +#### `dpt.bootstrap(peer)` (`async`) + +Uses a peer as new bootstrap peer and calls `findNeighbouts`. + +- `peer` - Peer to be added, format `{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }`. -#### `dpt.bootstrap(peer)` (``async``) -Uses a peer as new bootstrap peer and calls ``findNeighbouts``. -- `peer` - Peer to be added, format ``{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }``. +#### `dpt.addPeer(object)` (`async`) -#### `dpt.addPeer(object)` (``async``) Adds a new peer. -- `object` - Peer to be added, format ``{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }``. -For other utility functions like ``getPeer``, ``getPeers`` see [./src/dpt/index.js](./src/dpt/index.js). +- `object` - Peer to be added, format `{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }`. + +For other utility functions like `getPeer`, `getPeers` see [./src/dpt/index.js](./src/dpt/index.js). ### Events Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| peer:added | Peer added to DHT bucket | -| peer:removed | Peer removed from DHT bucket | -| peer:new | New peer added | -| listening | Forwarded from server | -| close | Forwarded from server | -| error | Forwarded from server | +| Event | Description | +| ------------ | :--------------------------: | +| peer:added | Peer added to DHT bucket | +| peer:removed | Peer removed from DHT bucket | +| peer:new | New peer added | +| listening | Forwarded from server | +| close | Forwarded from server | +| error | Forwarded from server | ### Reference @@ -133,7 +192,7 @@ Connect to a peer, organize the communication, see [./src/rlpx/](./src/rlpx/) ### Usage -Create your ``RLPx`` object, e.g.: +Create your `RLPx` object, e.g.: ``` const rlpx = new devp2p.RLPx(PRIVATE_KEY, { @@ -150,38 +209,42 @@ const rlpx = new devp2p.RLPx(PRIVATE_KEY, { ### API #### `RLPx` (extends `EventEmitter`) -Manages the handshake (`ECIES`) and the handling of the peer communication (``Peer``). + +Manages the handshake (`ECIES`) and the handling of the peer communication (`Peer`). ##### `new RLPx(privateKey, options)` + Creates new RLPx object + - `privateKey` - Key for message encoding/signing. -- `options.timeout` - Peer `ping` timeout in ms (default: ``10s``). -- `options.maxPeers` - Max number of peer connections (default: ``10``). -- `options.clientId` - Client ID string (default example: ``ethereumjs-devp2p/v2.1.3/darwin-x64/nodejs``). +- `options.timeout` - Peer `ping` timeout in ms (default: `10s`). +- `options.maxPeers` - Max number of peer connections (default: `10`). +- `options.clientId` - Client ID string (default example: `ethereumjs-devp2p/v2.1.3/darwin-x64/nodejs`). - `options.remoteClientIdFilter` - Optional list of client ID filter strings (e.g. `['go1.5', 'quorum']`). - `options.capabilities` - Upper layer protocol capabilities, e.g. `[devp2p.ETH.eth63, devp2p.ETH.eth62]`. -- `options.listenPort` - The listening port for the server or ``null`` for default. -- `options.dpt` - `DPT` object for the peers to connect to (default: ``null``, no `DPT` peer management). +- `options.listenPort` - The listening port for the server or `null` for default. +- `options.dpt` - `DPT` object for the peers to connect to (default: `null`, no `DPT` peer management). + +#### `rlpx.connect(peer)` (`async`) -#### `rlpx.connect(peer)` (``async``) Manually connect to peer without `DPT`. -- `peer` - Peer to connect to, format ``{ id: PEER_ID, address: PEER_ADDRESS, port: PEER_PORT }``. -For other connection/utility functions like ``listen``, ``getPeers`` see [./src/rlpx/index.js](./src/rlpx/index.js). +- `peer` - Peer to connect to, format `{ id: PEER_ID, address: PEER_ADDRESS, port: PEER_PORT }`. + +For other connection/utility functions like `listen`, `getPeers` see [./src/rlpx/index.js](./src/rlpx/index.js). ### Events Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| peer:added | Handshake with peer successful | -| peer:removed | Disconnected from peer | -| peer:error | Error connecting to peer | -| listening | Forwarded from server | -| close | Forwarded from server | -| error | Forwarded from server | - +| Event | Description | +| ------------ | :----------------------------: | +| peer:added | Handshake with peer successful | +| peer:removed | Disconnected from peer | +| peer:error | Error connecting to peer | +| listening | Forwarded from server | +| close | Forwarded from server | +| error | Forwarded from server | ### Reference @@ -194,7 +257,7 @@ Upper layer protocol for exchanging Ethereum network data like block headers or ### Usage -Send the initial status message with ``sendStatus()``, then wait for the corresponding `status` message +Send the initial status message with `sendStatus()`, then wait for the corresponding `status` message to arrive to start the communication. ``` @@ -204,7 +267,7 @@ eth.once('status', () => { }) ``` -Wait for follow-up messages to arrive, send your responses. +Wait for follow-up messages to arrive, send your responses. ``` eth.on('message', async (code, payload) => { @@ -214,26 +277,33 @@ eth.on('message', async (code, payload) => { }) ``` -See the ``peer-communication.js`` example for a more detailed use case. +See the `peer-communication.js` example for a more detailed use case. ### API #### `ETH` (extends `EventEmitter`) + Handles the different message types like `NEW_BLOCK_HASHES` or `GET_NODE_DATA` (see `MESSAGE_CODES`) for a complete list. Currently protocol versions `PV62` and `PV63` are supported. ##### `new ETH(privateKey, options)` -Normally not instantiated directly but created as a ``SubProtocol`` in the ``Peer`` object. + +Normally not instantiated directly but created as a `SubProtocol` in the `Peer` object. + - `version` - The protocol version for communicating, e.g. `63`. - `peer` - `Peer` object to communicate with. -- `send` - Wrapped ``peer.sendMessage()`` function where the communication is routed to. +- `send` - Wrapped `peer.sendMessage()` function where the communication is routed to. #### `eth.sendStatus(status)` + Send initial status message. -- `status` - Status message to send, format ``{ networkId: CHAIN_ID, td: TOTAL_DIFFICULTY_BUFFER, bestHash: BEST_HASH_BUFFER, genesisHash: GENESIS_HASH_BUFFER }``. + +- `status` - Status message to send, format `{ networkId: CHAIN_ID, td: TOTAL_DIFFICULTY_BUFFER, bestHash: BEST_HASH_BUFFER, genesisHash: GENESIS_HASH_BUFFER }`. #### `eth.sendMessage(code, payload)` + Send initial status message. + - `code` - The message code, see `MESSAGE_CODES` for available message types. - `payload` - Payload as a list, will be rlp-encoded. @@ -241,10 +311,10 @@ Send initial status message. Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| message | Message received | -| status | Status info received | +| Event | Description | +| ------- | :------------------: | +| message | Message received | +| status | Status info received | ### Reference @@ -256,7 +326,7 @@ Upper layer protocol used by light clients, see [./src/les/](./src/les/). ### Usage -Send the initial status message with ``sendStatus()``, then wait for the corresponding `status` message +Send the initial status message with `sendStatus()`, then wait for the corresponding `status` message to arrive to start the communication. ``` @@ -266,7 +336,7 @@ les.once('status', () => { }) ``` -Wait for follow-up messages to arrive, send your responses. +Wait for follow-up messages to arrive, send your responses. ``` les.on('message', async (code, payload) => { @@ -276,26 +346,33 @@ les.on('message', async (code, payload) => { }) ``` -See the ``peer-communication-les.js`` example for a more detailed use case. +See the `peer-communication-les.js` example for a more detailed use case. ### API #### `LES` (extends `EventEmitter`) + Handles the different message types like `BLOCK_HEADERS` or `GET_PROOFS_V2` (see `MESSAGE_CODES`) for -a complete list. Currently protocol version ``LES/2`` running in client-mode is supported. +a complete list. Currently protocol version `LES/2` running in client-mode is supported. ##### `new LES(privateKey, options)` -Normally not instantiated directly but created as a ``SubProtocol`` in the ``Peer`` object. + +Normally not instantiated directly but created as a `SubProtocol` in the `Peer` object. + - `version` - The protocol version for communicating, e.g. `2`. - `peer` - `Peer` object to communicate with. -- `send` - Wrapped ``peer.sendMessage()`` function where the communication is routed to. +- `send` - Wrapped `peer.sendMessage()` function where the communication is routed to. #### `les.sendStatus(status)` + Send initial status message. -- `status` - Status message to send, format ``{ networkId: CHAIN_ID, headTd: TOTAL_DIFFICULTY_BUFFER, headHash: HEAD_HASH_BUFFER, headNum: HEAD_NUM_BUFFER, genesisHash: GENESIS_HASH_BUFFER }``. + +- `status` - Status message to send, format `{ networkId: CHAIN_ID, headTd: TOTAL_DIFFICULTY_BUFFER, headHash: HEAD_HASH_BUFFER, headNum: HEAD_NUM_BUFFER, genesisHash: GENESIS_HASH_BUFFER }`. #### `les.sendMessage(code, reqId, payload)` + Send initial status message. + - `code` - The message code, see `MESSAGE_CODES` for available message types. - `reqId` - Request ID, will be echoed back on response. - `payload` - Payload as a list, will be rlp-encoded. @@ -304,10 +381,10 @@ Send initial status message. Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| message | Message received | -| status | Status info received | +| Event | Description | +| ------- | :------------------: | +| message | Message received | +| status | Status info received | ### Reference @@ -315,7 +392,7 @@ Events emitted: ## Tests -There are unit tests in the ``test/`` directory which can be run with: +There are unit tests in the `test/` directory which can be run with: ``` npm run test @@ -325,8 +402,8 @@ npm run test This library uses [debug](https://github.com/visionmedia/debug) debugging utility package. -For the debugging output to show up, set the ``DEBUG`` environment variable (e.g. in Linux/Mac OS: -``export DEBUG=*,-babel``). +For the debugging output to show up, set the `DEBUG` environment variable (e.g. in Linux/Mac OS: +`export DEBUG=*,-babel`). You should now see debug output like to following when running one of the examples above (the indented lines): @@ -343,7 +420,7 @@ Remove peer: 52.169.42.101:30303 (peer disconnect, reason code: 16) (total: 1) ### Other Implementations -The following is a list of major implementations of the ``devp2p`` stack in other languages: +The following is a list of major implementations of the `devp2p` stack in other languages: - [pydevp2p](https://github.com/ethereum/pydevp2p) (Python) - [Go Ethereum](https://github.com/ethereum/go-ethereum/tree/master/p2p) (Go) @@ -351,7 +428,7 @@ The following is a list of major implementations of the ``devp2p`` stack in othe ### Links -- [Blog article series](https://ocalog.com/post/10/) on implementing Ethereum protocol stack +- [Blog article series](https://ocalog.com/post/10/) on implementing Ethereum protocol stack ## License diff --git a/examples/bootstrapNodes.json b/examples/bootstrapNodes.json new file mode 100644 index 0000000..08811e2 --- /dev/null +++ b/examples/bootstrapNodes.json @@ -0,0 +1,47 @@ +[ + { + "ip": "52.16.188.185", + "port": "30303", + "id": "a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node1", + "comment": "MAINNET_BOOTNODES_Node1" + }, + { + "ip": "13.93.211.84", + "port": "30303", + "id": "aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node2", + "comment": "MAINNET_BOOTNODES_Node2" + }, + { + "ip": "191.235.84.50", + "port": "30303", + "id": "78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node3", + "comment": "MAINNET_BOOTNODES_Node3" + }, + { + "ip": "13.75.154.138", + "port": "30303", + "id": "158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node4", + "comment": "MAINNET_BOOTNODES_Node4" + }, + { + "ip": "52.74.57.123", + "port": "30303", + "id": "1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node5", + "comment": "MAINNET_BOOTNODES_Node5" + } +] diff --git a/examples/discv5/peer-communication.js b/examples/discv5/peer-communication.js new file mode 100644 index 0000000..35a81a6 --- /dev/null +++ b/examples/discv5/peer-communication.js @@ -0,0 +1,375 @@ +// run it: +// npm install +// node -r babel-register ./examples/peer-communication.js + +const devp2p = require('../src') +const EthereumTx = require('ethereumjs-tx') +const EthereumBlock = require('ethereumjs-block') +const LRUCache = require('lru-cache') +const ms = require('ms') +const chalk = require('chalk') +const assert = require('assert') +const { randomBytes } = require('crypto') +const rlp = require('rlp-encoding') +const Buffer = require('safe-buffer').Buffer + +const PRIVATE_KEY = randomBytes(32) +const CHAIN_ID = 1 + +const BOOTNODES = require('./bootstrapNodes.json') + .filter(node => { + return node.chainId === CHAIN_ID + }) + .map(node => { + return { + address: node.ip, + udpPort: node.port, + tcpPort: node.port + } + }) + +const CHECK_BLOCK_TITLE = 'Byzantium Fork' // Only for debugging/console output +const CHECK_BLOCK_NR = 4370000 +const CHECK_BLOCK = + 'b1fcff633029ee18ab6482b58ff8b6e95dd7c82a954c852157152a7a6d32785e' +const CHECK_BLOCK_HEADER = rlp.decode( + Buffer.from( + 'f9020aa0a0890da724dd95c90a72614c3a906e402134d3859865f715f5dfb398ac00f955a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942a65aca4d5fc5b5c859090a6c34d164135398226a074cccff74c5490fbffc0e6883ea15c0e1139e2652e671f31f25f2a36970d2f87a00e750bf284c2b3ed1785b178b6f49ff3690a3a91779d400de3b9a3333f699a80a0c68e3e82035e027ade5d966c36a1d49abaeec04b83d64976621c355e58724b8bb90100040019000040000000010000000000021000004020100688001a05000020816800000010a0000100201400000000080100020000000400080000800004c0200000201040000000018110400c000000200001000000280000000100000010010080000120010000050041004000018000204002200804000081000011800022002020020140000000020005080001800000000008102008140008600000000100000500000010080082002000102080000002040120008820400020100004a40801000002a0040c000010000114000000800000050008300020100000000008010000000100120000000040000000808448200000080a00000624013000000080870552416761fabf83475b02836652b383661a72845a25c530894477617266506f6f6ca0dc425fdb323c469c91efac1d2672dfdd3ebfde8fa25d68c1b3261582503c433788c35ca7100349f430', + 'hex' + ) +) + +const getPeerAddr = peer => + `${peer._socket.remoteAddress}:${peer._socket.remotePort}` + +// set the default version to 4 +let VERSION = devp2p._util.v4 + +// option to run version 5 via cli: node -r babel-register ./examples/peer-communication.js 5 +const cliVersion = process.argv[2] + +if (cliVersion == 5) { + VERSION = devp2p._util.v5 +} + +// DPT +const dpt = new devp2p.DPT(PRIVATE_KEY, { + refreshInterval: 30000, + version: VERSION, + endpoint: { + address: '0.0.0.0', + udpPort: null, + tcpPort: null + } +}) + +dpt.on('error', err => console.error(chalk.red(`DPT error: ${err}`))) + +// RLPx +const rlpx = new devp2p.RLPx(PRIVATE_KEY, { + dpt: dpt, + maxPeers: 25, + capabilities: [devp2p.ETH.eth63, devp2p.ETH.eth62], + listenPort: null +}) + +rlpx.on('error', err => + console.error(chalk.red(`RLPx error: ${err.stack || err}`)) +) + +rlpx.on('peer:added', peer => { + const addr = getPeerAddr(peer) + const eth = peer.getProtocols()[0] + const requests = { headers: [], bodies: [], msgTypes: {} } + + const clientId = peer.getHelloMessage().clientId + + console.log( + chalk.green( + `Add peer: ${addr} ${clientId} (eth${eth.getVersion()}) (total: ${ + rlpx.getPeers().length + })` + ) + ) + + eth.sendStatus({ + networkId: CHAIN_ID, + td: devp2p._util.int2buffer(17179869184), // total difficulty in genesis block + bestHash: Buffer.from( + 'd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3', + 'hex' + ), + genesisHash: Buffer.from( + 'd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3', + 'hex' + ) + }) + + // check CHECK_BLOCK + let forkDrop = null + let forkVerified = false + eth.once('status', () => { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ + CHECK_BLOCK_NR, + 1, + 0, + 0 + ]) + forkDrop = setTimeout(() => { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) + }, ms('15s')) + peer.once('close', () => clearTimeout(forkDrop)) + }) + + eth.on('message', async (code, payload) => { + if (code in requests.msgTypes) { + requests.msgTypes[code] += 1 + } else { + requests.msgTypes[code] = 1 + } + + switch (code) { + case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK_HASHES: + if (!forkVerified) break + + for (let item of payload) { + const blockHash = item[0] + if (blocksCache.has(blockHash.toString('hex'))) continue + setTimeout(() => { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ + blockHash, + 1, + 0, + 0 + ]) + requests.headers.push(blockHash) + }, ms('0.1s')) + } + break + + case devp2p.ETH.MESSAGE_CODES.TX: + if (!forkVerified) break + + for (let item of payload) { + const tx = new EthereumTx(item) + if (isValidTx(tx)) onNewTx(tx, peer) + } + + break + + case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS: + const headers = [] + // hack + if (devp2p._util.buffer2int(payload[0]) === CHECK_BLOCK_NR) { + headers.push(CHECK_BLOCK_HEADER) + } + + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS, headers) + } + break + + case devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS: + if (!forkVerified) { + if (payload.length !== 1) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) + break + } + + const expectedHash = CHECK_BLOCK + const header = new EthereumBlock.Header(payload[0]) + if (header.hash().toString('hex') === expectedHash) { + clearTimeout(forkDrop) + forkVerified = true + } + } else { + let isValidPayload = false + const header = new EthereumBlock.Header(payload[0]) + while (requests.headers.length > 0) { + const blockHash = requests.headers.shift() + if (header.hash().equals(blockHash)) { + isValidPayload = true + setTimeout(() => { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES, [ + blockHash + ]) + requests.bodies.push(header) + }, ms('0.1s')) + break + } + } + + if (!isValidPayload) { + console.log( + `${addr} received wrong block header ${header + .hash() + .toString('hex')}` + ) + } + } + + break + + case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES: + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES, []) + } + break + + case devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES: + if (!forkVerified) break + + if (payload.length !== 1) { + console.log( + `${addr} not more than one block body expected (received: ${ + payload.length + })` + ) + break + } + + let isValidPayload = false + while (requests.bodies.length > 0) { + const header = requests.bodies.shift() + const block = new EthereumBlock([ + header.raw, + payload[0][0], + payload[0][1] + ]) + const isValid = await isValidBlock(block) + if (isValid) { + isValidPayload = true + onNewBlock(block, peer) + break + } + } + + if (!isValidPayload) { + console.log(`${addr} received wrong block body`) + } + + break + + case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK: + if (!forkVerified) break + + const newBlock = new EthereumBlock(payload[0]) + const isValidNewBlock = await isValidBlock(newBlock) + if (isValidNewBlock) onNewBlock(newBlock, peer) + + break + + case devp2p.ETH.MESSAGE_CODES.GET_NODE_DATA: + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.NODE_DATA, []) + } + break + + case devp2p.ETH.MESSAGE_CODES.NODE_DATA: + break + + case devp2p.ETH.MESSAGE_CODES.GET_RECEIPTS: + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.RECEIPTS, []) + } + break + + case devp2p.ETH.MESSAGE_CODES.RECEIPTS: + break + } + }) +}) + +rlpx.on('peer:removed', (peer, reasonCode, disconnectWe) => { + const who = disconnectWe ? 'we disconnect' : 'peer disconnect' + const total = rlpx.getPeers().length + console.log( + chalk.yellow( + `Remove peer: ${getPeerAddr( + peer + )} - ${who}, reason: ${peer.getDisconnectPrefix(reasonCode)} (${String( + reasonCode + )}) (total: ${total})` + ) + ) +}) + +rlpx.on('peer:error', (peer, err) => { + if (err.code === 'ECONNRESET') return + if (err instanceof assert.AssertionError) { + const peerId = peer.getId() + if (peerId !== null) dpt.banPeer(peerId, ms('5m')) + console.error( + chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.message}`) + ) + return + } + console.error( + chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.stack || err}`) + ) +}) + +// // uncomment, if you want accept incoming connections +// rlpx.listen(30303, '0.0.0.0') +// dpt.bind(30303, '0.0.0.0') + +for (let bootnode of BOOTNODES) { + dpt.bootstrap(bootnode).catch(err => { + console.error(chalk.bold.red(`DPT bootstrap error: ${err.stack || err}`)) + }) +} + +const txCache = new LRUCache({ max: 1000 }) +function onNewTx (tx, peer) { + const txHashHex = tx.hash().toString('hex') + if (txCache.has(txHashHex)) return + txCache.set(txHashHex, true) +} + +const blocksCache = new LRUCache({ max: 100 }) +function onNewBlock (block, peer) { + const blockHashHex = block.hash().toString('hex') + const blockNumber = devp2p._util.buffer2int(block.header.number) + if (blocksCache.has(blockHashHex)) return + blocksCache.set(blockHashHex, true) + for (let tx of block.transactions) onNewTx(tx, peer) +} + +function isValidTx (tx) { + return tx.validate(false) +} + +async function isValidBlock (block) { + if (!block.validateUnclesHash()) return false + if (!block.transactions.every(isValidTx)) return false + return new Promise((resolve, reject) => { + block.genTxTrie(() => { + try { + resolve(block.validateTransactionsTrie()) + } catch (err) { + reject(err) + } + }) + }) +} + +setInterval(() => { + const peersCount = dpt.getPeers().length + const openSlots = rlpx._getOpenSlots() + const queueLength = rlpx._peersQueue.length + const queueLength2 = rlpx._peersQueue.filter(o => o.ts <= Date.now()).length + + console.log( + chalk.yellow( + `Total nodes in DPT: ${peersCount}, open slots: ${openSlots}, queue: ${queueLength} / ${queueLength2}` + ) + ) +}, ms('30s')) diff --git a/examples/discv5/simple.js b/examples/discv5/simple.js new file mode 100644 index 0000000..d146855 --- /dev/null +++ b/examples/discv5/simple.js @@ -0,0 +1,52 @@ +const chalk = require('chalk') +const { DISCV5 } = require('../../src') +const Buffer = require('safe-buffer').Buffer + +const PRIVATE_KEY = + 'd772e3d6a001a38064dd23964dd2836239fa0e6cec8b28972a87460a17210fe9' +const BOOTNODES = require('./../bootstrapNodes.json').map(node => { + return { + address: node.ip, + udpPort: node.port, + tcpPort: node.port + } +}) + +const discv5 = new DISCV5(Buffer.from(PRIVATE_KEY, 'hex'), { + version: '5', + endpoint: { + address: '0.0.0.0', + udpPort: null, + tcpPort: null + } +}) + +discv5.on('error', err => console.error(chalk.red(err.stack || err))) + +discv5.on('peer:added', peer => { + const info = `(${peer.id.toString('hex')},${peer.address},${peer.udpPort},${ + peer.tcpPort + })` + console.log( + chalk.green(`New peer: ${info} (total: ${discv5.getPeers().length})`) + ) +}) + +discv5.on('peer:removed', peer => { + console.log( + chalk.yellow( + `Remove peer: ${peer.id.toString('hex')} (total: ${ + discv5.getPeers().length + })` + ) + ) +}) + +// for accept incoming connections uncomment next line +discv5.bind(30303, '0.0.0.0') + +for (let bootnode of BOOTNODES) { + discv5 + .bootstrap(bootnode) + .catch(err => console.error(chalk.bold.red(err.stack || err))) +} diff --git a/examples/peer-communication.js b/examples/peer-communication.js index c71cdd5..85ca86e 100644 --- a/examples/peer-communication.js +++ b/examples/peer-communication.js @@ -1,3 +1,7 @@ +// run it: +// npm install +// node -r babel-register ./examples/peer-communication.js + const devp2p = require('../src') const EthereumTx = require('ethereumjs-tx') const EthereumBlock = require('ethereumjs-block') @@ -8,11 +12,10 @@ const assert = require('assert') const { randomBytes } = require('crypto') const rlp = require('rlp-encoding') const Buffer = require('safe-buffer').Buffer - const PRIVATE_KEY = randomBytes(32) const CHAIN_ID = 1 -const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => { +const BOOTNODES = require('./bootstrapNodes.json').filter((node) => { return node.chainId === CHAIN_ID }).map((node) => { return { @@ -21,7 +24,6 @@ const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => { tcpPort: node.port } }) -const REMOTE_CLIENTID_FILTER = ['go1.5', 'go1.6', 'go1.7', 'quorum', 'pirl', 'ubiq', 'gmc', 'gwhale', 'prichain'] const CHECK_BLOCK_TITLE = 'Byzantium Fork' // Only for debugging/console output const CHECK_BLOCK_NR = 4370000 @@ -30,9 +32,20 @@ const CHECK_BLOCK_HEADER = rlp.decode(Buffer.from('f9020aa0a0890da724dd95c90a726 const getPeerAddr = (peer) => `${peer._socket.remoteAddress}:${peer._socket.remotePort}` +// set the default version to 4 +let VERSION = devp2p._util.v4 + +// option to run version 5 via cli: node -r babel-register ./examples/peer-communication.js 5 +const cliVersion = process.argv[2] + +if (cliVersion === '5') { + VERSION = devp2p._util.v5 +} + // DPT const dpt = new devp2p.DPT(PRIVATE_KEY, { refreshInterval: 30000, + version: VERSION, endpoint: { address: '0.0.0.0', udpPort: null, @@ -50,7 +63,6 @@ const rlpx = new devp2p.RLPx(PRIVATE_KEY, { devp2p.ETH.eth63, devp2p.ETH.eth62 ], - remoteClientIdFilter: REMOTE_CLIENTID_FILTER, listenPort: null }) @@ -62,6 +74,7 @@ rlpx.on('peer:added', (peer) => { const requests = { headers: [], bodies: [], msgTypes: {} } const clientId = peer.getHelloMessage().clientId + console.log(chalk.green(`Add peer: ${addr} ${clientId} (eth${eth.getVersion()}) (total: ${rlpx.getPeers().length})`)) eth.sendStatus({ @@ -130,7 +143,7 @@ rlpx.on('peer:added', (peer) => { case devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS: if (!forkVerified) { if (payload.length !== 1) { - console.log(`${addr} expected one header for ${CHECK_BLOCK_TITLE} verify (received: ${payload.length})`) + // console.log(`${addr} expected one header for ${CHECK_BLOCK_TITLE} verify (received: ${payload.length})`) peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER) break } @@ -138,7 +151,7 @@ rlpx.on('peer:added', (peer) => { const expectedHash = CHECK_BLOCK const header = new EthereumBlock.Header(payload[0]) if (header.hash().toString('hex') === expectedHash) { - console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`) + // console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`) clearTimeout(forkDrop) forkVerified = true } @@ -286,7 +299,9 @@ function onNewTx (tx, peer) { if (txCache.has(txHashHex)) return txCache.set(txHashHex, true) - console.log(`New tx: ${txHashHex} (from ${getPeerAddr(peer)})`) + + // uncomment if you want tx:hostname:port details (debug) + // console.log(`New tx: ${txHashHex} (from ${getPeerAddr(peer)})`) } const blocksCache = new LRUCache({ max: 100 }) diff --git a/examples/simple.js b/examples/simple.js index be5be48..72e2668 100644 --- a/examples/simple.js +++ b/examples/simple.js @@ -2,8 +2,9 @@ const chalk = require('chalk') const { DPT } = require('../src') const Buffer = require('safe-buffer').Buffer -const PRIVATE_KEY = 'd772e3d6a001a38064dd23964dd2836239fa0e6cec8b28972a87460a17210fe9' -const BOOTNODES = require('ethereum-common').bootstrapNodes.map((node) => { +const PRIVATE_KEY = + 'd772e3d6a001a38064dd23964dd2836239fa0e6cec8b28972a87460a17210fe9' +const BOOTNODES = require('ethereum-common').bootstrapNodes.map(node => { return { address: node.ip, udpPort: node.port, @@ -19,20 +20,32 @@ const dpt = new DPT(Buffer.from(PRIVATE_KEY, 'hex'), { } }) -dpt.on('error', (err) => console.error(chalk.red(err.stack || err))) +dpt.on('error', err => console.error(chalk.red(err.stack || err))) -dpt.on('peer:added', (peer) => { - const info = `(${peer.id.toString('hex')},${peer.address},${peer.udpPort},${peer.tcpPort})` - console.log(chalk.green(`New peer: ${info} (total: ${dpt.getPeers().length})`)) +dpt.on('peer:added', peer => { + const info = `(${peer.id.toString('hex')},${peer.address},${peer.udpPort},${ + peer.tcpPort + })` + console.log( + chalk.green(`New peer: ${info} (total: ${dpt.getPeers().length})`) + ) }) -dpt.on('peer:removed', (peer) => { - console.log(chalk.yellow(`Remove peer: ${peer.id.toString('hex')} (total: ${dpt.getPeers().length})`)) +dpt.on('peer:removed', peer => { + console.log( + chalk.yellow( + `Remove peer: ${peer.id.toString('hex')} (total: ${ + dpt.getPeers().length + })` + ) + ) }) // for accept incoming connections uncomment next line -// dpt.bind(30303, '0.0.0.0') +dpt.bind(30303, '0.0.0.0') for (let bootnode of BOOTNODES) { - dpt.bootstrap(bootnode).catch((err) => console.error(chalk.bold.red(err.stack || err))) + dpt + .bootstrap(bootnode) + .catch(err => console.error(chalk.bold.red(err.stack || err))) } diff --git a/package.json b/package.json index 8715a2a..374cd5f 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,8 @@ "eth62", "eth63", "les", - "les2" + "les2", + "discv5" ], "homepage": "https://github.com/ethereumjs/ethereumjs-devp2p", "bugs": { @@ -23,7 +24,8 @@ "Alex Beregszaszi ", "Kirill Fomichev (https://github.com/fanatid)", "Martin Becze ", - "Holger Drewes " + "Holger Drewes ", + "Tim Siwula " ], "files": [ "src", @@ -38,14 +40,18 @@ "node": ">=6.0" }, "scripts": { - "coverage": "nyc npm run test && nyc report --reporter=text-lcov > .nyc_output/lcov.info", + "coverage": "npm run test && nyc report --reporter=text-lcov > .nyc_output/lcov.info", "coveralls": "npm run coverage && coveralls <.nyc_output/lcov.info", "build": "babel src -d lib", "integration": "tape -r babel-register test/integration/*.js", - "lint": "standard", + "lint": "standard --fix", "prepublish": "npm run build", "test": "npm run lint && npm run unit && npm run integration", - "unit": "tape -r babel-register test/*.js" + "unit": "tape -r babel-register test/*.js", + "dev": "nodemon lib/index.js", + "v5-simple": "node examples/discv5/simple.js", + "v5-peer": "node -r babel-register ./examples/peer-communication.js 5", + "v4": "node -r babel-register ./examples/peer-communication.js 4" }, "dependencies": { "babel-runtime": "^6.11.6", @@ -70,12 +76,18 @@ "babel-preset-env": "^1.6.1", "babel-register": "^6.14.0", "chalk": "^1.1.3", - "coveralls": "^3.0.0", + "coveralls": "^3.0.2", + "eslint-config-standard": "^11.0.0", + "eslint-plugin-import": "^2.13.0", + "eslint-plugin-node": "^6.0.1", + "eslint-plugin-promise": "^3.8.0", + "eslint-plugin-standard": "^3.1.0", "ethereum-common": "~0.2.0", "ethereumjs-block": "^1.3.0", "ethereumjs-tx": "^1.1.1", - "nyc": "^11.4.1", - "standard": "*", + "nodemon": "^1.18.4", + "nyc": "^11.9.0", + "standard": "^12.0.1", "tape": "^4.5.1" }, "standard": { diff --git a/src/discv5/index.js b/src/discv5/index.js new file mode 100644 index 0000000..c6e93fc --- /dev/null +++ b/src/discv5/index.js @@ -0,0 +1,143 @@ +const { EventEmitter } = require('events') +const secp256k1 = require('secp256k1') +const Buffer = require('safe-buffer').Buffer +const { randomBytes } = require('crypto') +const createDebugLogger = require('debug') +const ms = require('ms') +const { pk2id, id2pk } = require('../util') +const KBucket = require('./kbucket') +const BanList = require('../dpt/ban-list') +const Server = require('./server') +const debug = createDebugLogger('devp2p:dpt') +const chalk = require('chalk') +const message = require('./message') + +class DISCV5 extends EventEmitter { + constructor (privateKey, options) { + super() + + this._privateKey = Buffer.from(privateKey) + this._id = pk2id(secp256k1.publicKeyCreate(this._privateKey, false)) + + // debug binary data + const info = id2pk(this._id) + + console.log(chalk.red(`+++++ index.js == DISCV5.this._id == ${info}`)) + + this._banlist = new BanList() + this._kbucket = new KBucket(this._id) + this._kbucket.on('added', peer => this.emit('peer:added', peer)) + this._kbucket.on('removed', peer => this.emit('peer:removed', peer)) + this._kbucket.on('hey', (...args) => this._onKBucketPing(...args)) + + this._server = new Server(this, this._privateKey, { + createSocket: options.createSocket, + timeout: options.timeout, + version: options.version, + endpoint: options.endpoint + }) + + this._server.once('listening', () => this.emit('listening')) + this._server.once('close', () => this.emit('close')) + this._server.on('peers', peers => this._onServerPeers(peers)) + this._server.on('error', err => this.emit('error', err)) + + const refreshInterval = options.refreshInterval || ms('60s') + this._refreshIntervalId = setInterval( + () => this.refresh(), + refreshInterval + ) + } + + bind (...args) { + this._server.bind(...args) + } + + destroy (...args) { + clearInterval(this._refreshIntervalId) + this._server.destroy(...args) + } + + _onKBucketPing (oldPeers, newPeer) { + if (this._banlist.has(newPeer)) return + + let count = 0 + let err = null + for (let peer of oldPeers) { + this._server + .hey(peer) + .catch(_err => { + this._banlist.add(peer, ms('5m')) + this._kbucket.remove(peer) + err = err || _err + }) + .then(() => { + if (++count < oldPeers.length) return + + if (err === null) this._banlist.add(newPeer, ms('5m')) + else this._kbucket.add(newPeer) + }) + } + } + + _onServerPeers (peers) { + for (let peer of peers) this.addPeer(peer).catch(() => {}) + } + + async bootstrap (peer) { + debug(`bootstrap with peer ${peer.address}:${peer.udpPort}`) + + peer = await this.addPeer(peer) + this._server.neighbors(peer, this._id) + } + + async addPeer (obj) { + if (this._banlist.has(obj)) throw new Error('Peer is banned') + debug(`attempt adding peer ${obj.address}:${obj.udpPort}`) + + // check k-bucket first + const peer = this._kbucket.get(obj) + if (peer !== null) return peer + + // check that peer is alive + try { + const peer = await this._server.hey(obj) + this.emit('peer:new', peer) + this._kbucket.add(peer) + return peer + } catch (err) { + this._banlist.add(obj, ms('10m')) + throw err + } + } + + getPeer (obj) { + return this._kbucket.get(obj) + } + + getPeers () { + return this._kbucket.getAll() + } + + getClosestPeers (id) { + return this._kbucket.closest(id) + } + + removePeer (obj) { + this._kbucket.remove(obj) + } + + banPeer (obj, maxAge) { + this._banlist.add(obj, maxAge) + this._kbucket.remove(obj) + } + + refresh () { + const peers = this.getPeers() + debug(`call .refresh (${peers.length} peers in table)`) + + for (let peer of peers) this._server.neighbors(peer, randomBytes(64)) + } +} + +module.exports = DISCV5 diff --git a/src/discv5/kbucket.js b/src/discv5/kbucket.js new file mode 100644 index 0000000..4bf9312 --- /dev/null +++ b/src/discv5/kbucket.js @@ -0,0 +1,73 @@ +const { EventEmitter } = require('events') +const Buffer = require('safe-buffer').Buffer +const _KBucket = require('k-bucket') + +const KBUCKET_SIZE = 16 +const KBUCKET_CONCURRENCY = 3 + +class KBucket extends EventEmitter { + constructor (id) { + super() + + this._peers = new Map() + + this._kbucket = new _KBucket({ + localNodeId: id, + numberOfNodesPerKBucket: KBUCKET_SIZE, + numberOfNodesToPing: KBUCKET_CONCURRENCY + }) + + this._kbucket.on('added', peer => { + KBucket.getKeys(peer).forEach(key => this._peers.set(key, peer)) + this.emit('added', peer) + }) + + this._kbucket.on('removed', peer => { + KBucket.getKeys(peer).forEach(key => this._peers.delete(key, peer)) + this.emit('removed', peer) + }) + + this._kbucket.on('hey', (...args) => this.emit('hey', ...args)) + + // this._kbucket.on("ping", (...args) => this.emit("ping", ...args)); + } + + static getKeys (obj) { + if (Buffer.isBuffer(obj)) return [obj.toString('hex')] + if (typeof obj === 'string') return [obj] + + const keys = [] + if (Buffer.isBuffer(obj.id)) keys.push(obj.id.toString('hex')) + if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`) + return keys + } + + add (peer) { + const isExists = KBucket.getKeys(peer).some(key => this._peers.has(key)) + if (!isExists) this._kbucket.add(peer) + } + + get (obj) { + for (let key of KBucket.getKeys(obj)) { + const peer = this._peers.get(key) + if (peer !== undefined) return peer + } + + return null + } + + getAll () { + return this._kbucket.toArray() + } + + closest (id) { + return this._kbucket.closest(id, KBUCKET_SIZE) + } + + remove (obj) { + const peer = this.get(obj) + if (peer !== null) this._kbucket.remove(peer.id) + } +} + +module.exports = KBucket diff --git a/src/discv5/message.js b/src/discv5/message.js new file mode 100644 index 0000000..79bdeca --- /dev/null +++ b/src/discv5/message.js @@ -0,0 +1,278 @@ +/* + discv5 message packet types + UDP {packet types} for node discovery protocol version 5 + Max packet size = 1280 bytes + See: https://github.com/fjl/p2p-drafts/blob/master/discv5-packets.md#packets + */ +const ip = require('ip') +const rlp = require('rlp-encoding') +const secp256k1 = require('secp256k1') +const Buffer = require('safe-buffer').Buffer +const { keccak256, int2buffer, buffer2int, assertEq } = require('../util') + +// ping +const hey = { + encode: function (obj) { + return [ + int2buffer(obj.version), + endpoint.encode(obj.from), + endpoint.encode(obj.to), + timestamp.encode(obj.timestamp) + ] + }, + decode: function (payload) { + return { + version: buffer2int(payload[0]), + from: endpoint.decode(payload[1]), + to: endpoint.decode(payload[2]), + timestamp: timestamp.decode(payload[3]) + } + } +} + +/* + findNode packet (0x03) + requests a neightbors packet containing the closest know nodes to the target hash. +*/ +const findNode = { + encode: function (obj) { + return [endpoint.encode(obj.to), obj.hash, timestamp.encode(obj.timestamp)] + }, + decode: function (payload) { + return { + to: endpoint.decode(payload[0]), + hash: payload[1], + timestamp: timestamp.decode(payload[2]) + } + } +} + +const neighbors = { + encode: function (obj) { + return [obj.id, timestamp.encode(obj.timestamp)] + }, + decode: function (payload) { + return { + id: payload[0], + timestamp: timestamp.decode(payload[1]) + } + } +} + +const requestTicket = { + encode: function (obj) { + return [obj.id, timestamp.encode(obj.timestamp)] + }, + decode: function (payload) { + return { + id: payload[0], + timestamp: timestamp.decode(payload[1]) + } + } +} + +const ticket = { + encode: function (obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ] + }, + decode: function (payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] } // hack for id + }), + timestamp: timestamp.decode(payload[1]) + } + } +} + +const topicRegister = { + encode: function (obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ] + }, + decode: function (payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] } + }), + timestamp: timestamp.decode(payload[1]) + } + } +} + +const topicQuery = { + encode: function (obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ] + }, + decode: function (payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] } + }), + timestamp: timestamp.decode(payload[1]) + } + } +} + +const topicNodes = { + encode: function (obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ] + }, + decode: function (payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] } + }), + timestamp: timestamp.decode(payload[1]) + } + } +} + +function getTimestamp () { + return (Date.now() / 1000) | 0 +} + +const timestamp = { + encode: function (value = getTimestamp() + 60) { + const buffer = Buffer.allocUnsafe(4) + buffer.writeUInt32BE(value) + return buffer + }, + decode: function (buffer) { + if (buffer.length !== 4) { + throw new RangeError( + `Invalid timestamp buffer :${buffer.toString('hex')}` + ) + } + return buffer.readUInt32BE(0) + } +} + +const address = { + encode: function (value) { + if (ip.isV4Format(value)) return ip.toBuffer(value) + if (ip.isV6Format(value)) return ip.toBuffer(value) + throw new Error(`Invalid address: ${value}`) + }, + decode: function (buffer) { + if (buffer.length === 4) return ip.toString(buffer) + if (buffer.length === 16) return ip.toString(buffer) + + const str = buffer.toString() + if (ip.isV4Format(str) || ip.isV6Format(str)) return str + /* + also can be host, but skip it right now (because need async function for resolve) + throw new Error(`Invalid address buffer: ${buffer.toString("hex")}`); + */ + } +} + +const port = { + encode: function (value) { + if (value === null) return Buffer.allocUnsafe(0) + if (value >>> 16 > 0) throw new RangeError(`Invalid port: ${value}`) + return Buffer.from([(value >>> 8) & 0xff, (value >>> 0) & 0xff]) + }, + decode: function (buffer) { + if (buffer.length === 0) return null + return buffer2int(buffer) + } +} + +const endpoint = { + encode: function (obj) { + return [ + address.encode(obj.address), + port.encode(obj.udpPort), + port.encode(obj.tcpPort) + ] + }, + decode: function (payload) { + return { + address: address.decode(payload[0]), + udpPort: port.decode(payload[1]), + tcpPort: port.decode(payload[2]) + } + } +} + +function encode (typename, data, privateKey) { + const type = types.byName[typename] + if (type === undefined) throw new Error(`Invalid typename: ${typename}`) + const encodedMsg = messages[typename].encode(data) + const typedata = Buffer.concat([Buffer.from([type]), rlp.encode(encodedMsg)]) + const sighash = keccak256(typedata) + const sig = secp256k1.sign(sighash, privateKey) + const hashdata = Buffer.concat([ + sig.signature, + Buffer.from([sig.recovery]), + typedata + ]) + const hash = keccak256(hashdata) + return Buffer.concat([hash, hashdata]) +} + +function decode (buffer) { + const hash = keccak256(buffer.slice(32)) + assertEq(buffer.slice(0, 32), hash, 'Hash verification failed') + + const typedata = buffer.slice(97) + const type = typedata[0] + const typename = types.byType[type] + if (typename === undefined) throw new Error(`Invalid type: ${type}`) + const data = messages[typename].decode(rlp.decode(typedata.slice(1))) + + const sighash = keccak256(typedata) + const signature = buffer.slice(32, 96) + const recoverId = buffer[96] + const publicKey = secp256k1.recover(sighash, signature, recoverId, false) + + return { typename, data, publicKey } +} + +const messages = { + hey, + findNode, + neighbors, + requestTicket, + ticket, + topicRegister, + topicQuery, + topicNodes +} + +const types = { + byName: { + hey: 0x01, + findNode: 0x02, + neighbors: 0x03, + requestTicket: 0x04, + ticket: 0x05, + topicRegister: 0x06, + topicQuery: 0x07, + topicNodes: 0x08 + }, + byType: { + 0x01: 'hey', + 0x02: 'findNode', + 0x03: 'neighbors', + 0x04: 'requestTicket', + 0x05: 'ticket', + 0x06: 'topicRegister', + 0x07: 'topicQuery', + 0x08: 'topicNodes' + } +} + +module.exports = { encode, decode } diff --git a/src/discv5/node_record.js b/src/discv5/node_record.js new file mode 100644 index 0000000..84ffa8d --- /dev/null +++ b/src/discv5/node_record.js @@ -0,0 +1,124 @@ +// /* ------------------------------------------------------------------------------ +// File: node_record.js +// Description: Ethereum Node Record (ENR) data structure +// ------------------------------------------------------------------------------ +// Specification: https://github.com/fjl/p2p-drafts/blob/master/discv5-enr.md +// */ +// const secp256k1 = require("secp256k1"); +// const ip = require("ip"); +// const rlp = require("rlp-encoding"); +// const Buffer = require("safe-buffer").Buffer; +// const { keccak256, int2buffer, buffer2int, assertEq } = require("../util"); +// +// /* --------------------------------------- +// *** Constants *** +// --------------------------------------- +// MAX_RECORD_SIZE = 300 BYTES +// SEQUENCE_SIZE = 64BITS = 8 bytes +// */ +// const MAX_RECORD_SIZE = 300; +// const SEQUENCE_SIZE = 8; +// +// /* --------------------------------------- +// *** Node Record Specification *** +// --------------------------------------- +// Record { +// signature: IdentityScheme, +// sequence: 64bit int, +// key/value: mapping +// } +// */ +// +// +class EthereumNodeRecord { + constructor () { + super() + } + + // sequence number that acts like a nonce for record updates + // let sequence = BigInt(0); + + // this is how records are signed and encoded + // let content = Buffer.concat([ + // rlp.encode(signature), + // rpl.encode(sequence), + // rlp.encode("id"), rlp.encode(defaultKeyValuePairs.get("id")), + // rlp.encode("secp256k1"), rlp.encode(defaultKeyValuePairs.get("secp256k1")), + // rlp.encode("ip"), rlp.encode(defaultKeyValuePairs.get("ip")), + // rlp.encode("tcp"), rlp.encode(defaultKeyValuePairs.get("tcp")), + // rlp.encode("udp"), rlp.encode(defaultKeyValuePairs.get("udp")) + // ]); +} + +// +// /* --------------------------------------- +// *** RLP Encoding *** +// --------------------------------------- +// */ +// function encode(typename, data, privateKey) { +// const type = types.byName[typename]; +// if (type === undefined) throw new Error(`Invalid typename: ${typename}`); +// const encodedMsg = messages[typename].encode(data); +// const typedata = Buffer.concat([Buffer.from([type]), rlp.encode(encodedMsg)]); +// const sighash = keccak256(typedata); +// const sig = secp256k1.sign(sighash, privateKey); +// const hashdata = Buffer.concat([ +// sig.signature, +// Buffer.from([sig.recovery]), +// typedata +// ]); +// const hash = keccak256(hashdata); +// return Buffer.concat([hash, hashdata]); +// } +// +// /* --------------------------------------- +// *** Pre-defined Key/Value Pairs *** +// --------------------------------------- +// Key { +// id = vlaue, +// secp256k1 = vlaue, +// ip = vlaue, +// tcp = vlaue, +// udp = vlaue +// } +// */ +// let defaultKeyValuePairs = new Map( +// [["id", "v4"], // name of identity scheme +// ["secp256k1", ""], // compressed pub key 33 bytes +// ["ip", ""], // ip address 4 or 6 bytes +// ["tcp", ""], // tcp port number +// ["udp", ""]]); // udp port number +// +// /* --------------------------------------- +// *** "v4" Inentity Scheme *** +// --------------------------------------- +// IdentityScheme{ +// sign() -> createRecordSignature(record contents), +// verify() -> validateRecordSignature(), +// derive() -> deriveNodeAddress() +// } +// */ +// class DefaultIdentityScheme { +// let defaultSchemeList = "v4"; +// +// // signs a records content +// function sign(content) { +// const sighash = keccak256(content); +// } +// +// // verifies a node record +// function verify(signature, publicKey) { +// let isValid = new Boolean(signature == defaultKeyValuePairs.get("secp256k1")); +// return isValid; +// } +// +// // derives the node address +// function derive(publicKey) { +// const nodeAddress = keccak256(publicKey); +// return nodeAddress; +// } +// } +// +// module.exports = { EthereumNodeRecord, DefaultIdentityScheme }; + +module.exports = EthereumNodeRecord diff --git a/src/discv5/server.js b/src/discv5/server.js new file mode 100644 index 0000000..71f7b64 --- /dev/null +++ b/src/discv5/server.js @@ -0,0 +1,281 @@ +const { EventEmitter } = require('events') +const dgram = require('dgram') +const ms = require('ms') +const createDebugLogger = require('debug') +const LRUCache = require('lru-cache') +const message = require('./message') +const { keccak256, pk2id, createDeferred, v4, v5 } = require('../util') +const chalk = require('chalk') +const debug = createDebugLogger('devp2p:dpt:server') + +const createSocketUDP4 = dgram.createSocket.bind(null, 'udp4') + +class Server extends EventEmitter { + constructor (dpt, privateKey, options) { + super() + this._dpt = dpt + this._privateKey = privateKey + + if (options.version === '5') { + this._version = v5 + } else { + this._version = v4 + } + + console.log( + chalk.green( + `Starting node discovery protocol with version: ${this._version}` + ) + ) + + this._timeout = options.timeout || ms('10s') + + this._endpoint = options.endpoint || { + address: '0.0.0.0', + udpPort: null, + tcpPort: null + } + this._requests = new Map() + this._parityRequestMap = new Map() + this._requestsCache = new LRUCache({ + max: 1000, + maxAge: ms('1s'), + stale: false + }) + + const createSocket = options.createSocket || createSocketUDP4 + this._socket = createSocket() + this._socket.once('listening', () => this.emit('listening')) + this._socket.once('close', () => this.emit('close')) + this._socket.on('error', err => this.emit('error', err)) + + // processes incoming messages + this._socket.on('message', (msg, rinfo) => { + try { + this._handler(msg, rinfo) + } catch (err) { + this.emit('error', err) + } + }) + } + + bind (...args) { + this._isAliveCheck() + debug('call .bind') + + this._socket.bind(...args) + } + + destroy (...args) { + this._isAliveCheck() + debug('call .destroy') + + this._socket.close(...args) + this._socket = null + } + + async hey (peer) { + this._isAliveCheck() + + const rckey = `${peer.address}:${peer.udpPort}` + const promise = this._requestsCache.get(rckey) + if (promise !== undefined) return promise + + const hash = this._send(peer, 'hey', { + version: this._version, + from: this._endpoint, + to: peer + }) + + const deferred = createDeferred() + const rkey = hash.toString('hex') + + this._requests.set(rkey, { + peer, + deferred, + timeoutId: setTimeout(() => { + if (this._requests.get(rkey) !== undefined) { + debug( + `ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && + peer.id.toString('hex')}` + ) + this._requests.delete(rkey) + deferred.reject( + new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`) + ) + } else { + return deferred.promise + } + }, this._timeout) + }) + + this._requestsCache.set(rckey, deferred.promise) + return deferred.promise + } + + neighbors (peer, id) { + this._isAliveCheck() + this._send(peer, 'neighbors', { id }) + } + + _isAliveCheck () { + if (this._socket === null) throw new Error('Server already destroyed') + } + + _send (peer, typename, data) { + // debug( + // `send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && + // peer.id.toString("hex")})` + // ); + + const msg = message.encode(typename, data, this._privateKey) + // Parity hack + // There is a bug in Parity up to at lease 1.8.10 not echoing the hash from + // discovery spec (hash: sha3(signature || packet-type || packet-data)) + // but just hashing the RLP-encoded packet data (see discovery.rs, on_ping()) + // 2018-02-28 + if (typename === 'hey') { + const rkeyParity = keccak256(msg.slice(98)).toString('hex') + this._parityRequestMap.set(rkeyParity, msg.slice(0, 32).toString('hex')) + setTimeout(() => { + if (this._parityRequestMap.get(rkeyParity) !== undefined) { + this._parityRequestMap.delete(rkeyParity) + } + }, this._timeout) + } + + this._socket.send(msg, 0, msg.length, peer.udpPort, peer.address) + return msg.slice(0, 32) // message id + } + + // processes each incoming message by it's message type, msg in binary data + _handler (msg, rinfo) { + const info = message.decode(msg) + const peerId = pk2id(info.publicKey) + + debug( + `received ${info.typename} from ${rinfo.address}:${ + rinfo.port + } (peerId: ${peerId.toString('hex')})` + ) + + // add peer if not in our table + const peer = this._dpt.getPeer(peerId) + if ( + peer === null && + info.typename === 'hey' && + info.data.from.udpPort !== null + ) { + setTimeout(() => this.emit('peers', [info.data.from]), ms('100ms')) + } + + switch (info.typename) { + case 'hey': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'hey', { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }) + break + + /* + findNode packet (0x03) + requests a neighbors packet containing the closest know nodes to the target hash. + */ + case 'findNode': + + var rkey = info.data.hash.toString('hex') + const rkeyParity = this._parityRequestMap.get(rkey) + + if (rkeyParity) { + rkey = rkeyParity + this._parityRequestMap.delete(rkeyParity) + } + const request = this._requests.get(rkey) + + if (request) { + this._requests.delete(rkey) + request.deferred.resolve({ + id: peerId, + address: request.peer.address, + udpPort: request.peer.udpPort, + tcpPort: request.peer.tcpPort + }) + } + break + + case 'neighbors': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'neighbors', { + peers: this._dpt.getClosestPeers(info.data.id) + }) + break + + case 'requestTicket': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'pong', { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }) + break + + case 'ticket': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'pong', { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }) + break + + case 'topicRegister': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'pong', { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }) + break + + case 'topicQuery': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'pong', { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }) + break + + case 'topicNodes': + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) + this._send(rinfo, 'pong', { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }) + break + } + } +} + +module.exports = Server diff --git a/src/discv5/topic.js b/src/discv5/topic.js new file mode 100644 index 0000000..282ce4a --- /dev/null +++ b/src/discv5/topic.js @@ -0,0 +1,5 @@ +/* + This is a placeholder. + Geth v5 protocol is a placeholder for now. + https://github.com/ethereumjs/ethereumjs-devp2p/pull/42#issuecomment-418909954 +*/ diff --git a/src/dpt/index.js b/src/dpt/index.js index 7c0fd58..d80a7b1 100644 --- a/src/dpt/index.js +++ b/src/dpt/index.js @@ -28,6 +28,7 @@ class DPT extends EventEmitter { this._server = new DPTServer(this, this._privateKey, { createSocket: options.createSocket, timeout: options.timeout, + version: options.version, endpoint: options.endpoint }) this._server.once('listening', () => this.emit('listening')) @@ -95,7 +96,7 @@ class DPT extends EventEmitter { this._kbucket.add(peer) return peer } catch (err) { - this._banlist.add(obj, ms('5m')) + this._banlist.add(obj, ms('10m')) throw err } } diff --git a/src/dpt/message.js b/src/dpt/message.js index a207afb..73432dc 100644 --- a/src/dpt/message.js +++ b/src/dpt/message.js @@ -1,3 +1,7 @@ +/* + This file implements Node Discovery Protocol Version 5 as defined here: + https://github.com/fjl/p2p-drafts/blob/master/discv5-packets.md +*/ const ip = require('ip') const rlp = require('rlp-encoding') const secp256k1 = require('secp256k1') @@ -15,7 +19,11 @@ const timestamp = { return buffer }, decode: function (buffer) { - if (buffer.length !== 4) throw new RangeError(`Invalid timestamp buffer :${buffer.toString('hex')}`) + if (buffer.length !== 4) { + throw new RangeError( + `Invalid timestamp buffer :${buffer.toString('hex')}` + ) + } return buffer.readUInt32BE(0) } } @@ -41,8 +49,8 @@ const address = { const port = { encode: function (value) { if (value === null) return Buffer.allocUnsafe(0) - if ((value >>> 16) > 0) throw new RangeError(`Invalid port: ${value}`) - return Buffer.from([ (value >>> 8) & 0xff, (value >>> 0) & 0xff ]) + if (value >>> 16 > 0) throw new RangeError(`Invalid port: ${value}`) + return Buffer.from([(value >>> 8) & 0xff, (value >>> 0) & 0xff]) }, decode: function (buffer) { if (buffer.length === 0) return null @@ -76,6 +84,11 @@ const ping = { endpoint.encode(obj.to), timestamp.encode(obj.timestamp) ] + + // message = _pack(CMD_PING.id, payload, self.privkey) + // self.send(node, message) + // # Return the msg hash, which is used as a token to identify pongs. + // return message[:MAC_SIZE] }, decode: function (payload) { return { @@ -89,11 +102,7 @@ const ping = { const pong = { encode: function (obj) { - return [ - endpoint.encode(obj.to), - obj.hash, - timestamp.encode(obj.timestamp) - ] + return [endpoint.encode(obj.to), obj.hash, timestamp.encode(obj.timestamp)] }, decode: function (payload) { return { @@ -106,10 +115,7 @@ const pong = { const findneighbours = { encode: function (obj) { - return [ - obj.id, - timestamp.encode(obj.timestamp) - ] + return [obj.id, timestamp.encode(obj.timestamp)] }, decode: function (payload) { return { @@ -122,13 +128,13 @@ const findneighbours = { const neighbours = { encode: function (obj) { return [ - obj.peers.map((peer) => endpoint.encode(peer).concat(peer.id)), + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), timestamp.encode(obj.timestamp) ] }, decode: function (payload) { return { - peers: payload[0].map((data) => { + peers: payload[0].map(data => { return { endpoint: endpoint.decode(data), id: data[3] } // hack for id }), timestamp: timestamp.decode(payload[1]) @@ -163,13 +169,17 @@ function encode (typename, data, privateKey) { const type = types.byName[typename] if (type === undefined) throw new Error(`Invalid typename: ${typename}`) const encodedMsg = messages[typename].encode(data) - const typedata = Buffer.concat([ Buffer.from([ type ]), rlp.encode(encodedMsg) ]) + const typedata = Buffer.concat([Buffer.from([type]), rlp.encode(encodedMsg)]) const sighash = keccak256(typedata) const sig = secp256k1.sign(sighash, privateKey) - const hashdata = Buffer.concat([ sig.signature, Buffer.from([ sig.recovery ]), typedata ]) + const hashdata = Buffer.concat([ + sig.signature, + Buffer.from([sig.recovery]), + typedata + ]) const hash = keccak256(hashdata) - return Buffer.concat([ hash, hashdata ]) + return Buffer.concat([hash, hashdata]) } function decode (buffer) { diff --git a/src/dpt/server.js b/src/dpt/server.js index b848a5a..e94b840 100644 --- a/src/dpt/server.js +++ b/src/dpt/server.js @@ -4,10 +4,10 @@ const ms = require('ms') const createDebugLogger = require('debug') const LRUCache = require('lru-cache') const message = require('./message') -const { keccak256, pk2id, createDeferred } = require('../util') - +const { keccak256, pk2id, createDeferred, v4, v5 } = require('../util') +const chalk = require('chalk') const debug = createDebugLogger('devp2p:dpt:server') -const VERSION = 0x04 + const createSocketUDP4 = dgram.createSocket.bind(null, 'udp4') class Server extends EventEmitter { @@ -17,17 +17,38 @@ class Server extends EventEmitter { this._dpt = dpt this._privateKey = privateKey + if (options.version === '5') { + this._version = v5 + } else { + this._version = v4 + } + + console.log( + chalk.green( + `Starting node discovery protocol with version: ${this._version}` + ) + ) + this._timeout = options.timeout || ms('10s') - this._endpoint = options.endpoint || { address: '0.0.0.0', udpPort: null, tcpPort: null } + this._endpoint = options.endpoint || { + address: '0.0.0.0', + udpPort: null, + tcpPort: null + } this._requests = new Map() this._parityRequestMap = new Map() - this._requestsCache = new LRUCache({ max: 1000, maxAge: ms('1s'), stale: false }) + this._requestsCache = new LRUCache({ + max: 1000, + maxAge: ms('1s'), + stale: false + }) const createSocket = options.createSocket || createSocketUDP4 this._socket = createSocket() this._socket.once('listening', () => this.emit('listening')) this._socket.once('close', () => this.emit('close')) - this._socket.on('error', (err) => this.emit('error', err)) + this._socket.on('error', err => this.emit('error', err)) + this._socket.on('message', (msg, rinfo) => { try { this._handler(msg, rinfo) @@ -60,26 +81,33 @@ class Server extends EventEmitter { if (promise !== undefined) return promise const hash = this._send(peer, 'ping', { - version: VERSION, + version: this._version, from: this._endpoint, to: peer }) const deferred = createDeferred() const rkey = hash.toString('hex') + this._requests.set(rkey, { peer, deferred, timeoutId: setTimeout(() => { if (this._requests.get(rkey) !== undefined) { - debug(`ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && peer.id.toString('hex')}`) + debug( + `ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && + peer.id.toString('hex')}` + ) this._requests.delete(rkey) - deferred.reject(new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`)) + deferred.reject( + new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`) + ) } else { return deferred.promise } }, this._timeout) }) + this._requestsCache.set(rckey, deferred.promise) return deferred.promise } @@ -94,7 +122,10 @@ class Server extends EventEmitter { } _send (peer, typename, data) { - debug(`send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && peer.id.toString('hex')})`) + debug( + `send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && + peer.id.toString('hex')})` + ) const msg = message.encode(typename, data, this._privateKey) // Parity hack @@ -118,12 +149,20 @@ class Server extends EventEmitter { _handler (msg, rinfo) { const info = message.decode(msg) const peerId = pk2id(info.publicKey) - debug(`received ${info.typename} from ${rinfo.address}:${rinfo.port} (peerId: ${peerId.toString('hex')})`) + debug( + `received ${info.typename} from ${rinfo.address}:${ + rinfo.port + } (peerId: ${peerId.toString('hex')})` + ) // add peer if not in our table const peer = this._dpt.getPeer(peerId) - if (peer === null && info.typename === 'ping' && info.data.from.udpPort !== null) { - setTimeout(() => this.emit('peers', [ info.data.from ]), ms('100ms')) + if ( + peer === null && + info.typename === 'ping' && + info.data.from.udpPort !== null + ) { + setTimeout(() => this.emit('peers', [info.data.from]), ms('100ms')) } switch (info.typename) { @@ -166,7 +205,7 @@ class Server extends EventEmitter { break case 'neighbours': - this.emit('peers', info.data.peers.map((peer) => peer.endpoint)) + this.emit('peers', info.data.peers.map(peer => peer.endpoint)) break } } diff --git a/src/eth/index.js b/src/eth/index.js index 6c8d663..3fa28c1 100644 --- a/src/eth/index.js +++ b/src/eth/index.js @@ -41,21 +41,31 @@ class ETH extends EventEmitter { }, ms('5s')) } - static eth62 = { name: 'eth', version: 62, length: 8, constructor: ETH } - static eth63 = { name: 'eth', version: 63, length: 17, constructor: ETH } + static eth62 = { name: 'eth', version: 62, length: 8, constructor: ETH }; + static eth63 = { name: 'eth', version: 63, length: 17, constructor: ETH }; - static MESSAGE_CODES = MESSAGE_CODES + static MESSAGE_CODES = MESSAGE_CODES; _handleMessage (code, data) { const payload = rlp.decode(data) if (code !== MESSAGE_CODES.STATUS) { - debug(`Received ${this.getMsgPrefix(code)} message from ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: ${data.toString('hex')}`) + debug( + `Received ${this.getMsgPrefix(code)} message from ${ + this._peer._socket.remoteAddress + }:${this._peer._socket.remotePort}: ${data.toString('hex')}` + ) } switch (code) { case MESSAGE_CODES.STATUS: assertEq(this._peerStatus, null, 'Uncontrolled status message') this._peerStatus = payload - debug(`Received ${this.getMsgPrefix(code)} message from ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: : ${this._getStatusString(this._peerStatus)}`) + debug( + `Received ${this.getMsgPrefix(code)} message from ${ + this._peer._socket.remoteAddress + }:${this._peer._socket.remotePort}: : ${this._getStatusString( + this._peerStatus + )}` + ) this._handleStatus() break @@ -104,8 +114,12 @@ class ETH extends EventEmitter { } _getStatusString (status) { - var sStr = `[V:${buffer2int(status[0])}, NID:${buffer2int(status[1])}, TD:${buffer2int(status[2])}` - sStr += `, BestH:${status[3].toString('hex')}, GenH:${status[4].toString('hex')}]` + var sStr = `[V:${buffer2int(status[0])}, NID:${buffer2int( + status[1] + )}, TD:${buffer2int(status[2])}` + sStr += `, BestH:${status[3].toString('hex')}, GenH:${status[4].toString( + 'hex' + )}]` return sStr } @@ -119,13 +133,23 @@ class ETH extends EventEmitter { status.genesisHash ] - debug(`Send STATUS message to ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort} (eth${this._version}): ${this._getStatusString(this._status)}`) + debug( + `Send STATUS message to ${this._peer._socket.remoteAddress}:${ + this._peer._socket.remotePort + } (eth${this._version}): ${this._getStatusString(this._status)}` + ) this._send(MESSAGE_CODES.STATUS, rlp.encode(this._status)) this._handleStatus() } sendMessage (code, payload) { - debug(`Send ${this.getMsgPrefix(code)} message to ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: ${rlp.encode(payload).toString('hex')}`) + debug( + `Send ${this.getMsgPrefix(code)} message to ${ + this._peer._socket.remoteAddress + }:${this._peer._socket.remotePort}: ${rlp + .encode(payload) + .toString('hex')}` + ) switch (code) { case MESSAGE_CODES.STATUS: throw new Error('Please send status message through .sendStatus') @@ -138,14 +162,18 @@ class ETH extends EventEmitter { case MESSAGE_CODES.BLOCK_BODIES: case MESSAGE_CODES.NEW_BLOCK: if (this._version >= ETH.eth62.version) break - throw new Error(`Code ${code} not allowed with version ${this._version}`) + throw new Error( + `Code ${code} not allowed with version ${this._version}` + ) case MESSAGE_CODES.GET_NODE_DATA: case MESSAGE_CODES.NODE_DATA: case MESSAGE_CODES.GET_RECEIPTS: case MESSAGE_CODES.RECEIPTS: if (this._version >= ETH.eth63.version) break - throw new Error(`Code ${code} not allowed with version ${this._version}`) + throw new Error( + `Code ${code} not allowed with version ${this._version}` + ) default: throw new Error(`Unknown code ${code}`) @@ -155,7 +183,9 @@ class ETH extends EventEmitter { } getMsgPrefix (msgCode) { - return Object.keys(MESSAGE_CODES).find(key => MESSAGE_CODES[key] === msgCode) + return Object.keys(MESSAGE_CODES).find( + key => MESSAGE_CODES[key] === msgCode + ) } } diff --git a/src/index.js b/src/index.js index 7cccdff..fad7cb1 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,7 @@ +exports.DISCV5 = require('./discv5') exports.DPT = require('./dpt') + exports.ETH = require('./eth') exports.LES = require('./les') exports.RLPx = require('./rlpx') - exports._util = require('./util') diff --git a/src/rlpx/peer.js b/src/rlpx/peer.js index 0ab2a87..0d53f73 100644 --- a/src/rlpx/peer.js +++ b/src/rlpx/peer.js @@ -152,7 +152,9 @@ class Peer extends EventEmitter { case 'Body': const body = this._eciesSession.parseBody(data) - debug(`Received body ${this._socket.remoteAddress}:${this._socket.remotePort} ${body.toString('hex')}`) + + // TODO: uncomment if you want tx:hostname:port details (debug) + // console.log(`Received body ${this._socket.remoteAddress}:${this._socket.remotePort} ${body.toString('hex')}`) this._state = 'Header' this._nextPacketSize = 32 @@ -170,7 +172,7 @@ class Peer extends EventEmitter { const msgCode = code - obj.offset const prefix = this.getMsgPrefix(msgCode) - debug(`Received ${prefix} (message code: ${code} - ${obj.offset} = ${msgCode}) ${this._socket.remoteAddress}:${this._socket.remotePort}`) + console.log(`Received ${prefix} (message code: ${code} - ${obj.offset} = ${msgCode}) ${this._socket.remoteAddress}:${this._socket.remotePort}`) try { obj.protocol._handleMessage(msgCode, body.slice(1)) diff --git a/src/util.js b/src/util.js index 56b6661..749c815 100644 --- a/src/util.js +++ b/src/util.js @@ -4,12 +4,20 @@ const Buffer = require('safe-buffer').Buffer const createDebugLogger = require('debug') const createKeccakHash = require('keccak') const assert = require('assert') - const debug = createDebugLogger('devp2p:util') +// node discovery protocol versions +const v4 = '4' +const v5 = '5' + +// max packet size in bytes +const MAXPACKETSIZE = 1280 + function keccak256 (...buffers) { const buffer = Buffer.concat(buffers) - return createKeccakHash('keccak256').update(buffer).digest() + return createKeccakHash('keccak256') + .update(buffer) + .digest() } function genPrivateKey () { @@ -25,7 +33,7 @@ function pk2id (pk) { } function id2pk (id) { - return Buffer.concat([ Buffer.from([ 0x04 ]), id ]) + return Buffer.concat([Buffer.from([0x04]), id]) } function int2buffer (v) { @@ -46,7 +54,7 @@ function zfill (buffer, size, leftpad) { if (buffer.length >= size) return buffer if (leftpad === undefined) leftpad = true const pad = Buffer.allocUnsafe(size - buffer.length).fill(0x00) - return leftpad ? Buffer.concat([ pad, buffer ]) : Buffer.concat([ buffer, pad ]) + return leftpad ? Buffer.concat([pad, buffer]) : Buffer.concat([buffer, pad]) } function xor (a, b) { @@ -94,5 +102,14 @@ module.exports = { zfill, xor, assertEq, - createDeferred + createDeferred, + v4, + v5, + MAXPACKETSIZE +} + +// used for v5 nonce packet. see https://github.com/fjl/p2p-drafts/blob/master/discv5-packets.md#packets +function generateNonce () { + const nonce = randomBytes(16) + return nonce } diff --git a/test/discv5/dpt-message.js b/test/discv5/dpt-message.js new file mode 100644 index 0000000..e69de29 diff --git a/test/discv5/dpt-simulator.js b/test/discv5/dpt-simulator.js new file mode 100644 index 0000000..e69de29 diff --git a/test/discv5/eth-simulator.js b/test/discv5/eth-simulator.js new file mode 100644 index 0000000..e69de29 diff --git a/test/discv5/les-simulator.js b/test/discv5/les-simulator.js new file mode 100644 index 0000000..e69de29 diff --git a/test/integration/util.js b/test/integration/util.js index 9e6245f..b3983c2 100644 --- a/test/integration/util.js +++ b/test/integration/util.js @@ -7,14 +7,16 @@ exports.getTestDPTs = function (numDPTs) { const dpts = [] for (let i = 0; i < numDPTs; ++i) { - const dpt = new devp2p.DPT(devp2p._util.genPrivateKey(), { - endpoint: { - address: localhost, - udpPort: basePort + i, - tcpPort: basePort + i - }, - timeout: 100 - }) + const dpt = new devp2p.DPT( + devp2p._util.genPrivateKey(), { + version: devp2p._util.v4, + endpoint: { + address: localhost, + udpPort: basePort + i, + tcpPort: basePort + i + }, + timeout: 100 + }) dpt.bind(basePort + i) dpts.push(dpt) } @@ -28,6 +30,33 @@ exports.initTwoPeerDPTSetup = function () { return dpts } +exports.getTestDPTsV5 = function (numDPTs) { + const dpts = [] + + for (let i = 0; i < numDPTs; ++i) { + const dpt = new devp2p.DPT( + devp2p._util.genPrivateKey(), { + version: devp2p._util.v5, + endpoint: { + address: localhost, + udpPort: basePort + i, + tcpPort: basePort + i + }, + timeout: 100 + }) + dpt.bind(basePort + i) + dpts.push(dpt) + } + return dpts +} + +exports.initTwoPeerDPTSetupV5 = function () { + const dpts = exports.getTestDPTsV5(2) + const peer = { address: localhost, udpPort: basePort + 1 } + dpts[0].addPeer(peer) + return dpts +} + exports.destroyDPTs = function (dpts) { for (let dpt of dpts) dpt.destroy() }