diff --git a/core.js b/core.js index d0f76a55..37139b15 100644 --- a/core.js +++ b/core.js @@ -24,6 +24,7 @@ const { jitIndexesPath, resetLevelPath, resetPrivatePath, + reindexEncryptedInProgressPath, } = require('./defaults') const { onceWhen, ReadyGate, onceWhenPromise } = require('./utils') const ThrottleBatchAdd = require('./throttle-batch') @@ -45,10 +46,10 @@ const { deferred, asOffsets, isEncrypted, - toCallback, batch, toPullStream, } = operators +const isBrowser = typeof window !== 'undefined' exports.name = 'db' @@ -111,6 +112,8 @@ exports.init = function (sbot, config) { const stateFeedsReady = Obv().set(false) const secretStackLoaded = new ReadyGate() const indexesStateLoaded = new ReadyGate() + const reindexingLock = mutexify() + const reindexedValues = Notify() sbot.close.hook(function (fn, args) { close((err) => { @@ -136,6 +139,11 @@ exports.init = function (sbot, config) { stateLoadedPromises.push(indexes[indexName].stateLoaded) } Promise.all(stateLoadedPromises).then(() => { + if (!isBrowser && fs.existsSync(reindexEncryptedInProgressPath(dir))) { + reindexEncrypted(() => { + debug('done reindexing encrypted from a previous session') + }) + } indexesStateLoaded.setReady() }) }) @@ -987,10 +995,10 @@ exports.init = function (sbot, config) { }) } - const reindexingLock = mutexify() - const reindexedValues = Notify() - function reindexEncrypted(cb) { + if (!isBrowser) { + fs.closeSync(fs.openSync(reindexEncryptedInProgressPath(dir), 'w')) + } indexingActive.set(indexingActive.value + 1) reindexingLock((unlock) => { pull( @@ -1042,6 +1050,9 @@ exports.init = function (sbot, config) { // prettier-ignore if (err) return unlock(cb, new Error('reindexEncrypted() failed to force-flush indexes', {cause: err})) indexingActive.set(indexingActive.value - 1) + if (!isBrowser) { + rimraf.sync(reindexEncryptedInProgressPath(dir)) + } unlock(cb) }) }) diff --git a/defaults.js b/defaults.js index c91ea385..4a133d7f 100644 --- a/defaults.js +++ b/defaults.js @@ -13,6 +13,8 @@ exports.resetLevelPath = (dir) => path.join(dir, 'db2', 'post-compact-reset-level') exports.resetPrivatePath = (dir) => path.join(dir, 'db2', 'post-compact-reset-private') +exports.reindexEncryptedInProgressPath = (dir) => + path.join(dir, 'db2', 'reindex-encrypted-wip') exports.jitIndexesPath = (dir) => path.join(dir, 'db2', 'jit') exports.tooHotOpts = (config) => config.db2 diff --git a/test/reindex-encrypted.js b/test/reindex-encrypted.js index f825188f..79dfff4b 100644 --- a/test/reindex-encrypted.js +++ b/test/reindex-encrypted.js @@ -5,6 +5,8 @@ const test = require('tape') const ssbKeys = require('ssb-keys') const pify = require('util').promisify +const pull = require('pull-stream') +const push = require('push-stream') const rimraf = require('rimraf') const mkdirp = require('mkdirp') const SecretStack = require('secret-stack') @@ -173,3 +175,170 @@ test('box2 group reindex larger', async (t) => { await Promise.all([pify(alice.close)(true), pify(bob.close)(true)]) t.end() }) + +test('reindexEncrypted is crash resistant', async (t) => { + // Create group keys + const groupKey = Buffer.from( + '30720d8f9cbf37f6d7062826f6decac93e308060a8aaaa77e6a4747f40ee1a76', + 'hex' + ) + const groupId = '%Aihvp+fMdt5CihjbOY6eZc0qCe0eKsrN2wfgXV2E3PM=.cloaked' + + // Setup Alice + const dirAlice = '/tmp/ssb-db2-box2-group-reindex3-alice' + rimraf.sync(dirAlice) + mkdirp.sync(dirAlice) + const keysAlice = ssbKeys.generate(null, 'alice') + const alice = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .call(null, { + keys: keysAlice, + path: dirAlice, + }) + alice.box2.addGroupInfo(groupId, { key: groupKey }) + + // Setup Bob + const dirBob = '/tmp/ssb-db2-box2-group-reindex3-bob' + rimraf.sync(dirBob) + mkdirp.sync(dirBob) + const keysBob = ssbKeys.generate(null, 'bob') + let bob = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .use(require('../full-mentions')) + .call(null, { + keys: keysBob, + path: dirBob, + }) + + // Alice publishes 5 messages, some of them box2 + let opts1 = { + keys: keysAlice, + content: { type: 'about', text: 'not super secret1' }, + } + let opts2 = { + keys: keysAlice, + content: { + type: 'post', + text: 'super secret2', + mentions: [{ link: bob.id }], + }, + recps: [groupId], + encryptionFormat: 'box2', + } + let opts3 = { + content: { type: 'weird' }, + keys: keysAlice, + } + let opts4 = { + content: { type: 'about', text: 'super secret4', recps: [groupId] }, + keys: keysAlice, + encryptionFormat: 'box2', + } + let opts5 = { + content: { + type: 'post', + text: 'super secret5', + mentions: [{ link: bob.id }], + recps: [groupId], + }, + keys: keysAlice, + encryptionFormat: 'box2', + } + + const msg1 = await pify(alice.db.create)(opts1) + const msg2 = await pify(alice.db.create)(opts2) + const msg3 = await pify(alice.db.create)(opts3) + const msg4 = await pify(alice.db.create)(opts4) + const msg5 = await pify(alice.db.create)(opts5) + t.pass('alice published 5 messages') + + t.notEqual(typeof msg1.value.content, 'string', 'msg1 is public about') + t.true(msg2.value.content.endsWith('.box2'), 'msg2 is group box2 post') + t.notEqual(typeof msg3.value.content, 'string', 'msg3 is public weird') + t.true(msg4.value.content.endsWith('.box2'), 'msg4 is group box2 about') + t.true(msg5.value.content.endsWith('.box2'), 'msg5 is group box2 post') + + await pify(bob.db.add)(msg1.value) + await pify(bob.db.add)(msg2.value) + await pify(bob.db.add)(msg3.value) + await pify(bob.db.add)(msg4.value) + await pify(bob.db.add)(msg5.value) + t.pass('bob added all 5 messages') + + // Get offsets of those messages + const bobLog = bob.db.getLog() + const offsets = await new Promise((resolve) => { + bobLog.stream({ offsets: true, values: false }).pipe( + push.collect((err, ary) => { + t.error(err, 'no error when streaming bobLog') + resolve(ary) + }) + ) + }) + + const results1 = await bob.db.query( + where(and(author(alice.id), fullMentions(bob.id))), + toPromise() + ) + t.equal(results1.length, 0, 'bob has no box2 mentions from alice') + + // Bob joins group and is able to decrypt some messages + bob.box2.addGroupInfo(groupId, { key: groupKey }) + t.pass('bob joined the group') + + // Wait for private indexes to be saved to disk + await pify(setTimeout)(4000) + + // Hack log.get to make it crash on the 2nd box2 msg + const originalGet = bobLog.get + bobLog.get = (offset, cb) => { + if (offset === offsets[3]) { + // offsets[3] is the 4th msg + return cb(new Error('simulated crash')) + } else { + originalGet.call(bobLog, offset, cb) + } + } + + try { + await pify(bob.db.reindexEncrypted)() + t.fail('reindexEncrypted should have thrown') + } catch (err) { + t.true(err.cause.message.includes('simulated crash'), 'simulated crash') + } + + await Promise.all([pify(alice.close)(true), pify(bob.close)(true)]) + + bob = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .use(require('../full-mentions')) + .call(null, { + keys: keysBob, + path: dirBob, + }) + + const streamed = [] + pull( + bob.db.reindexed(), + pull.drain((msg) => { + streamed.push(msg.value.content.text) + }) + ) + + await pify(setTimeout)(5000) + + const results2 = await bob.db.query( + where(and(author(alice.id), fullMentions(bob.id))), + toPromise() + ) + t.equal(results2.length, 2, 'bob has two box2 mentions from alice') + + t.deepEquals( + streamed, + ['super secret2', 'super secret4', 'super secret5'], + 'reindexed stream correct' + ) + + await pify(bob.close)(true) + t.end() +})