Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit number of concurrent events that are processed #72

Merged
merged 4 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"node-fetch": "^2.1.2",
"pino": "^4.17.3",
"pino-pretty": "^2.0.1",
"promise-limit": "^2.7.0",
"promise-retry": "^1.1.1",
"redlock": "^3.1.2",
"web3": "^1.0.0-beta.34",
Expand Down
70 changes: 38 additions & 32 deletions src/events/processAffirmationRequests.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
require('dotenv').config()
const Web3 = require('web3')
const HttpListProvider = require('http-list-provider')
const promiseLimit = require('promise-limit')
const logger = require('../services/logger')
const rpcUrlsManager = require('../services/getRpcUrlsManager')
const { MAX_CONCURRENT_EVENTS } = require('../utils/constants')

const { VALIDATOR_ADDRESS } = process.env

const limit = promiseLimit(MAX_CONCURRENT_EVENTS)

function processAffirmationRequestsBuilder(config) {
const homeProvider = new HttpListProvider(rpcUrlsManager.homeUrls)
const web3Home = new Web3(homeProvider)
Expand All @@ -14,43 +18,45 @@ function processAffirmationRequestsBuilder(config) {
return async function processAffirmationRequests(affirmationRequests) {
const txToSend = []

const callbacks = affirmationRequests.map(async affirmationRequest => {
const { recipient, value } = affirmationRequest.returnValues
const callbacks = affirmationRequests.map(affirmationRequest =>
limit(async () => {
const { recipient, value } = affirmationRequest.returnValues

logger.info(
{ eventTransactionHash: affirmationRequest.transactionHash, sender: recipient, value },
`Processing affirmationRequest ${affirmationRequest.transactionHash}`
)
logger.info(
{ eventTransactionHash: affirmationRequest.transactionHash, sender: recipient, value },
`Processing affirmationRequest ${affirmationRequest.transactionHash}`
)

let gasEstimate
try {
gasEstimate = await homeBridge.methods
.executeAffirmation(recipient, value, affirmationRequest.transactionHash)
.estimateGas({ from: VALIDATOR_ADDRESS })
} catch (e) {
if (e.message.includes('Invalid JSON RPC response')) {
throw new Error(
`RPC Connection Error: executeAffirmation Gas Estimate cannot be obtained.`
let gasEstimate
try {
gasEstimate = await homeBridge.methods
.executeAffirmation(recipient, value, affirmationRequest.transactionHash)
.estimateGas({ from: VALIDATOR_ADDRESS })
} catch (e) {
if (e.message.includes('Invalid JSON RPC response')) {
throw new Error(
`RPC Connection Error: executeAffirmation Gas Estimate cannot be obtained.`
)
}
logger.info(
{ eventTransactionHash: affirmationRequest.transactionHash },
`Already processed affirmationRequest ${affirmationRequest.transactionHash}`
)
return
}
logger.info(
{ eventTransactionHash: affirmationRequest.transactionHash },
`Already processed affirmationRequest ${affirmationRequest.transactionHash}`
)
return
}

const data = await homeBridge.methods
.executeAffirmation(recipient, value, affirmationRequest.transactionHash)
.encodeABI({ from: VALIDATOR_ADDRESS })

txToSend.push({
data,
gasEstimate,
transactionReference: affirmationRequest.transactionHash,
to: config.homeBridgeAddress

const data = await homeBridge.methods
.executeAffirmation(recipient, value, affirmationRequest.transactionHash)
.encodeABI({ from: VALIDATOR_ADDRESS })

txToSend.push({
data,
gasEstimate,
transactionReference: affirmationRequest.transactionHash,
to: config.homeBridgeAddress
})
})
})
)

await Promise.all(callbacks)
return txToSend
Expand Down
109 changes: 58 additions & 51 deletions src/events/processCollectedSignatures.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
require('dotenv').config()
const Web3 = require('web3')
const HttpListProvider = require('http-list-provider')
const promiseLimit = require('promise-limit')
const logger = require('../services/logger')
const rpcUrlsManager = require('../services/getRpcUrlsManager')
const { signatureToVRS } = require('../utils/message')
const { MAX_CONCURRENT_EVENTS } = require('../utils/constants')

const { VALIDATOR_ADDRESS } = process.env

const limit = promiseLimit(MAX_CONCURRENT_EVENTS)

function processCollectedSignaturesBuilder(config) {
const homeProvider = new HttpListProvider(rpcUrlsManager.homeUrls)
const web3Home = new Web3(homeProvider)
Expand All @@ -21,68 +25,71 @@ function processCollectedSignaturesBuilder(config) {

return async function processCollectedSignatures(signatures) {
const txToSend = []
const callbacks = signatures.map(async colSignature => {
const {
authorityResponsibleForRelay,
messageHash,
NumberOfCollectedSignatures
} = colSignature.returnValues

if (authorityResponsibleForRelay === web3Home.utils.toChecksumAddress(VALIDATOR_ADDRESS)) {
logger.info(
{ eventTransactionHash: colSignature.transactionHash },
`Processing CollectedSignatures ${colSignature.transactionHash}`
)
const message = await homeBridge.methods.message(messageHash).call()
const callbacks = signatures.map(colSignature =>
limit(async () => {
const {
authorityResponsibleForRelay,
messageHash,
NumberOfCollectedSignatures
} = colSignature.returnValues

if (authorityResponsibleForRelay === web3Home.utils.toChecksumAddress(VALIDATOR_ADDRESS)) {
logger.info(
{ eventTransactionHash: colSignature.transactionHash },
`Processing CollectedSignatures ${colSignature.transactionHash}`
)
const message = await homeBridge.methods.message(messageHash).call()

const requiredSignatures = []
requiredSignatures.length = NumberOfCollectedSignatures
requiredSignatures.fill(0)
const requiredSignatures = []
requiredSignatures.length = NumberOfCollectedSignatures
requiredSignatures.fill(0)

const [v, r, s] = [[], [], []]
const signaturePromises = requiredSignatures.map(async (el, index) => {
const signature = await homeBridge.methods.signature(messageHash, index).call()
const recover = signatureToVRS(signature)
v.push(recover.v)
r.push(recover.r)
s.push(recover.s)
})
const [v, r, s] = [[], [], []]
const signaturePromises = requiredSignatures.map(async (el, index) => {
const signature = await homeBridge.methods.signature(messageHash, index).call()
const recover = signatureToVRS(signature)
v.push(recover.v)
r.push(recover.r)
s.push(recover.s)
})

await Promise.all(signaturePromises)
await Promise.all(signaturePromises)

let gasEstimate
try {
gasEstimate = await foreignBridge.methods
.executeSignatures(v, r, s, message)
.estimateGas()
} catch (e) {
if (e.message.includes('Invalid JSON RPC response')) {
throw new Error(
`RPC Connection Error: executeSignatures Gas Estimate cannot be obtained.`
let gasEstimate
try {
gasEstimate = await foreignBridge.methods
.executeSignatures(v, r, s, message)
.estimateGas()
} catch (e) {
if (e.message.includes('Invalid JSON RPC response')) {
throw new Error(
`RPC Connection Error: executeSignatures Gas Estimate cannot be obtained.`
)
}
logger.info(
{ eventTransactionHash: colSignature.transactionHash },
`Already processed CollectedSignatures ${colSignature.transactionHash}`
)
return
}
const data = await foreignBridge.methods.executeSignatures(v, r, s, message).encodeABI()
txToSend.push({
data,
gasEstimate,
transactionReference: colSignature.transactionHash,
to: config.foreignBridgeAddress
})
} else {
logger.info(
{ eventTransactionHash: colSignature.transactionHash },
`Already processed CollectedSignatures ${colSignature.transactionHash}`
`Validator not responsible for relaying CollectedSignatures ${
colSignature.transactionHash
}`
)
return
}
const data = await foreignBridge.methods.executeSignatures(v, r, s, message).encodeABI()
txToSend.push({
data,
gasEstimate,
transactionReference: colSignature.transactionHash,
to: config.foreignBridgeAddress
})
} else {
logger.info(
{ eventTransactionHash: colSignature.transactionHash },
`Validator not responsible for relaying CollectedSignatures ${
colSignature.transactionHash
}`
)
}
})
})
)

await Promise.all(callbacks)

Expand Down
82 changes: 45 additions & 37 deletions src/events/processSignatureRequests.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
require('dotenv').config()
const Web3 = require('web3')
const HttpListProvider = require('http-list-provider')
const promiseLimit = require('promise-limit')
const logger = require('../services/logger')
const rpcUrlsManager = require('../services/getRpcUrlsManager')
const { createMessage } = require('../utils/message')
const { MAX_CONCURRENT_EVENTS } = require('../utils/constants')

const { VALIDATOR_ADDRESS, VALIDATOR_ADDRESS_PRIVATE_KEY } = process.env

const limit = promiseLimit(MAX_CONCURRENT_EVENTS)

let expectedMessageLength = null

function processSignatureRequestsBuilder(config) {
Expand All @@ -21,51 +25,55 @@ function processSignatureRequestsBuilder(config) {
expectedMessageLength = await homeBridge.methods.requiredMessageLength().call()
}

const callbacks = signatureRequests.map(async signatureRequest => {
const { recipient, value } = signatureRequest.returnValues
const callbacks = signatureRequests.map(signatureRequest =>
limit(async () => {
const { recipient, value } = signatureRequest.returnValues

logger.info(
{ eventTransactionHash: signatureRequest.transactionHash, sender: recipient, value },
`Processing signatureRequest ${signatureRequest.transactionHash}`
)
logger.info(
{ eventTransactionHash: signatureRequest.transactionHash, sender: recipient, value },
`Processing signatureRequest ${signatureRequest.transactionHash}`
)

const message = createMessage({
recipient,
value,
transactionHash: signatureRequest.transactionHash,
bridgeAddress: config.foreignBridgeAddress,
expectedMessageLength
})
const message = createMessage({
recipient,
value,
transactionHash: signatureRequest.transactionHash,
bridgeAddress: config.foreignBridgeAddress,
expectedMessageLength
})

const signature = web3Home.eth.accounts.sign(message, `0x${VALIDATOR_ADDRESS_PRIVATE_KEY}`)
const signature = web3Home.eth.accounts.sign(message, `0x${VALIDATOR_ADDRESS_PRIVATE_KEY}`)

let gasEstimate
try {
gasEstimate = await homeBridge.methods
.submitSignature(signature.signature, message)
.estimateGas({ from: VALIDATOR_ADDRESS })
} catch (e) {
if (e.message.includes('Invalid JSON RPC response')) {
throw new Error(`RPC Connection Error: submitSignature Gas Estimate cannot be obtained.`)
let gasEstimate
try {
gasEstimate = await homeBridge.methods
.submitSignature(signature.signature, message)
.estimateGas({ from: VALIDATOR_ADDRESS })
} catch (e) {
if (e.message.includes('Invalid JSON RPC response')) {
throw new Error(
`RPC Connection Error: submitSignature Gas Estimate cannot be obtained.`
)
}
logger.info(
{ eventTransactionHash: signatureRequest.transactionHash },
`Already processed signatureRequest ${signatureRequest.transactionHash}`
)
return
}
logger.info(
{ eventTransactionHash: signatureRequest.transactionHash },
`Already processed signatureRequest ${signatureRequest.transactionHash}`
)
return
}

const data = await homeBridge.methods
.submitSignature(signature.signature, message)
.encodeABI({ from: VALIDATOR_ADDRESS })
const data = await homeBridge.methods
.submitSignature(signature.signature, message)
.encodeABI({ from: VALIDATOR_ADDRESS })

txToSend.push({
data,
gasEstimate,
transactionReference: signatureRequest.transactionHash,
to: config.homeBridgeAddress
txToSend.push({
data,
gasEstimate,
transactionReference: signatureRequest.transactionHash,
to: config.homeBridgeAddress
})
})
})
)

await Promise.all(callbacks)
return txToSend
Expand Down
Loading