Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reindexEncrypted() should auto-resume after a crash #419

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions core.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const {
jitIndexesPath,
resetLevelPath,
resetPrivatePath,
reindexEncryptedInProgressPath,
} = require('./defaults')
const { onceWhen, ReadyGate, onceWhenPromise } = require('./utils')
const ThrottleBatchAdd = require('./throttle-batch')
Expand All @@ -45,10 +46,10 @@ const {
deferred,
asOffsets,
isEncrypted,
toCallback,
batch,
toPullStream,
} = operators
const isBrowser = typeof window !== 'undefined'

exports.name = 'db'

Expand Down Expand Up @@ -111,6 +112,8 @@ exports.init = function (sbot, config) {
const stateFeedsReady = Obv().set(false)
const secretStackLoaded = new ReadyGate()
const indexesStateLoaded = new ReadyGate()
const reindexingLock = mutexify()
const reindexedValues = Notify()

sbot.close.hook(function (fn, args) {
close((err) => {
Expand All @@ -136,6 +139,11 @@ exports.init = function (sbot, config) {
stateLoadedPromises.push(indexes[indexName].stateLoaded)
}
Promise.all(stateLoadedPromises).then(() => {
if (!isBrowser && fs.existsSync(reindexEncryptedInProgressPath(dir))) {
reindexEncrypted(() => {
debug('done reindexing encrypted from a previous session')
})
}
indexesStateLoaded.setReady()
})
})
Expand Down Expand Up @@ -987,10 +995,10 @@ exports.init = function (sbot, config) {
})
}

const reindexingLock = mutexify()
const reindexedValues = Notify()

function reindexEncrypted(cb) {
if (!isBrowser) {
fs.closeSync(fs.openSync(reindexEncryptedInProgressPath(dir), 'w'))
}
indexingActive.set(indexingActive.value + 1)
reindexingLock((unlock) => {
pull(
Expand Down Expand Up @@ -1042,6 +1050,9 @@ exports.init = function (sbot, config) {
// prettier-ignore
if (err) return unlock(cb, new Error('reindexEncrypted() failed to force-flush indexes', {cause: err}))
indexingActive.set(indexingActive.value - 1)
if (!isBrowser) {
rimraf.sync(reindexEncryptedInProgressPath(dir))
}
unlock(cb)
})
})
Expand Down
2 changes: 2 additions & 0 deletions defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ exports.resetLevelPath = (dir) =>
path.join(dir, 'db2', 'post-compact-reset-level')
exports.resetPrivatePath = (dir) =>
path.join(dir, 'db2', 'post-compact-reset-private')
exports.reindexEncryptedInProgressPath = (dir) =>
path.join(dir, 'db2', 'reindex-encrypted-wip')
exports.jitIndexesPath = (dir) => path.join(dir, 'db2', 'jit')
exports.tooHotOpts = (config) =>
config.db2
Expand Down
169 changes: 169 additions & 0 deletions test/reindex-encrypted.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
const test = require('tape')
const ssbKeys = require('ssb-keys')
const pify = require('util').promisify
const pull = require('pull-stream')
const push = require('push-stream')
const rimraf = require('rimraf')
const mkdirp = require('mkdirp')
const SecretStack = require('secret-stack')
Expand Down Expand Up @@ -173,3 +175,170 @@ test('box2 group reindex larger', async (t) => {
await Promise.all([pify(alice.close)(true), pify(bob.close)(true)])
t.end()
})

test('reindexEncrypted is crash resistant', async (t) => {
// Create group keys
const groupKey = Buffer.from(
'30720d8f9cbf37f6d7062826f6decac93e308060a8aaaa77e6a4747f40ee1a76',
'hex'
)
const groupId = '%Aihvp+fMdt5CihjbOY6eZc0qCe0eKsrN2wfgXV2E3PM=.cloaked'

// Setup Alice
const dirAlice = '/tmp/ssb-db2-box2-group-reindex3-alice'
rimraf.sync(dirAlice)
mkdirp.sync(dirAlice)
const keysAlice = ssbKeys.generate(null, 'alice')
const alice = SecretStack({ appKey: caps.shs })
.use(require('../'))
.call(null, {
keys: keysAlice,
path: dirAlice,
})
alice.box2.addGroupInfo(groupId, { key: groupKey })

// Setup Bob
const dirBob = '/tmp/ssb-db2-box2-group-reindex3-bob'
rimraf.sync(dirBob)
mkdirp.sync(dirBob)
const keysBob = ssbKeys.generate(null, 'bob')
let bob = SecretStack({ appKey: caps.shs })
.use(require('../'))
.use(require('../full-mentions'))
.call(null, {
keys: keysBob,
path: dirBob,
})

// Alice publishes 5 messages, some of them box2
let opts1 = {
keys: keysAlice,
content: { type: 'about', text: 'not super secret1' },
}
let opts2 = {
keys: keysAlice,
content: {
type: 'post',
text: 'super secret2',
mentions: [{ link: bob.id }],
},
recps: [groupId],
encryptionFormat: 'box2',
}
let opts3 = {
content: { type: 'weird' },
keys: keysAlice,
}
let opts4 = {
content: { type: 'about', text: 'super secret4', recps: [groupId] },
keys: keysAlice,
encryptionFormat: 'box2',
}
let opts5 = {
content: {
type: 'post',
text: 'super secret5',
mentions: [{ link: bob.id }],
recps: [groupId],
},
keys: keysAlice,
encryptionFormat: 'box2',
}

const msg1 = await pify(alice.db.create)(opts1)
const msg2 = await pify(alice.db.create)(opts2)
const msg3 = await pify(alice.db.create)(opts3)
const msg4 = await pify(alice.db.create)(opts4)
const msg5 = await pify(alice.db.create)(opts5)
t.pass('alice published 5 messages')

t.notEqual(typeof msg1.value.content, 'string', 'msg1 is public about')
t.true(msg2.value.content.endsWith('.box2'), 'msg2 is group box2 post')
t.notEqual(typeof msg3.value.content, 'string', 'msg3 is public weird')
t.true(msg4.value.content.endsWith('.box2'), 'msg4 is group box2 about')
t.true(msg5.value.content.endsWith('.box2'), 'msg5 is group box2 post')

await pify(bob.db.add)(msg1.value)
await pify(bob.db.add)(msg2.value)
await pify(bob.db.add)(msg3.value)
await pify(bob.db.add)(msg4.value)
await pify(bob.db.add)(msg5.value)
t.pass('bob added all 5 messages')

// Get offsets of those messages
const bobLog = bob.db.getLog()
const offsets = await new Promise((resolve) => {
bobLog.stream({ offsets: true, values: false }).pipe(
push.collect((err, ary) => {
t.error(err, 'no error when streaming bobLog')
resolve(ary)
})
)
})

const results1 = await bob.db.query(
where(and(author(alice.id), fullMentions(bob.id))),
toPromise()
)
t.equal(results1.length, 0, 'bob has no box2 mentions from alice')

// Bob joins group and is able to decrypt some messages
bob.box2.addGroupInfo(groupId, { key: groupKey })
t.pass('bob joined the group')

// Wait for private indexes to be saved to disk
await pify(setTimeout)(4000)

// Hack log.get to make it crash on the 2nd box2 msg
const originalGet = bobLog.get
bobLog.get = (offset, cb) => {
if (offset === offsets[3]) {
// offsets[3] is the 4th msg
return cb(new Error('simulated crash'))
} else {
originalGet.call(bobLog, offset, cb)
}
}

try {
await pify(bob.db.reindexEncrypted)()
t.fail('reindexEncrypted should have thrown')
} catch (err) {
t.true(err.cause.message.includes('simulated crash'), 'simulated crash')
}

await Promise.all([pify(alice.close)(true), pify(bob.close)(true)])

bob = SecretStack({ appKey: caps.shs })
.use(require('../'))
.use(require('../full-mentions'))
.call(null, {
keys: keysBob,
path: dirBob,
})

const streamed = []
pull(
bob.db.reindexed(),
pull.drain((msg) => {
streamed.push(msg.value.content.text)
})
)

await pify(setTimeout)(5000)

const results2 = await bob.db.query(
where(and(author(alice.id), fullMentions(bob.id))),
toPromise()
)
t.equal(results2.length, 2, 'bob has two box2 mentions from alice')

t.deepEquals(
streamed,
['super secret2', 'super secret4', 'super secret5'],
'reindexed stream correct'
)

await pify(bob.close)(true)
t.end()
})