Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sync and fix reorg #507

Open
wants to merge 4 commits into
base: 4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions lib/services/bitcoind/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ Bitcoin.prototype._wrapRPCError = function(errObj) {
};

Bitcoin.prototype._getGenesisBlock = function(callback) {

var self = this;

self.client.getBlockHash(0, function(err, response) {
Expand Down Expand Up @@ -290,7 +289,6 @@ Bitcoin.prototype._connectProcess = function(config) {
};

Bitcoin.prototype.start = function(callback) {

var self = this;

if (!self.options.connect) {
Expand All @@ -303,7 +301,10 @@ Bitcoin.prototype.start = function(callback) {
throw new Error('Could not connect to any servers in connect array.');
}

self._initChain(function() {
self._initChain(function(err) {
if(err) {
return callback(err);
}

log.info('Bitcoin Daemon Ready');
callback();
Expand Down
44 changes: 9 additions & 35 deletions lib/services/db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ DB.prototype.start = function(callback) {
log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
self._sync.sync();

self._sync.once('synced', function() {
self.emit('synced');
});

});

});
Expand Down Expand Up @@ -239,43 +243,13 @@ DB.prototype.createKeyStream = function(op) {
return stream;
};

DB.prototype.detectReorg = function(blocks) {

var self = this;

if (!blocks || blocks.length === 0) {
return;
}

var tipHash = self.reorgTipHash || self.tip.hash;
var chainMembers = [];

var loopIndex = 0;
var overallCounter = 0;

while(overallCounter < blocks.length) {

if (loopIndex >= blocks.length) {
overallCounter++;
loopIndex = 0;
}

var prevHash = BufferUtil.reverse(blocks[loopIndex].header.prevHash).toString('hex');
if (prevHash === tipHash) {
tipHash = blocks[loopIndex].hash;
chainMembers.push(blocks[loopIndex]);
}
loopIndex++;

}

for(var i = 0; i < blocks.length; i++) {
if (chainMembers.indexOf(blocks[i]) === -1) {
return blocks[i];
}
self.reorgTipHash = blocks[i].hash;
DB.prototype.detectReorg = function(block) {
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if(this.tip.hash !== prevHash) {
return true;
}

return false;
};

DB.prototype.handleReorg = function(forkBlock, callback) {
Expand Down
24 changes: 24 additions & 0 deletions lib/services/db/reorg.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Reorg.prototype.handleReorg = function(newBlockHash, callback) {
return callback(err);
}

log.info('Finding common ancestor for bitcore-node serial tip and bitcoind tip');

self.findCommonAncestorAndNewHashes(self.db.tip.hash, newBlockHash, function(err, commonAncestor, newHashes) {
if(err) {
return callback(err);
Expand All @@ -40,6 +42,8 @@ Reorg.prototype.handleConcurrentReorg = function(callback) {
return callback();
}

log.info('Finding common ancestor for bitcore-node concurrent tip and bitcore-node serial tip');

self.findCommonAncestorAndNewHashes(self.db.concurrentTip.hash, self.db.tip.hash, function(err, commonAncestor, newHashes) {
if(err) {
return callback(err);
Expand All @@ -58,11 +62,14 @@ Reorg.prototype.handleConcurrentReorg = function(callback) {
Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) {
var self = this;

log.info('Rewinding concurrent tip back to serial common ancestor');

async.whilst(
function() {
return self.db.concurrentTip.hash !== commonAncestor;
},
function(next) {
log.info('Rewinding concurrent block ' + self.db.concurrentTip.hash);
self.db.getConcurrentBlockOperations(self.db.concurrentTip, false, function(err, operations) {
if(err) {
return next(err);
Expand Down Expand Up @@ -96,7 +103,10 @@ Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) {
Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) {
var self = this;

log.info('Bringing concurrent tip to serial tip');

async.eachSeries(newHashes, function(hash, next) {
log.info('Adding new concurrent block ' + hash);
self.node.services.bitcoind.getBlock(hash, function(err, block) {
if(err) {
return next(err);
Expand Down Expand Up @@ -124,6 +134,8 @@ Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) {
Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
var self = this;

log.info('Rewinding concurrent tip and serial tip back to bitcoind common ancestor');

async.whilst(
function() {
return self.db.tip.hash !== commonAncestor;
Expand All @@ -132,6 +144,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
async.parallel(
[
function(next) {
log.info('Rewinding concurrent block ' + self.db.concurrentTip.hash);
self.db.getConcurrentBlockOperations(self.db.concurrentTip, false, function(err, operations) {
if(err) {
return next(err);
Expand All @@ -141,6 +154,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
});
},
function(next) {
log.info('Rewinding serial block ' + self.db.tip.hash);
self.db.getSerialBlockOperations(self.db.tip, false, function(err, operations) {
if(err) {
return next(err);
Expand Down Expand Up @@ -184,6 +198,8 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
var self = this;

log.info('Bringing concurrent tip and serial tip to bitcoind tip');

async.eachSeries(newHashes, function(hash, next) {
self.node.services.bitcoind.getBlock(hash, function(err, block) {
if(err) {
Expand All @@ -193,6 +209,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
async.parallel(
[
function(next) {
log.info('Adding new concurrent block ' + hash);
self.db.getConcurrentBlockOperations(block, true, function(err, operations) {
if(err) {
return next(err);
Expand All @@ -203,6 +220,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
});
},
function(next) {
log.info('Adding new serial block ' + hash);
self.db.getSerialBlockOperations(block, true, function(err, operations) {
if(err) {
return next(err);
Expand Down Expand Up @@ -262,6 +280,8 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash
return next();
}

log.info('Getting block header for main chain block ' + mainPosition);

self.node.services.bitcoind.getBlockHeader(mainPosition, function(err, mainBlockHeader) {
if(err) {
return next(err);
Expand All @@ -281,6 +301,8 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash
return next();
}

log.info('Getting block header for fork chain block ' + forkPosition);

self.node.services.bitcoind.getBlockHeader(forkPosition, function(err, forkBlockHeader) {
if(err) {
return next(err);
Expand Down Expand Up @@ -336,6 +358,8 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash
}
}

log.info('Common ancestor found: ' + commonAncestor);

callback(null, commonAncestor, newHashes);
}
);
Expand Down
103 changes: 16 additions & 87 deletions lib/services/db/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var async = require('async');
var bitcore = require('bitcore-lib');
var Block = bitcore.Block;
var index = require('../../index');
var log = index.log;

Expand All @@ -16,8 +15,7 @@ function BlockStream(highWaterMark, db, sync) {
this.db = db;
this.dbTip = this.db.tip;
this.lastReadHeight = this.dbTip.__height;
this.lastEmittedHash = this.dbTip.hash;
this.queue = [];
this.lastReadHash = this.dbTip.hash;
this.processing = false;
this.bitcoind = this.db.bitcoind;
}
Expand Down Expand Up @@ -101,18 +99,9 @@ Sync.prototype.sync = function() {
};

Sync.prototype._onFinish = function() {

var self = this;
self.syncing = false;

if (self.forkBlock) {
self.db.handleReorg(self.forkBlock, function() {
self.forkBlock = null;
self.sync();
});
return;
}

self._startSubscriptions();
self.emit('synced');

Expand Down Expand Up @@ -147,91 +136,27 @@ Sync.prototype._handleErrors = function(stream) {


BlockStream.prototype._read = function() {
var self = this;

if (this.lastEmittedHash === this.bitcoind.tiphash) {
if(this.lastReadHash === this.bitcoind.tiphash) {
return this.push(null);
}

this.queue.push(++this.lastReadHeight);
this._process();
};

BlockStream.prototype._process = function() {
var self = this;

if(self.processing) {
return;
if(this.lastReadHeight >= this.bitcoind.height) {
return this.push(null);
}

this.processing = true;

async.whilst(
function() {
return self.queue.length;
}, function(next) {

var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(blockArgs.length);
self._getBlocks(blockArgs, next);

}, function(err) {
if(err) {
return self.emit('error', err);
}
self.processing = false;
}
);
};


BlockStream.prototype._getBlocks = function(heights, callback) {

var self = this;
async.map(heights, function(height, next) {

if (height === 0) {
var block = new Block(self.bitcoind.genesisBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this removed, the require on line 9 is no longer used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed unnecessary require

block.__height = 0;
return next(null, block);
}

self.bitcoind.getBlock(height, function(err, block) {

if(err) {
return next(err);
}

block.__height = height;
next(null, block);
});


}, function(err, blocks) {

self.bitcoind.getBlock(self.lastReadHeight + 1, function(err, block) {
if(err) {
return callback(err);
// add new stack lines to err
return self.emit('error', new Error(err));
}

//at this point, we know that all blocks we've sent down the pipe
//have not been reorg'ed, but the new batch here might have been
self.sync.forkBlock = self.db.detectReorg(blocks);

if (!self.sync.forkBlock) {

for(var i = 0; i < blocks.length; i++) {

self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);

}

return callback();

}

self.push(null);
callback();
self.lastReadHeight++;
self.lastReadHash = block.hash;

block.__height = self.lastReadHeight;
self.push(block);
});
};

Expand Down Expand Up @@ -266,6 +191,10 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
ProcessSerial.prototype._process = function(block, callback) {
var self = this;

if(self.db.detectReorg(block)) {
return self.db.handleReorg(block, callback);
}

self.db.getSerialBlockOperations(block, true, function(err, operations) {
if(err) {
return callback(err);
Expand Down
3 changes: 2 additions & 1 deletion test/data/blocks.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"genesis": "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000",
"block1a": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f69965a91e7fc9ccccbe4051b74d086114741b96678f5e491b5609b18962252fd2d12f858ffff7f20040000000102000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210372bfaa748e546ba784a4d1395f5cedf673f9f5a8160effbe0f595fe905fb3e59ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf900000000",
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000"
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000",
"block2b": "00000020d459cd53b1e7fb9c9b7bdd7c48b571da9b0df08ff68b13cb6784b5b18761512e206072ebec8302cf43f859245d61dc304eaced26703e5d2fa035ae8068c6196ae0ede058ffff7f20010000000101000000000100f2052a010000001976a9142a48bf892a5461dffe8c68fe209be16a84289ca488ac00000000"
}
Loading