From e9fc9adf288c202806ea4d640abd5c5b2fec5af7 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Tue, 31 Jan 2023 10:36:34 +0200 Subject: [PATCH 1/4] reindexEncrypted() should auto-resume after a crash --- core.js | 19 ++++- defaults.js | 2 + test/reindex-encrypted.js | 170 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 4 deletions(-) diff --git a/core.js b/core.js index d0f76a55..37139b15 100644 --- a/core.js +++ b/core.js @@ -24,6 +24,7 @@ const { jitIndexesPath, resetLevelPath, resetPrivatePath, + reindexEncryptedInProgressPath, } = require('./defaults') const { onceWhen, ReadyGate, onceWhenPromise } = require('./utils') const ThrottleBatchAdd = require('./throttle-batch') @@ -45,10 +46,10 @@ const { deferred, asOffsets, isEncrypted, - toCallback, batch, toPullStream, } = operators +const isBrowser = typeof window !== 'undefined' exports.name = 'db' @@ -111,6 +112,8 @@ exports.init = function (sbot, config) { const stateFeedsReady = Obv().set(false) const secretStackLoaded = new ReadyGate() const indexesStateLoaded = new ReadyGate() + const reindexingLock = mutexify() + const reindexedValues = Notify() sbot.close.hook(function (fn, args) { close((err) => { @@ -136,6 +139,11 @@ exports.init = function (sbot, config) { stateLoadedPromises.push(indexes[indexName].stateLoaded) } Promise.all(stateLoadedPromises).then(() => { + if (!isBrowser && fs.existsSync(reindexEncryptedInProgressPath(dir))) { + reindexEncrypted(() => { + debug('done reindexing encrypted from a previous session') + }) + } indexesStateLoaded.setReady() }) }) @@ -987,10 +995,10 @@ exports.init = function (sbot, config) { }) } - const reindexingLock = mutexify() - const reindexedValues = Notify() - function reindexEncrypted(cb) { + if (!isBrowser) { + fs.closeSync(fs.openSync(reindexEncryptedInProgressPath(dir), 'w')) + } indexingActive.set(indexingActive.value + 1) reindexingLock((unlock) => { pull( @@ -1042,6 +1050,9 @@ exports.init = function (sbot, config) { // prettier-ignore if (err) return unlock(cb, new Error('reindexEncrypted() failed to force-flush indexes', {cause: err})) indexingActive.set(indexingActive.value - 1) + if (!isBrowser) { + rimraf.sync(reindexEncryptedInProgressPath(dir)) + } unlock(cb) }) }) diff --git a/defaults.js b/defaults.js index c91ea385..4a133d7f 100644 --- a/defaults.js +++ b/defaults.js @@ -13,6 +13,8 @@ exports.resetLevelPath = (dir) => path.join(dir, 'db2', 'post-compact-reset-level') exports.resetPrivatePath = (dir) => path.join(dir, 'db2', 'post-compact-reset-private') +exports.reindexEncryptedInProgressPath = (dir) => + path.join(dir, 'db2', 'reindex-encrypted-wip') exports.jitIndexesPath = (dir) => path.join(dir, 'db2', 'jit') exports.tooHotOpts = (config) => config.db2 diff --git a/test/reindex-encrypted.js b/test/reindex-encrypted.js index f825188f..b20be19c 100644 --- a/test/reindex-encrypted.js +++ b/test/reindex-encrypted.js @@ -5,6 +5,8 @@ const test = require('tape') const ssbKeys = require('ssb-keys') const pify = require('util').promisify +const pull = require('pull-stream') +const push = require('push-stream') const rimraf = require('rimraf') const mkdirp = require('mkdirp') const SecretStack = require('secret-stack') @@ -173,3 +175,171 @@ test('box2 group reindex larger', async (t) => { await Promise.all([pify(alice.close)(true), pify(bob.close)(true)]) t.end() }) + +test.only('reindexEncrypted is crash resistant', async (t) => { + // Create group keys + const groupKey1 = Buffer.from( + '30720d8f9cbf37f6d7062826f6decac93e308060a8aaaa77e6a4747f40ee1a76', + 'hex' + ) + const groupId1 = '%Aihvp+fMdt5CihjbOY6eZc0qCe0eKsrN2wfgXV2E3PM=.cloaked' + + // Setup Alice + const dirAlice = '/tmp/ssb-db2-box2-group-reindex2-alice' + rimraf.sync(dirAlice) + mkdirp.sync(dirAlice) + const keysAlice = ssbKeys.generate(null, 'alice') + const alice = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .call(null, { + keys: keysAlice, + path: dirAlice, + }) + alice.box2.addGroupInfo(groupId1, { key: groupKey1 }) + + // Setup Bob + const dirBob = '/tmp/ssb-db2-box2-group-reindex2-bob' + rimraf.sync(dirBob) + mkdirp.sync(dirBob) + const keysBob = ssbKeys.generate(null, 'bob') + let bob = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .use(require('../full-mentions')) + .call(null, { + keys: keysBob, + path: dirBob, + }) + + // Alice publishes 5 messages, some of them box2 + let opts1 = { + keys: keysAlice, + content: { type: 'about', text: 'not super secret1' }, + } + let opts2 = { + keys: keysAlice, + content: { + type: 'post', + text: 'super secret2', + mentions: [{ link: bob.id }], + }, + recps: [groupId1], + encryptionFormat: 'box2', + } + let opts3 = { + content: { type: 'weird' }, + keys: keysAlice, + } + let opts4 = { + content: { type: 'about', text: 'super secret4', recps: [groupId1] }, + keys: keysAlice, + encryptionFormat: 'box2', + } + let opts5 = { + content: { + type: 'post', + text: 'super secret5', + mentions: [{ link: bob.id }], + recps: [groupId1], + }, + keys: keysAlice, + encryptionFormat: 'box2', + } + + const msg1 = await pify(alice.db.create)(opts1) + const msg2 = await pify(alice.db.create)(opts2) + const msg3 = await pify(alice.db.create)(opts3) + const msg4 = await pify(alice.db.create)(opts4) + const msg5 = await pify(alice.db.create)(opts5) + t.pass('alice published 5 messages') + + t.notEqual(typeof msg1.value.content, 'string', 'msg1 is public about') + t.true(msg2.value.content.endsWith('.box2'), 'msg2 is group1-box2 post') + t.notEqual(typeof msg3.value.content, 'string', 'msg3 is public weird') + t.true(msg4.value.content.endsWith('.box2'), 'msg4 is group1-box2 about') + t.true(msg5.value.content.endsWith('.box2'), 'msg5 is group1-box2 post') + + // First, Bob gets 2 messages and indexes those + await pify(bob.db.add)(msg1.value) + await pify(bob.db.add)(msg2.value) + await pify(bob.db.add)(msg3.value) + await pify(bob.db.add)(msg4.value) + await pify(bob.db.add)(msg5.value) + t.pass('bob added all 5 messages') + + // Get offsets of those messages + const bobLog = bob.db.getLog() + const offsets = await new Promise((resolve) => { + bobLog.stream({ offsets: true, values: false }).pipe( + push.collect((err, ary) => { + t.error(err, 'no error when streaming bobLog') + resolve(ary) + }) + ) + }) + + const results1 = await bob.db.query( + where(and(author(alice.id), fullMentions(bob.id))), + toPromise() + ) + t.equal(results1.length, 0, 'bob has no box2 mentions from alice') + + // Bob joins group 1 and is able to decrypt some messages + bob.box2.addGroupInfo(groupId1, { key: groupKey1 }) + t.pass('bob joined group 1') + + // Wait for private indexes to be saved to disk + await pify(setTimeout)(2000) + + // Hack log.get to make it crash on the 2nd box2 msg + const originalGet = bobLog.get + bobLog.get = (offset, cb) => { + if (offset === offsets[3]) { + // offsets[3] is the 4th msg + return cb(new Error('simulated crash')) + } else { + originalGet.call(bobLog, offset, cb) + } + } + + try { + await pify(bob.db.reindexEncrypted)() + t.fail('reindexEncrypted should have thrown') + } catch (err) { + t.true(err.cause.message.includes('simulated crash'), 'simulated crash') + } + + await Promise.all([pify(alice.close)(true), pify(bob.close)(true)]) + + bob = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .use(require('../full-mentions')) + .call(null, { + keys: keysBob, + path: dirBob, + }) + + const streamed = [] + pull( + bob.db.reindexed(), + pull.drain((msg) => { + streamed.push(msg.value.content.text) + }) + ) + + await pify(setTimeout)(4000) + + const results2 = await bob.db.query( + where(and(author(alice.id), fullMentions(bob.id))), + toPromise() + ) + t.equal(results2.length, 2, 'bob has two box2 mentions from alice') + + t.deepEquals( + streamed, + ['super secret2', 'super secret4', 'super secret5'], + 'reindexed stream correct' + ) + + await pify(bob.close)(true) + t.end() +}) From 7813a253f8c8112eef71e794a9d99f8ac61255c1 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Tue, 31 Jan 2023 10:47:35 +0200 Subject: [PATCH 2/4] run all the tests, oops --- test/reindex-encrypted.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/reindex-encrypted.js b/test/reindex-encrypted.js index b20be19c..502fc403 100644 --- a/test/reindex-encrypted.js +++ b/test/reindex-encrypted.js @@ -176,7 +176,7 @@ test('box2 group reindex larger', async (t) => { t.end() }) -test.only('reindexEncrypted is crash resistant', async (t) => { +test('reindexEncrypted is crash resistant', async (t) => { // Create group keys const groupKey1 = Buffer.from( '30720d8f9cbf37f6d7062826f6decac93e308060a8aaaa77e6a4747f40ee1a76', From d52ee4300246c65b0c45928d21f66bc17153d416 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Tue, 31 Jan 2023 10:56:10 +0200 Subject: [PATCH 3/4] polish test description and comments --- test/reindex-encrypted.js | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/test/reindex-encrypted.js b/test/reindex-encrypted.js index 502fc403..25428b0f 100644 --- a/test/reindex-encrypted.js +++ b/test/reindex-encrypted.js @@ -178,14 +178,14 @@ test('box2 group reindex larger', async (t) => { test('reindexEncrypted is crash resistant', async (t) => { // Create group keys - const groupKey1 = Buffer.from( + const groupKey = Buffer.from( '30720d8f9cbf37f6d7062826f6decac93e308060a8aaaa77e6a4747f40ee1a76', 'hex' ) - const groupId1 = '%Aihvp+fMdt5CihjbOY6eZc0qCe0eKsrN2wfgXV2E3PM=.cloaked' + const groupId = '%Aihvp+fMdt5CihjbOY6eZc0qCe0eKsrN2wfgXV2E3PM=.cloaked' // Setup Alice - const dirAlice = '/tmp/ssb-db2-box2-group-reindex2-alice' + const dirAlice = '/tmp/ssb-db2-box2-group-reindex3-alice' rimraf.sync(dirAlice) mkdirp.sync(dirAlice) const keysAlice = ssbKeys.generate(null, 'alice') @@ -195,10 +195,10 @@ test('reindexEncrypted is crash resistant', async (t) => { keys: keysAlice, path: dirAlice, }) - alice.box2.addGroupInfo(groupId1, { key: groupKey1 }) + alice.box2.addGroupInfo(groupId, { key: groupKey }) // Setup Bob - const dirBob = '/tmp/ssb-db2-box2-group-reindex2-bob' + const dirBob = '/tmp/ssb-db2-box2-group-reindex3-bob' rimraf.sync(dirBob) mkdirp.sync(dirBob) const keysBob = ssbKeys.generate(null, 'bob') @@ -222,7 +222,7 @@ test('reindexEncrypted is crash resistant', async (t) => { text: 'super secret2', mentions: [{ link: bob.id }], }, - recps: [groupId1], + recps: [groupId], encryptionFormat: 'box2', } let opts3 = { @@ -230,7 +230,7 @@ test('reindexEncrypted is crash resistant', async (t) => { keys: keysAlice, } let opts4 = { - content: { type: 'about', text: 'super secret4', recps: [groupId1] }, + content: { type: 'about', text: 'super secret4', recps: [groupId] }, keys: keysAlice, encryptionFormat: 'box2', } @@ -239,7 +239,7 @@ test('reindexEncrypted is crash resistant', async (t) => { type: 'post', text: 'super secret5', mentions: [{ link: bob.id }], - recps: [groupId1], + recps: [groupId], }, keys: keysAlice, encryptionFormat: 'box2', @@ -253,12 +253,11 @@ test('reindexEncrypted is crash resistant', async (t) => { t.pass('alice published 5 messages') t.notEqual(typeof msg1.value.content, 'string', 'msg1 is public about') - t.true(msg2.value.content.endsWith('.box2'), 'msg2 is group1-box2 post') + t.true(msg2.value.content.endsWith('.box2'), 'msg2 is group box2 post') t.notEqual(typeof msg3.value.content, 'string', 'msg3 is public weird') - t.true(msg4.value.content.endsWith('.box2'), 'msg4 is group1-box2 about') - t.true(msg5.value.content.endsWith('.box2'), 'msg5 is group1-box2 post') + t.true(msg4.value.content.endsWith('.box2'), 'msg4 is group box2 about') + t.true(msg5.value.content.endsWith('.box2'), 'msg5 is group box2 post') - // First, Bob gets 2 messages and indexes those await pify(bob.db.add)(msg1.value) await pify(bob.db.add)(msg2.value) await pify(bob.db.add)(msg3.value) @@ -283,9 +282,9 @@ test('reindexEncrypted is crash resistant', async (t) => { ) t.equal(results1.length, 0, 'bob has no box2 mentions from alice') - // Bob joins group 1 and is able to decrypt some messages - bob.box2.addGroupInfo(groupId1, { key: groupKey1 }) - t.pass('bob joined group 1') + // Bob joins group and is able to decrypt some messages + bob.box2.addGroupInfo(groupId, { key: groupKey }) + t.pass('bob joined the group') // Wait for private indexes to be saved to disk await pify(setTimeout)(2000) From 6751a61e19c0a7c879d6f60608a1114557e4b729 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Tue, 31 Jan 2023 11:00:00 +0200 Subject: [PATCH 4/4] try to decrease test flakiness --- test/reindex-encrypted.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/reindex-encrypted.js b/test/reindex-encrypted.js index 25428b0f..79dfff4b 100644 --- a/test/reindex-encrypted.js +++ b/test/reindex-encrypted.js @@ -287,7 +287,7 @@ test('reindexEncrypted is crash resistant', async (t) => { t.pass('bob joined the group') // Wait for private indexes to be saved to disk - await pify(setTimeout)(2000) + await pify(setTimeout)(4000) // Hack log.get to make it crash on the 2nd box2 msg const originalGet = bobLog.get @@ -325,7 +325,7 @@ test('reindexEncrypted is crash resistant', async (t) => { }) ) - await pify(setTimeout)(4000) + await pify(setTimeout)(5000) const results2 = await bob.db.query( where(and(author(alice.id), fullMentions(bob.id))),