Skip to content

Commit

Permalink
Merge pull request #208 from ssb-ngi-pointer/fix207
Browse files Browse the repository at this point in the history
recalculate ssb-validate state when migrate ends
  • Loading branch information
staltz authored Mar 4, 2021
2 parents d62d5ef + f66d6c6 commit dc0b03a
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 62 deletions.
25 changes: 17 additions & 8 deletions compat/ebt.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const EBTIndex = require('../indexes/ebt')
const { onceWhen } = require('../utils')

exports.init = function (sbot, config) {
sbot.db.registerIndex(EBTIndex)
Expand All @@ -7,15 +8,23 @@ exports.init = function (sbot, config) {
sbot.getAtSequence = ebtIndex.getMessageFromAuthorSequence.bind(ebtIndex)
sbot.add = sbot.db.add
sbot.getVectorClock = function (cb) {
sbot.db.getAllLatest((err, last) => {
if (err) return cb(err)
onceWhen(
sbot.db2migrate && sbot.db2migrate.synchronized,
(isSynced) => isSynced,
() => {
sbot.db.onDrain('base', () => {
sbot.db.getAllLatest((err, last) => {
if (err) return cb(err)

const clock = {}
for (const k in last) {
clock[k] = last[k].sequence
}
const clock = {}
for (const k in last) {
clock[k] = last[k].sequence
}

cb(null, clock)
})
cb(null, clock)
})
})
}
)
}
}
128 changes: 76 additions & 52 deletions db.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const jitdbOperators = require('jitdb/operators')
const operators = require('./operators')
const JITDb = require('jitdb')
const Debug = require('debug')
const DeferredPromise = require('p-defer')

const { indexesPath } = require('./defaults')
const { onceWhen } = require('./utils')
const Log = require('./log')
const Status = require('./status')
const makeBaseIndex = require('./indexes/base')
Expand Down Expand Up @@ -57,7 +57,7 @@ exports.init = function (sbot, config) {
const debug = Debug('ssb:db2')
const post = Obv()
const hmac_key = null
const stateFeedsReady = DeferredPromise()
const stateFeedsReady = Obv().set(false)
let state = validate.initial()

sbot.close.hook(function (fn, args) {
Expand All @@ -69,21 +69,32 @@ exports.init = function (sbot, config) {
registerIndex(makeBaseIndex(privateIndex))
registerIndex(KeysIndex)

// restore current state
indexes.base.getAllLatest((err, last) => {
// copy to so we avoid weirdness, because this object
// tracks the state coming in to the database.
for (const k in last) {
state.feeds[k] = {
id: last[k].id,
timestamp: last[k].timestamp,
sequence: last[k].sequence,
queue: [],
}
}
debug('getAllLatest is done setting up initial validate state')
stateFeedsReady.resolve()
})
loadStateFeeds()

function setStateFeedsReady(x) {
stateFeedsReady.set(x)
}

function loadStateFeeds(cb) {
// restore current state
onDrain('base', () => {
indexes.base.getAllLatest((err, last) => {
// copy to so we avoid weirdness, because this object
// tracks the state coming in to the database.
for (const k in last) {
state.feeds[k] = {
id: last[k].id,
timestamp: last[k].timestamp,
sequence: last[k].sequence,
queue: [],
}
}
debug('getAllLatest is done setting up initial validate state')
if (!stateFeedsReady.value) stateFeedsReady.set(true)
if (cb) cb()
})
})
}

// Crunch stats numbers to produce one number for the "indexing" progress
status.obv((stats) => {
Expand Down Expand Up @@ -148,15 +159,19 @@ exports.init = function (sbot, config) {
const guard = guardAgainstDuplicateLogs('add()')
if (guard) return cb(guard)

stateFeedsReady.promise.then(() => {
try {
state = validate.append(state, hmac_key, msg)
if (state.error) return cb(state.error)
rawAdd(msg, true, cb)
} catch (ex) {
return cb(ex)
onceWhen(
stateFeedsReady,
(ready) => ready === true,
() => {
try {
state = validate.append(state, hmac_key, msg)
if (state.error) return cb(state.error)
rawAdd(msg, true, cb)
} catch (ex) {
return cb(ex)
}
}
})
)
}

function addOOO(msg, cb) {
Expand Down Expand Up @@ -198,14 +213,18 @@ exports.init = function (sbot, config) {
const guard = guardAgainstDuplicateLogs('publish()')
if (guard) return cb(guard)

stateFeedsReady.promise.then(() => {
state.queue = []
state = validate.appendNew(state, null, config.keys, msg, Date.now())
rawAdd(state.queue[0].value, true, (err, data) => {
post.set(data)
cb(err, data)
})
})
onceWhen(
stateFeedsReady,
(ready) => ready === true,
() => {
state.queue = []
state = validate.appendNew(state, null, config.keys, msg, Date.now())
rawAdd(state.queue[0].value, true, (err, data) => {
post.set(data)
cb(err, data)
})
}
)
}

function del(msgId, cb) {
Expand Down Expand Up @@ -301,25 +320,28 @@ exports.init = function (sbot, config) {
indexName = 'base'
}

onIndexesStateLoaded(() => {
log.onDrain(() => {
const index = indexes[indexName]
if (!index) return cb('Unknown index:' + indexName)

status.updateLog()

if (index.offset.value === log.since.value) {
status.updateIndex(indexName, index.offset.value)
cb()
} else {
const remove = index.offset(() => {
if (index.offset.value === log.since.value) {
remove()
status.updateIndex(indexName, index.offset.value)
cb()
}
})
}
// setTimeout to make sure extra indexes from secret-stack are also included
setTimeout(() => {
onIndexesStateLoaded(() => {
log.onDrain(() => {
const index = indexes[indexName]
if (!index) return cb('Unknown index:' + indexName)

status.updateLog()

if (index.offset.value === log.since.value) {
status.updateIndex(indexName, index.offset.value)
cb()
} else {
const remove = index.offset(() => {
if (index.offset.value === log.since.value) {
remove()
status.updateIndex(indexName, index.offset.value)
cb()
}
})
}
})
})
})
}
Expand Down Expand Up @@ -400,6 +422,8 @@ exports.init = function (sbot, config) {
getAllLatest: indexes.base.getAllLatest.bind(indexes.base),
getLog: () => log,
registerIndex,
setStateFeedsReady,
loadStateFeeds,
getIndexes: () => indexes,
getIndex: (index) => indexes[index],
clearIndexes,
Expand Down
19 changes: 17 additions & 2 deletions migrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ exports.init = function init(sbot, config) {
started = true
debug('started')

if (sbot.db) sbot.db.setStateFeedsReady('migrating')
synchronized.set(false)

const oldLog = getOldLog(sbot, config)
Expand Down Expand Up @@ -313,10 +314,24 @@ exports.init = function init(sbot, config) {

if (config.db2.dangerouslyKillFlumeWhenMigrated) {
rimraf(flumePath(config.path), (err) => {
if (!err) oldLogExists.set(false)
doneMigrating()
if (err) return console.error(err)
if (sbot.db) {
sbot.db.loadStateFeeds(() => {
sbot.db.setStateFeedsReady(true)
oldLogExists.set(false)
doneMigrating()
})
} else {
oldLogExists.set(false)
doneMigrating()
}
})
} else {
if (sbot.db) {
sbot.db.loadStateFeeds(() => {
sbot.db.setStateFeedsReady(true)
})
}
doneMigrating()
migrateLive()
}
Expand Down
27 changes: 27 additions & 0 deletions utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Obv utility to run the `cb` once, as soon as the condition given by
* `filter` is true.
*/
function onceWhen(obv, filter, cb) {
if (!obv) return cb()
let answered = false
let remove
remove = obv((x) => {
if (!filter(x)) return
if (answered) {
if (remove) {
remove()
remove = null
}
} else {
answered = true
if (remove) {
remove()
remove = null
}
cb()
}
})
}

module.exports = { onceWhen }

0 comments on commit dc0b03a

Please sign in to comment.