Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use bipf seekKeyCached() #325

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compat/feedstate.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
const { onceWhen } = require('../utils')

exports.init = function (sbot, config) {
sbot.getFeedState = function(feedId, cb) {
sbot.getFeedState = function (feedId, cb) {
onceWhen(
sbot.db.stateFeedsReady,
(ready) => ready === true,
Expand All @@ -17,7 +17,7 @@ exports.init = function (sbot, config) {

return cb(null, {
id: feedState.key,
sequence: feedState.value.sequence
sequence: feedState.value.sequence,
})
}
)
Expand Down
22 changes: 16 additions & 6 deletions compat/history-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,25 @@ exports.init = function (sbot, config) {
if (limit) {
sbot.db
.getJITDB()
.paginate(query, 0, limit, false, false, 'declared', (err, answer) => {
if (err) cb(new Error('ssb-db2 createHistoryStream failed: ' + err.message)) // prettier-ignore
.paginate(
query,
0,
limit,
false,
false,
'declared',
(err, answer) => {
if (err) cb(new Error('ssb-db2 createHistoryStream failed: ' + err.message)) // prettier-ignore
else cb(null, pull.values(answer.results.map(formatMsg)))
})
}
)
} else {
sbot.db.getJITDB().all(query, 0, false, false, 'declared', (err, results) => {
if (err) cb(new Error('ssb-db2 createHistoryStream failed: ' + err.message)) // prettier-ignore
sbot.db
.getJITDB()
.all(query, 0, false, false, 'declared', (err, results) => {
if (err) cb(new Error('ssb-db2 createHistoryStream failed: ' + err.message)) // prettier-ignore
else cb(null, pull.values(results.map(formatMsg)))
})
})
}
})
})
Expand Down
10 changes: 2 additions & 8 deletions db.js
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,10 @@ exports.init = function (sbot, config) {
)
debug(`lowest offset for all indexes is ${lowestOffset}`)

const B_VALUE = Buffer.from('value')

log.stream({ gt: lowestOffset }).pipe({
paused: false,
write(record) {
const buf = record.value
const pValue = buf ? bipf.seekKey(buf, 0, B_VALUE) : null
indexesArr.forEach((idx) => idx.onRecord(record, false, pValue))
indexesArr.forEach((idx) => idx.onRecord(record, false))
},
end() {
debug(`updateIndexes() scan time: ${Date.now() - start}ms`)
Expand All @@ -509,9 +505,7 @@ exports.init = function (sbot, config) {
log.stream({ gt: indexes['base'].offset.value, live: true }).pipe({
paused: false,
write(record) {
const buf = record.value
const pValue = buf ? bipf.seekKey(buf, 0, B_VALUE) : null
indexesArr.forEach((idx) => idx.onRecord(record, true, pValue))
indexesArr.forEach((idx) => idx.onRecord(record, true))
},
})
})
Expand Down
13 changes: 6 additions & 7 deletions indexes/about-self.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ const pl = require('pull-level')
const clarify = require('clarify-error')
const Plugin = require('./plugin')

const B_AUTHOR = Buffer.from('author')
const B_CONTENT = Buffer.from('content')
const B_TYPE = Buffer.from('type')
const B_ABOUT = Buffer.from('about')

// feedId => hydratedAboutObj
Expand Down Expand Up @@ -39,13 +36,15 @@ module.exports = class AboutSelf extends Plugin {
)
}

processRecord(record, seq, pValue) {
processRecord(record, seq) {
const buf = record.value

const pAuthor = bipf.seekKey(buf, pValue, B_AUTHOR)
const pContent = bipf.seekKey(buf, pValue, B_CONTENT)
const pValue = bipf.seekKeyCached(buf, 0, 'value')
if (pValue < 0) return
const pAuthor = bipf.seekKeyCached(buf, pValue, 'author')
const pContent = bipf.seekKeyCached(buf, pValue, 'content')
if (pContent < 0) return
const pType = bipf.seekKey(buf, pContent, B_TYPE)
const pType = bipf.seekKeyCached(buf, pContent, 'type')
if (pType < 0) return

if (bipf.compareString(buf, pType, B_ABOUT) === 0) {
Expand Down
15 changes: 9 additions & 6 deletions indexes/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ const pull = require('pull-stream')
const clarify = require('clarify-error')
const Plugin = require('./plugin')

const B_AUTHOR = Buffer.from('author')
const B_SEQUENCE = Buffer.from('sequence')

// authorId => latestMsg { offset, sequence }
//
// Necessary for feed validation and for EBT
Expand All @@ -37,10 +34,16 @@ module.exports = function makeBaseIndex(privateIndex) {
)
}

processRecord(record, seq, pValue) {
processRecord(record, seq) {
const buf = record.value
const author = bipf.decode(buf, bipf.seekKey(buf, pValue, B_AUTHOR))
const sequence = bipf.decode(buf, bipf.seekKey(buf, pValue, B_SEQUENCE))
const pValue = bipf.seekKeyCached(buf, 0, 'value')
if (pValue < 0) return
const pValueAuthor = bipf.seekKeyCached(buf, pValue, 'author')
if (pValueAuthor < 0) return
const author = bipf.decode(buf, pValueAuthor)
const pValueSequence = bipf.seekKeyCached(buf, pValue, 'sequence')
if (pValueSequence < 0) return
const sequence = bipf.decode(buf, pValueSequence)
const latestSequence = this.authorLatest.has(author)
? this.authorLatest.get(author).sequence
: 0
Expand Down
15 changes: 9 additions & 6 deletions indexes/ebt.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ const clarify = require('clarify-error')
const Plugin = require('./plugin')
const { reEncrypt } = require('./private')

const B_AUTHOR = Buffer.from('author')
const B_SEQUENCE = Buffer.from('sequence')

// [author, sequence] => offset
module.exports = class EBT extends Plugin {
constructor(log, dir) {
super(log, dir, 'ebt', 1, 'json')
}

processRecord(record, seq, pValue) {
processRecord(record, seq) {
const buf = record.value
const author = bipf.decode(buf, bipf.seekKey(buf, pValue, B_AUTHOR))
const sequence = bipf.decode(buf, bipf.seekKey(buf, pValue, B_SEQUENCE))
const pValue = bipf.seekKeyCached(buf, 0, 'value')
if (pValue < 0) return
const pValueAuthor = bipf.seekKeyCached(buf, pValue, 'author')
if (pValueAuthor < 0) return
const author = bipf.decode(buf, pValueAuthor)
const pValueSequence = bipf.seekKeyCached(buf, pValue, 'sequence')
if (pValueSequence < 0) return
const sequence = bipf.decode(buf, pValueSequence)
this.batch.push({
type: 'put',
key: [author, sequence],
Expand Down
25 changes: 11 additions & 14 deletions indexes/full-mentions.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ const clarify = require('clarify-error')
const Plugin = require('./plugin')
const { or, seqs, liveSeqs } = require('../operators')

const B_KEY = Buffer.from('key')
const B_CONTENT = Buffer.from('content')
const B_MENTIONS = Buffer.from('mentions')

function parseInt10(x) {
return parseInt(x, 10)
}
Expand All @@ -23,18 +19,19 @@ module.exports = class FullMentions extends Plugin {
super(log, dir, 'fullMentions', 1, 'json')
}

processRecord(record, seq, pValue) {
processRecord(record, seq) {
const buf = record.value
const pKey = bipf.seekKey(buf, 0, B_KEY)
let p = 0 // note you pass in p!
p = bipf.seekKey(buf, pValue, B_CONTENT)
if (p < 0) return
p = bipf.seekKey(buf, p, B_MENTIONS)
if (p < 0) return
const mentionsData = bipf.decode(buf, p)
const pKey = bipf.seekKeyCached(buf, 0, 'key')
const pValue = bipf.seekKeyCached(buf, 0, 'value')
if (pValue < 0) return
const pValueContent = bipf.seekKeyCached(buf, pValue, 'content')
if (pValueContent < 0) return
const pMentions = bipf.seekKeyCached(buf, pValueContent, 'mentions')
if (pMentions < 0) return
const mentionsData = bipf.decode(buf, pMentions)
if (!Array.isArray(mentionsData)) return
const shortKey = bipf.decode(buf, pKey).slice(1, 10)
mentionsData.forEach((mention) => {
for (const mention of mentionsData) {
if (
mention.link &&
typeof mention.link === 'string' &&
Expand All @@ -46,7 +43,7 @@ module.exports = class FullMentions extends Plugin {
value: seq,
})
}
})
}
}

indexesContent() {
Expand Down
6 changes: 2 additions & 4 deletions indexes/keys.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ const clarify = require('clarify-error')
const Plugin = require('./plugin')
const { seqs } = require('../operators')

const B_KEY = Buffer.from('key')

// msgId => seq
module.exports = class Keys extends Plugin {
constructor(log, dir) {
super(log, dir, 'keys', 1)
}

processRecord(record, seq, pValue) {
processRecord(record, seq) {
const buf = record.value
const pKey = bipf.seekKey(buf, 0, B_KEY)
const pKey = bipf.seekKeyCached(buf, 0, 'key')
if (pKey < 0) return
const key = bipf.decode(buf, pKey)
this.batch.push({
Expand Down
13 changes: 9 additions & 4 deletions indexes/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ module.exports = class Plugin {
this.batch = []

this.flush = (cb) => {
if (processedOffset === this.offset.value || this.level.isClosed()) return cb()
if (processedOffset === this.offset.value || this.level.isClosed())
return cb()
if (!this.onFlush) this.onFlush = (cb2) => cb2()

const processedOffsetAtFlush = processedOffset
Expand All @@ -61,7 +62,11 @@ module.exports = class Plugin {
// 2nd, persist the META because it has its own valueEncoding
this.level.put(
META,
{ version, offset: processedOffsetAtFlush, processed: processedSeqAtFlush },
{
version,
offset: processedOffsetAtFlush,
processed: processedSeqAtFlush,
},
{ valueEncoding: 'json' },
(err3) => {
if (err3) cb(clarify(err3, 'failed to persist META when flushing')) // prettier-ignore
Expand All @@ -79,10 +84,10 @@ module.exports = class Plugin {

const liveFlush = debounce(this.flush, 250)

this.onRecord = function onRecord(record, isLive, pValue) {
this.onRecord = function onRecord(record, isLive) {
let changes = 0
if (record.offset > processedOffset) {
if (record.value && pValue >= 0) this.processRecord(record, processedSeq, pValue)
if (record.value) this.processRecord(record, processedSeq)
changes = this.batch.length
processedSeq++
processedOffset = record.offset
Expand Down
33 changes: 14 additions & 19 deletions indexes/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ module.exports = function (dir, sbot, config) {
let canDecrypt = []

const startDecryptBox1 = config.db2.startDecryptBox1
? new Date(config.db2.startDecryptBox1)
: null
? new Date(config.db2.startDecryptBox1)
: null

const debug = Debug('ssb:db2:private')

Expand Down Expand Up @@ -134,11 +134,7 @@ module.exports = function (dir, sbot, config) {
return { offset: record.offset, value: buf }
}

const B_VALUE = Buffer.from('value')
const B_CONTENT = Buffer.from('content')
const B_AUTHOR = Buffer.from('author')
const B_PREVIOUS = Buffer.from('previous')
const B_TIMESTAMP = Buffer.from('timestamp')

function decryptBox1(ciphertext, keys) {
return ssbKeys.unbox(ciphertext, keys)
Expand All @@ -149,7 +145,7 @@ module.exports = function (dir, sbot, config) {
if (ciphertext.endsWith('.box')) {
content = decryptBox1(ciphertext, config.keys)
} else if (sbot.box2 && ciphertext.endsWith('.box2')) {
const pAuthor = bipf.seekKey(recBuffer, pValue, B_AUTHOR)
const pAuthor = bipf.seekKeyCached(recBuffer, pValue, 'author')
if (pAuthor >= 0) {
const author = bipf.decode(recBuffer, pAuthor)
const pPrevious = bipf.seekKey(recBuffer, pValue, B_PREVIOUS)
Expand All @@ -168,12 +164,12 @@ module.exports = function (dir, sbot, config) {
if (!recBuffer) return record
let p = 0 // note you pass in p!
if (bsb.eq(canDecrypt, recOffset) !== -1) {
const pValue = bipf.seekKey(recBuffer, p, B_VALUE)
const pValue = bipf.seekKeyCached(recBuffer, p, 'value')
if (pValue < 0) return record
const pContent = bipf.seekKey(recBuffer, pValue, B_CONTENT)
if (pContent < 0) return record
const pValueContent = bipf.seekKeyCached(recBuffer, pValue, 'content')
if (pValueContent < 0) return record

const ciphertext = bipf.decode(recBuffer, pContent)
const ciphertext = bipf.decode(recBuffer, pValueContent)
const content = tryDecryptContent(ciphertext, recBuffer, pValue)
if (!content) return record

Expand All @@ -182,21 +178,20 @@ module.exports = function (dir, sbot, config) {
} else if (recOffset > latestOffset.value || !streaming) {
if (streaming) latestOffset.set(recOffset)

const pValue = bipf.seekKey(recBuffer, p, B_VALUE)
const pValue = bipf.seekKeyCached(recBuffer, p, 'value')
if (pValue < 0) return record
const pContent = bipf.seekKey(recBuffer, pValue, B_CONTENT)
if (pContent < 0) return record
const pValueContent = bipf.seekKeyCached(recBuffer, pValue, 'content')
if (pValueContent < 0) return record

const type = bipf.getEncodedType(recBuffer, pContent)
const type = bipf.getEncodedType(recBuffer, pValueContent)
if (type !== bipf.types.string) return record

const ciphertext = bipf.decode(recBuffer, pContent)
const ciphertext = bipf.decode(recBuffer, pValueContent)

if (ciphertext.endsWith('.box') && startDecryptBox1) {
const pTimestamp = bipf.seekKey(recBuffer, pValue, B_TIMESTAMP)
const pTimestamp = bipf.seekKeyCached(recBuffer, pValue, 'timestamp')
const declaredTimestamp = bipf.decode(recBuffer, pTimestamp)
if (declaredTimestamp < startDecryptBox1)
return record
if (declaredTimestamp < startDecryptBox1) return record
}
if (streaming && ciphertext.endsWith('.box2')) encrypted.push(recOffset)

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
"async-append-only-log": "^4.0.0",
"atomic-file-rw": "^0.2.1",
"binary-search-bounds": "^2.0.4",
"bipf": "^1.5.4",
"bipf": "^1.6.1",
"clarify-error": "^1.0.0",
"debug": "^4.3.1",
"fastintcompression": "0.0.4",
"flumecodec": "0.0.1",
"flumelog-offset": "3.4.4",
"hoox": "0.0.1",
"jitdb": "^6.4.0",
"jitdb": "ssb-ngi-pointer/jitdb#bipf-cached",
"level": "^6.0.1",
"level-codec": "^9.0.2",
"lodash.debounce": "^4.0.8",
Expand Down
Loading