diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 801d5f92b..3ccf9981b 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -115,7 +115,6 @@ Bitcoin.prototype._wrapRPCError = function(errObj) { }; Bitcoin.prototype._getGenesisBlock = function(callback) { - var self = this; self.client.getBlockHash(0, function(err, response) { @@ -290,7 +289,6 @@ Bitcoin.prototype._connectProcess = function(config) { }; Bitcoin.prototype.start = function(callback) { - var self = this; if (!self.options.connect) { @@ -303,7 +301,10 @@ Bitcoin.prototype.start = function(callback) { throw new Error('Could not connect to any servers in connect array.'); } - self._initChain(function() { + self._initChain(function(err) { + if(err) { + return callback(err); + } log.info('Bitcoin Daemon Ready'); callback(); diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 4d481abb7..2dce9cafa 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -171,6 +171,10 @@ DB.prototype.start = function(callback) { log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height); self._sync.sync(); + self._sync.once('synced', function() { + self.emit('synced'); + }); + }); }); @@ -239,43 +243,13 @@ DB.prototype.createKeyStream = function(op) { return stream; }; -DB.prototype.detectReorg = function(blocks) { - - var self = this; - - if (!blocks || blocks.length === 0) { - return; - } - - var tipHash = self.reorgTipHash || self.tip.hash; - var chainMembers = []; - - var loopIndex = 0; - var overallCounter = 0; - - while(overallCounter < blocks.length) { - - if (loopIndex >= blocks.length) { - overallCounter++; - loopIndex = 0; - } - - var prevHash = BufferUtil.reverse(blocks[loopIndex].header.prevHash).toString('hex'); - if (prevHash === tipHash) { - tipHash = blocks[loopIndex].hash; - chainMembers.push(blocks[loopIndex]); - } - loopIndex++; - - } - - for(var i = 0; i < blocks.length; i++) { - if (chainMembers.indexOf(blocks[i]) === -1) { - return blocks[i]; - } - self.reorgTipHash = blocks[i].hash; +DB.prototype.detectReorg = function(block) { + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); + if(this.tip.hash !== prevHash) { + return true; } + return false; }; DB.prototype.handleReorg = function(forkBlock, callback) { diff --git a/lib/services/db/reorg.js b/lib/services/db/reorg.js index 42370c62e..693495116 100644 --- a/lib/services/db/reorg.js +++ b/lib/services/db/reorg.js @@ -18,6 +18,8 @@ Reorg.prototype.handleReorg = function(newBlockHash, callback) { return callback(err); } + log.info('Finding common ancestor for bitcore-node serial tip and bitcoind tip'); + self.findCommonAncestorAndNewHashes(self.db.tip.hash, newBlockHash, function(err, commonAncestor, newHashes) { if(err) { return callback(err); @@ -40,6 +42,8 @@ Reorg.prototype.handleConcurrentReorg = function(callback) { return callback(); } + log.info('Finding common ancestor for bitcore-node concurrent tip and bitcore-node serial tip'); + self.findCommonAncestorAndNewHashes(self.db.concurrentTip.hash, self.db.tip.hash, function(err, commonAncestor, newHashes) { if(err) { return callback(err); @@ -58,11 +62,14 @@ Reorg.prototype.handleConcurrentReorg = function(callback) { Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) { var self = this; + log.info('Rewinding concurrent tip back to serial common ancestor'); + async.whilst( function() { return self.db.concurrentTip.hash !== commonAncestor; }, function(next) { + log.info('Rewinding concurrent block ' + self.db.concurrentTip.hash); self.db.getConcurrentBlockOperations(self.db.concurrentTip, false, function(err, operations) { if(err) { return next(err); @@ -96,7 +103,10 @@ Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) { Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) { var self = this; + log.info('Bringing concurrent tip to serial tip'); + async.eachSeries(newHashes, function(hash, next) { + log.info('Adding new concurrent block ' + hash); self.node.services.bitcoind.getBlock(hash, function(err, block) { if(err) { return next(err); @@ -124,6 +134,8 @@ Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) { Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { var self = this; + log.info('Rewinding concurrent tip and serial tip back to bitcoind common ancestor'); + async.whilst( function() { return self.db.tip.hash !== commonAncestor; @@ -132,6 +144,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { async.parallel( [ function(next) { + log.info('Rewinding concurrent block ' + self.db.concurrentTip.hash); self.db.getConcurrentBlockOperations(self.db.concurrentTip, false, function(err, operations) { if(err) { return next(err); @@ -141,6 +154,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { }); }, function(next) { + log.info('Rewinding serial block ' + self.db.tip.hash); self.db.getSerialBlockOperations(self.db.tip, false, function(err, operations) { if(err) { return next(err); @@ -184,6 +198,8 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { var self = this; + log.info('Bringing concurrent tip and serial tip to bitcoind tip'); + async.eachSeries(newHashes, function(hash, next) { self.node.services.bitcoind.getBlock(hash, function(err, block) { if(err) { @@ -193,6 +209,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { async.parallel( [ function(next) { + log.info('Adding new concurrent block ' + hash); self.db.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { return next(err); @@ -203,6 +220,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { }); }, function(next) { + log.info('Adding new serial block ' + hash); self.db.getSerialBlockOperations(block, true, function(err, operations) { if(err) { return next(err); @@ -262,6 +280,8 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash return next(); } + log.info('Getting block header for main chain block ' + mainPosition); + self.node.services.bitcoind.getBlockHeader(mainPosition, function(err, mainBlockHeader) { if(err) { return next(err); @@ -281,6 +301,8 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash return next(); } + log.info('Getting block header for fork chain block ' + forkPosition); + self.node.services.bitcoind.getBlockHeader(forkPosition, function(err, forkBlockHeader) { if(err) { return next(err); @@ -336,6 +358,8 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash } } + log.info('Common ancestor found: ' + commonAncestor); + callback(null, commonAncestor, newHashes); } ); diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 33728073c..da83e920b 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -6,7 +6,6 @@ var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var async = require('async'); var bitcore = require('bitcore-lib'); -var Block = bitcore.Block; var index = require('../../index'); var log = index.log; @@ -16,8 +15,7 @@ function BlockStream(highWaterMark, db, sync) { this.db = db; this.dbTip = this.db.tip; this.lastReadHeight = this.dbTip.__height; - this.lastEmittedHash = this.dbTip.hash; - this.queue = []; + this.lastReadHash = this.dbTip.hash; this.processing = false; this.bitcoind = this.db.bitcoind; } @@ -101,18 +99,9 @@ Sync.prototype.sync = function() { }; Sync.prototype._onFinish = function() { - var self = this; self.syncing = false; - if (self.forkBlock) { - self.db.handleReorg(self.forkBlock, function() { - self.forkBlock = null; - self.sync(); - }); - return; - } - self._startSubscriptions(); self.emit('synced'); @@ -147,91 +136,27 @@ Sync.prototype._handleErrors = function(stream) { BlockStream.prototype._read = function() { + var self = this; - if (this.lastEmittedHash === this.bitcoind.tiphash) { + if(this.lastReadHash === this.bitcoind.tiphash) { return this.push(null); } - this.queue.push(++this.lastReadHeight); - this._process(); -}; - -BlockStream.prototype._process = function() { - var self = this; - - if(self.processing) { - return; + if(this.lastReadHeight >= this.bitcoind.height) { + return this.push(null); } - this.processing = true; - - async.whilst( - function() { - return self.queue.length; - }, function(next) { - - var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length)); - self.queue = self.queue.slice(blockArgs.length); - self._getBlocks(blockArgs, next); - - }, function(err) { - if(err) { - return self.emit('error', err); - } - self.processing = false; - } - ); -}; - - -BlockStream.prototype._getBlocks = function(heights, callback) { - - var self = this; - async.map(heights, function(height, next) { - - if (height === 0) { - var block = new Block(self.bitcoind.genesisBuffer); - block.__height = 0; - return next(null, block); - } - - self.bitcoind.getBlock(height, function(err, block) { - - if(err) { - return next(err); - } - - block.__height = height; - next(null, block); - }); - - - }, function(err, blocks) { - + self.bitcoind.getBlock(self.lastReadHeight + 1, function(err, block) { if(err) { - return callback(err); + // add new stack lines to err + return self.emit('error', new Error(err)); } - //at this point, we know that all blocks we've sent down the pipe - //have not been reorg'ed, but the new batch here might have been - self.sync.forkBlock = self.db.detectReorg(blocks); - - if (!self.sync.forkBlock) { - - for(var i = 0; i < blocks.length; i++) { - - self.lastEmittedHash = blocks[i].hash; - self.push(blocks[i]); - - } - - return callback(); - - } - - self.push(null); - callback(); + self.lastReadHeight++; + self.lastReadHash = block.hash; + block.__height = self.lastReadHeight; + self.push(block); }); }; @@ -266,6 +191,10 @@ ProcessSerial.prototype._write = function(block, enc, callback) { ProcessSerial.prototype._process = function(block, callback) { var self = this; + if(self.db.detectReorg(block)) { + return self.db.handleReorg(block, callback); + } + self.db.getSerialBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); diff --git a/test/data/blocks.json b/test/data/blocks.json index 381bb9005..80a184492 100644 --- a/test/data/blocks.json +++ b/test/data/blocks.json @@ -1,5 +1,6 @@ { "genesis": "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000", "block1a": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f69965a91e7fc9ccccbe4051b74d086114741b96678f5e491b5609b18962252fd2d12f858ffff7f20040000000102000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210372bfaa748e546ba784a4d1395f5cedf673f9f5a8160effbe0f595fe905fb3e59ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf900000000", - "block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000" + "block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000", + "block2b": "00000020d459cd53b1e7fb9c9b7bdd7c48b571da9b0df08ff68b13cb6784b5b18761512e206072ebec8302cf43f859245d61dc304eaced26703e5d2fa035ae8068c6196ae0ede058ffff7f20010000000101000000000100f2052a010000001976a9142a48bf892a5461dffe8c68fe209be16a84289ca488ac00000000" } diff --git a/test/services/db/index.unit.js b/test/services/db/index.unit.js deleted file mode 100644 index 37731ef7b..000000000 --- a/test/services/db/index.unit.js +++ /dev/null @@ -1,56 +0,0 @@ -'use strict'; - -var expect = require('chai').expect; -var bitcore = require('bitcore-lib'); -var DB = require('../../../lib/services/db'); - -describe('DB', function() { - - describe('Reorg', function() { - - before(function() { - this.db = new DB({ - node: { - network: bitcore.Networks.testnet, - datadir: '/tmp', - services: '' - } - }); - this.db.tip = { hash: 'ff', height: 444 }; - }); - - it('should detect a reorg from a common ancenstor that is in our set', function() { - var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } }; - var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } }; - var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } }; - var block4 = { hash: '44', header: { prevHash: new Buffer('22', 'hex') } }; - //blocks must be passed in the order that they are received. - var blocks = [ block3, block2, block1, block4 ]; - expect(this.db.detectReorg(blocks)).to.deep.equal(block3); - - }); - - it('should detect a reorg from a common ancenstor that is not in our set', function() { - var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } }; - var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } }; - var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } }; - var block4 = { hash: '44', header: { prevHash: new Buffer('ee', 'hex') } }; - var blocks = [ block3, block2, block1, block4 ]; - expect(this.db.detectReorg(blocks)).to.deep.equal(block4); - - }); - - it('should not detect a reorg', function() { - this.db.reorgTipHash = null; - var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } }; - var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } }; - var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } }; - var block4 = { hash: '44', header: { prevHash: new Buffer('33', 'hex') } }; - var blocks = [ block3, block2, block1, block4 ]; - var actual = this.db.detectReorg(blocks); - expect(actual).to.be.undefined; - }); - - }); -}); - diff --git a/test/services/db/reorg.integration.js b/test/services/db/reorg.integration.js index 8f80a9fc1..fd328f6fe 100644 --- a/test/services/db/reorg.integration.js +++ b/test/services/db/reorg.integration.js @@ -5,8 +5,10 @@ var sinon = require('sinon'); var bitcore = require('bitcore-lib'); var BufferUtil = bitcore.util.buffer; var DB = require('../../../lib/services/db'); +var Sync = require('../../../lib/services/db/sync.js'); var Networks = bitcore.Networks; var EventEmitter = require('events').EventEmitter; +var inherits = require('util').inherits; var rimraf = require('rimraf'); var mkdirp = require('mkdirp'); var blocks = require('../../data/blocks.json'); @@ -16,8 +18,59 @@ describe('DB', function() { var bitcoind = { on: function(event, callback) { + console.log('event', event); }, - genesisBuffer: blocks.genesis + genesisBuffer: blocks.genesis, + tiphash: '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4', + height: 2, + getBlock: function(heightHash, callback) { + var self = this; + setImmediate(function() { + switch(heightHash) { + case 0: + return callback(null, bitcore.Block.fromString(blocks.genesis)); + case 1: + return callback(null, bitcore.Block.fromString(blocks.block1a)); + case 2: + return callback(null, bitcore.Block.fromString(blocks.block2b)); + case '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206': + return callback(null, bitcore.Block.fromString(blocks.genesis)); + case '77d0b8043d3a1353ffd22ad70e228e30c15fd0f250d51d608b1b7997e6239ffb': + return callback(null, bitcore.Block.fromString(blocks.block1a)); + case '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4': + return callback(null, bitcore.Block.fromString(blocks.block1b)); + case 'a0eadacf7ac5d613edea275ad1f3375689cd025f97b2fc73a27d04f745c46996': + return callback(null, bitcore.Block.fromString(blocks.block2b)); + default: + return callback(new Error('height/hash out of range')); + } + }); + }, + getBlockHeader: function(hash, callback) { + var self = this; + setImmediate(function() { + switch(hash) { + case '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206': + // genesis + return callback(null, null); + case '77d0b8043d3a1353ffd22ad70e228e30c15fd0f250d51d608b1b7997e6239ffb': + // 1a + return callback(null, { + prevHash: '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206' + }); + case '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4': + // 1b + return callback(null, { + prevHash: '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206' + }); + case 'a0eadacf7ac5d613edea275ad1f3375689cd025f97b2fc73a27d04f745c46996': + // 2b + return callback(null, { + prevHash: '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4' + }); + } + }); + } }; var node = { @@ -25,13 +78,20 @@ describe('DB', function() { datadir: '/tmp/datadir', services: { bitcoind: bitcoind }, on: sinon.stub(), - once: sinon.stub() + once: function(event, callback) { + if(event === 'ready') { + setImmediate(callback); + } + }, + openBus: sinon.stub() }; before(function(done) { var self = this; + sinon.stub(Sync.prototype, '_startSubscriptions'); + rimraf(node.datadir, function(err) { if(err) { return done(err); @@ -40,15 +100,33 @@ describe('DB', function() { }); this.db = new DB({node: node}); + sinon.spy(this.db, 'printTipInfo'); this.emitter = new EventEmitter(); }); + after(function() { + Sync.prototype._startSubscriptions.restore(); + }); + describe('Reorg', function() { - it('should start db service', function(done) { - this.db.start(done); + this.db.start(function(err) { + should.not.exist(err); + done(); + }); + }); + + it('should reorg successfully', function(done) { + var self = this; + + this.db.on('synced', function() { + self.db.printTipInfo.callCount.should.equal(2); + self.db.printTipInfo.args[0][0].should.equal('Reorg detected!'); + self.db.printTipInfo.args[1][0].should.equal('Reorg successful!'); + done(); + }) }); }); diff --git a/test/services/db/reorg.unit.js b/test/services/db/reorg.unit.js index c49891f3f..60e71c089 100644 --- a/test/services/db/reorg.unit.js +++ b/test/services/db/reorg.unit.js @@ -50,9 +50,7 @@ describe('Reorg', function() { var db = { tip: tipBlocks[3], concurrentTip: concurrentBlocks[4], - store: { - batch: sinon.stub().callsArg(1) - }, + batch: sinon.stub().callsArg(1), getConcurrentBlockOperations: sinon.stub().callsArgWith(2, null, []), getSerialBlockOperations: sinon.stub().callsArgWith(2, null, []), getConcurrentTipOperation: sinon.stub().returns(null),