forked from jondubois/capitalisk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
loader.js
348 lines (315 loc) · 8.96 KB
/
loader.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/*
* Copyright © 2019 Lisk Foundation
*
* See the LICENSE file at the top-level directory of this distribution
* for licensing information.
*
* Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation,
* no part of this software, including this file, may be copied, modified,
* propagated, or distributed except according to the terms contained in the
* LICENSE file.
*
* Removal or modification of this copyright notice is prohibited.
*/
'use strict';
const async = require('async');
const { Status: TransactionStatus } = require('@liskhq/lisk-transactions');
const { validator } = require('@liskhq/lisk-validator');
const { validateTransactions } = require('./transactions');
const { CommonBlockError } = require('./utils/error_handlers');
const definitions = require('./schema/definitions');
/**
 * Loads blocks and unconfirmed transactions from the network and tracks
 * whether a block sync is currently in progress.
 *
 * @class
 * @memberof modules
 * @see Parent: {@link modules}
 * @requires async
 * @param {Object} scope - Dependencies and constants (destructured)
 * @param {string} scope.moduleAlias - Prefix used to build network procedure names
 * @param {Object} scope.channel - Channel used to invoke `network:request`
 * @param {Object} scope.logger - Logger exposing info/debug/warn/error
 * @param {Object} scope.storage - Storage component (kept on the instance)
 * @param {Object} scope.cache - Cache component; disabled while syncing if ready
 * @param {Object} scope.genesisBlock - Genesis block (kept on the instance)
 * @param {Object} scope.transactionPoolModule - Receives unconfirmed transactions
 * @param {Object} scope.blocksModule - Provides lastBlock, broadhash, block loading and recovery
 * @param {Object} scope.peersModule - Provides consensus calculations
 * @param {Object} scope.interfaceAdapters - Provides `transactions.fromJson`
 * @param {*} scope.loadPerIteration - Stored under `this.constants`
 * @param {*} scope.rebuildUpToRound - Stored under `this.constants`
 * @param {*} scope.syncingActive - Stored under `this.constants`
 */
class Loader {
	constructor({
		moduleAlias,
		// components
		channel,
		logger,
		storage,
		cache,
		// Unique requirements
		genesisBlock,
		// Modules
		transactionPoolModule,
		blocksModule,
		peersModule,
		interfaceAdapters,
		// Constants
		loadPerIteration,
		rebuildUpToRound,
		syncingActive,
	}) {
		this.isActive = false;
		this.total = 0;
		this.blocksToSync = 0;
		this.retries = 5;

		this.moduleAlias = moduleAlias;
		this.channel = channel;
		this.logger = logger;
		this.storage = storage;
		// TODO: Remove cache
		this.cache = cache;
		this.genesisBlock = genesisBlock;

		this.constants = {
			loadPerIteration,
			rebuildUpToRound,
			syncingActive,
		};

		this.transactionPoolModule = transactionPoolModule;
		this.blocksModule = blocksModule;
		this.peersModule = peersModule;
		this.interfaceAdapters = interfaceAdapters;
	}

	/**
	 * Reports whether a sync is currently running.
	 *
	 * @returns {boolean} True while `sync()` is in progress
	 */
	syncing() {
		return !!this.isActive;
	}

	/**
	 * Pulls unconfirmed transactions from the network, retrying up to
	 * `this.retries` times. A final failure is logged and swallowed, so the
	 * returned promise always resolves.
	 *
	 * @returns {Promise<void>}
	 */
	async loadUnconfirmedTransactions() {
		await new Promise(resolve => {
			async.retry(
				this.retries,
				async () => this._getUnconfirmedTransactionsFromNetwork(),
				err => {
					if (err) {
						this.logger.error('Unconfirmed transactions loader', err);
					}
					resolve();
				},
			);
		});
	}

	/**
	 * Performs a sync operation:
	 * - Disables the cache (when ready) and marks the loader as active.
	 * - Logs broadhash consensus before sync.
	 * - Loads blocks from the network.
	 * - Logs broadhash consensus after sync.
	 * - Always resets the active flag / blocksToSync and re-enables the cache,
	 *   even when one of the steps above rejects.
	 *
	 * @returns {Promise<void>}
	 * @throws Re-throws any error raised by the consensus or block-loading steps
	 */
	async sync() {
		this.logger.info('Starting sync');
		if (this.cache.ready) {
			this.cache.disable();
		}
		this.isActive = true;

		try {
			const consensusBefore = await this.peersModule.calculateConsensus(
				this.blocksModule.broadhash,
			);
			this.logger.debug(
				`Establishing broadhash consensus before sync: ${consensusBefore} %`,
			);

			await this._loadBlocksFromNetwork();

			const consensusAfter = await this.peersModule.calculateConsensus(
				this.blocksModule.broadhash,
			);
			this.logger.debug(
				`Establishing broadhash consensus after sync: ${consensusAfter} %`,
			);
			this.logger.info('Finished sync');
		} finally {
			// Without this cleanup a single rejected step would leave the loader
			// reporting syncing() === true forever, with the cache still disabled.
			this.isActive = false;
			this.blocksToSync = 0;
			if (this.cache.ready) {
				this.cache.enable();
			}
		}
	}

	/**
	 * Loads transactions from the network:
	 * - Validates the response against the WSTransactionsResponse schema.
	 * - Converts and validates each transaction.
	 * - Passes each transaction to the pool's processUnconfirmedTransaction.
	 *
	 * @private
	 * @returns {Promise<void>}
	 * @throws The schema validator's error list when the response is malformed,
	 * the first transaction error when validation fails, or whatever the
	 * transaction pool throws while processing a transaction
	 */
	async _getUnconfirmedTransactionsFromNetwork() {
		this.logger.info('Loading transactions from the network');

		const { data: result } = await this.channel.invoke('network:request', {
			procedure: `${this.moduleAlias}:getTransactions`,
		});

		const validatorErrors = validator.validate(
			definitions.WSTransactionsResponse,
			result,
		);
		if (validatorErrors.length) {
			throw validatorErrors;
		}

		const transactions = result.transactions.map(tx =>
			this.interfaceAdapters.transactions.fromJson(tx),
		);

		try {
			const { transactionsResponses } = validateTransactions()(transactions);
			const invalidTransactionResponse = transactionsResponses.find(
				transactionResponse =>
					transactionResponse.status !== TransactionStatus.OK,
			);
			if (invalidTransactionResponse) {
				throw invalidTransactionResponse.errors;
			}
		} catch (errors) {
			// Only the first error of a list is reported and propagated.
			const error =
				Array.isArray(errors) && errors.length > 0 ? errors[0] : errors;
			this.logger.debug('Transaction normalization failed', {
				id: error.id,
				err: error.toString(),
				module: 'loader',
			});
			throw error;
		}

		for (const transaction of transactions) {
			try {
				/* eslint-disable-next-line */
				transaction.bundled = true;
				// Transactions are handed to the pool one at a time, in order.
				// eslint-disable-next-line no-await-in-loop
				await this.transactionPoolModule.processUnconfirmedTransaction(
					transaction,
				);
			} catch (error) {
				this.logger.error(error);
				throw error;
			}
		}
	}

	/**
	 * Requests the blocks that follow our last block from the network.
	 *
	 * @private
	 * @returns {Promise<*>} The `blocks` field of the network response
	 * @throws {Error} When the response carries no data
	 * @throws {CommonBlockError} When the peer reports it has no matching lastBlockId
	 */
	async _getBlocksFromNetwork() {
		const { lastBlock } = this.blocksModule;
		// TODO: If there is an error, invoke the applyPenalty action on the Network module once it is implemented.
		const { data } = await this.channel.invoke('network:request', {
			procedure: `${this.moduleAlias}:blocks`,
			data: {
				lastBlockId: lastBlock.id,
			},
		});

		if (!data) {
			throw new Error('Received an invalid blocks response from the network');
		}
		// Check for strict equality for backwards compatibility reasons.
		// The misspelled data.sucess is required to support v1 nodes.
		// TODO: Remove the misspelled data.sucess === false condition once enough nodes have migrated to v2.
		if (data.success === false || data.sucess === false) {
			throw new CommonBlockError(
				'Peer did not have a matching lastBlockId.',
				lastBlock.id,
			);
		}
		return data.blocks;
	}

	/**
	 * Validates blocks received from the network against the WSBlocksList schema.
	 *
	 * @private
	 * @param {*} blocks - Blocks as received from the network
	 * @returns {Promise<*>} The same `blocks` value when it is valid
	 * @throws {Error} When schema validation reports any error
	 */
	// eslint-disable-next-line class-methods-use-this
	async _validateBlocks(blocks) {
		const errors = validator.validate(definitions.WSBlocksList, blocks);
		if (errors.length) {
			throw new Error('Received invalid blocks data');
		}
		return blocks;
	}

	/**
	 * Hands validated blocks to the blocks module and records the resulting
	 * height in `blocksToSync`.
	 *
	 * @private
	 * @param {*} blocks - Blocks that already passed `_validateBlocks`
	 * @returns {Promise<boolean>} True when the last valid block returned by the
	 * blocks module is the block that was our last block before loading
	 */
	async _getValidatedBlocksFromNetwork(blocks) {
		// Captured before loading so the comparison uses the pre-load block.
		const { lastBlock } = this.blocksModule;
		const lastValidBlock = await this.blocksModule.loadBlocksFromNetwork(
			blocks,
		);
		this.blocksToSync = lastValidBlock.height;

		return lastValidBlock.id === lastBlock.id;
	}

	/**
	 * Repeatedly fetches, validates and loads blocks until a round leaves the
	 * last block unchanged, or until five consecutive rounds have failed.
	 * Failures are logged and, for CommonBlockError, may trigger chain recovery.
	 *
	 * @private
	 * @returns {Promise<void>}
	 */
	async _loadBlocksFromNetwork() {
		// Consecutive failures tolerated before giving up.
		const maxFailedAttempts = 5;
		// Number of failed attempts to load from the network.
		let failedAttemptsToLoad = 0;
		// If True, own node's db contains all the blocks from the last block request.
		let loaded = false;

		while (!loaded && failedAttemptsToLoad < maxFailedAttempts) {
			try {
				// eslint-disable-next-line no-await-in-loop
				const blocksFromNetwork = await this._getBlocksFromNetwork();
				// eslint-disable-next-line no-await-in-loop
				const blocksAfterValidate = await this._validateBlocks(
					blocksFromNetwork,
				);
				// eslint-disable-next-line no-await-in-loop
				loaded = await this._getValidatedBlocksFromNetwork(blocksAfterValidate);
				// Reset counter after a batch of blocks was successfully loaded from the network
				failedAttemptsToLoad = 0;
			} catch (err) {
				failedAttemptsToLoad += 1;
				// eslint-disable-next-line no-await-in-loop
				await this._handleCommonBlockError(err);
				this.logger.warn(
					{ error: err },
					'Failed to load blocks from the network.',
				);
			}
		}
	}

	/**
	 * Reacts to a failed block request. Only CommonBlockError is handled: when
	 * the peers module reports poor consensus, chain recovery is attempted and
	 * a recovery failure is logged rather than propagated.
	 *
	 * @private
	 * @param {Error} error - Error raised while loading blocks
	 * @returns {Promise<void>}
	 */
	async _handleCommonBlockError(error) {
		if (!(error instanceof CommonBlockError)) {
			return;
		}

		// NOTE(review): isPoorConsensus is awaited so that a Promise-returning
		// implementation is not mistaken for a truthy value; awaiting is a no-op
		// for a plain boolean. Confirm against the peers module.
		const poorConsensus = await this.peersModule.isPoorConsensus(
			this.blocksModule.broadhash,
		);
		if (!poorConsensus) {
			return;
		}

		this.logger.debug('Perform chain recovery due to poor consensus');
		try {
			await this.blocksModule.recoverChain();
		} catch (recoveryError) {
			this.logger.error(
				{ error: recoveryError },
				'Chain recovery failed after failing to load blocks while network consensus was low.',
			);
		}
	}
}
// Expose the Loader class as a named export of this module
module.exports = { Loader };