Skip to content

Commit

Permalink
rocksdb: move merkle state onto session (#618)
Browse files Browse the repository at this point in the history
* move merkle tree state onto the session

* add setRoots helper

* create merkle tree batches independently

* seek still needs session

* remove dead code

* flake fixed
  • Loading branch information
chm-diederichs authored Jan 9, 2025
1 parent 6b81e71 commit 4cffb5b
Show file tree
Hide file tree
Showing 14 changed files with 390 additions and 334 deletions.
39 changes: 20 additions & 19 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class Hypercore extends EventEmitter {
this.state = await parent.state.createSession(opts.name, checkout, !!opts.overwrite, this.draft)
if (state) state.unref() // ref'ed above in setup session

if (checkout !== -1 && checkout < this.state.tree.length) {
if (checkout !== -1 && checkout < this.state.length) {
await this.state.truncate(checkout, this.fork)
}
} else if (this.state === null) {
Expand Down Expand Up @@ -386,9 +386,9 @@ class Hypercore extends EventEmitter {

_getSnapshot () {
return {
length: this.state.tree.length,
byteLength: this.state.tree.byteLength,
fork: this.state.tree.fork
length: this.state.length,
byteLength: this.state.byteLength,
fork: this.state.fork
}
}

Expand Down Expand Up @@ -502,15 +502,15 @@ class Hypercore extends EventEmitter {

get length () {
if (this._snapshot) return this._snapshot.length
return this.opened === false ? 0 : this.state.tree.length
return this.opened === false ? 0 : this.state.length
}

get signedLength () {
if (this.opened === false) return 0
if (this.state === this.core.state) return this.core.tree.length
if (this.state === this.core.state) return this.core.state.length
const flushed = this.state.flushedLength()

return flushed === -1 ? this.state.tree.length : flushed
return flushed === -1 ? this.state.length : flushed
}

/**
Expand All @@ -519,12 +519,12 @@ class Hypercore extends EventEmitter {
get byteLength () {
if (this.opened === false) return 0
if (this._snapshot) return this._snapshot.byteLength
return this.state.tree.byteLength - (this.state.tree.length * this.padding)
return this.state.byteLength - (this.state.length * this.padding)
}

get contiguousLength () {
if (this.opened === false) return 0
return Math.min(this.core.tree.length, this.core.header.hints.contiguousLength)
return Math.min(this.core.state.length, this.core.header.hints.contiguousLength)
}

get contiguousByteLength () {
Expand All @@ -533,7 +533,7 @@ class Hypercore extends EventEmitter {

get fork () {
if (this.opened === false) return 0
return this.state.tree.fork
return this.state.fork
}

get peers () {
Expand Down Expand Up @@ -592,7 +592,7 @@ class Hypercore extends EventEmitter {
}

createTreeBatch () {
return this.state.tree.batch()
return this.state.createTreeBatch()
}

findingPeers () {
Expand Down Expand Up @@ -652,8 +652,8 @@ class Hypercore extends EventEmitter {
if (this.opened === false) await this.opening
if (!isValidIndex(bytes)) throw ASSERTION('seek is invalid')

const tree = (opts && opts.tree) || this.state.tree
const s = tree.seek(bytes, this.padding)
const tree = (opts && opts.tree) || this.state.core.tree
const s = tree.seek(this.state, bytes, this.padding)

const offset = await s.update()
if (offset) return offset
Expand Down Expand Up @@ -812,7 +812,8 @@ class Hypercore extends EventEmitter {

async restoreBatch (length, blocks) {
if (this.opened === false) await this.opening
return this.state.tree.restoreBatch(length)
const batch = this.state.createTreeBatch()
return batch.restore(length)
}

_shouldWait (opts, defaultValue) {
Expand Down Expand Up @@ -853,14 +854,14 @@ class Hypercore extends EventEmitter {
if (this.opened === false) await this.opening

const {
fork = this.state.tree.fork + 1,
fork = this.state.fork + 1,
keyPair = this.keyPair,
signature = null
} = typeof opts === 'number' ? { fork: opts } : opts

const isDefault = this.state === this.core.state
const writable = !this._readonly && !!(signature || (keyPair && keyPair.secretKey))
if (isDefault && writable === false && (newLength > 0 || fork !== this.state.tree.fork)) throw SESSION_NOT_WRITABLE()
if (isDefault && writable === false && (newLength > 0 || fork !== this.state.fork)) throw SESSION_NOT_WRITABLE()

await this.state.truncate(newLength, fork, { keyPair, signature })

Expand Down Expand Up @@ -902,7 +903,7 @@ class Hypercore extends EventEmitter {
async treeHash (length) {
if (length === undefined) {
await this.ready()
length = this.state.tree.length
length = this.state.length
}

const roots = await this.state.tree.getRoots(length)
Expand Down Expand Up @@ -1007,8 +1008,8 @@ function toHex (buf) {
}

function preappend (blocks) {
const offset = this.state.tree.length
const fork = this.state.tree.fork
const offset = this.state.length
const fork = this.state.fork

if (this.encryption.compat !== this.core.compat) this._updateEncryption()

Expand Down
2 changes: 1 addition & 1 deletion lib/copy-prologue.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async function flushBatch (prologue, src, dst, batch) {

if (upgraded) {
const roots = nodes.slice(0, batch.roots.length)
dst.tree.setRoots(roots, null)
dst.state.setRoots(roots)
dst.header.tree = prologueToTree(prologue)
}

Expand Down
46 changes: 27 additions & 19 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const b4a = require('b4a')
const unslab = require('unslab')
const z32 = require('z32')
const Mutex = require('./mutex')
const MerkleTree = require('./merkle-tree')
const { MerkleTree, ReorgBatch } = require('./merkle-tree')
const BlockStore = require('./block-store')
const BitInterlude = require('./bit-interlude')
const Bitfield = require('./bitfield')
Expand Down Expand Up @@ -234,10 +234,17 @@ module.exports = class Core {

const prologue = header.manifest ? header.manifest.prologue : null

const tree = await MerkleTree.open(storage, header.tree.length, { crypto, prologue, ...header.tree })
const tree = await MerkleTree.open(storage)
const bitfield = await Bitfield.open(storage, header.tree.length)
const blocks = new BlockStore(storage)

const treeInfo = {
fork: header.tree.fork,
signature: header.tree.signature,
roots: header.tree.length ? await tree.getRoots(header.tree.length) : [],
prologue
}

if (overwrite) {
const writer = storage.createWriteBatch()
tree.clear(writer)
Expand Down Expand Up @@ -267,7 +274,7 @@ module.exports = class Core {
this.blocks = blocks
this.bitfield = bitfield
this.verifier = verifier
this.state = new SessionState(this, storage, this.blocks, tree, null)
this.state = new SessionState(this, storage, this.blocks, tree, treeInfo, null)

if (this.key === null) this.key = this.header.key
if (this.discoveryKey === null) this.discoveryKey = crypto.discoveryKey(this.key)
Expand Down Expand Up @@ -319,7 +326,7 @@ module.exports = class Core {

const verifier = new Verifier(this.header.key, manifest, { legacy: this._legacy })

if (verifier.prologue) this.tree.setPrologue(verifier.prologue)
if (verifier.prologue) this.state.prologue = Object.assign({}, verifier.prologue)

this.manifest = this.header.manifest = manifest

Expand Down Expand Up @@ -372,7 +379,7 @@ module.exports = class Core {
return this.state.flushed()
}

async commit (state, { signature, keyPair = this.header.keyPair, length = state.tree.length, treeLength = state.flushedLength(), overwrite = false, atom } = {}) {
async commit (state, { signature, keyPair = this.header.keyPair, length = state.length, treeLength = state.flushedLength(), overwrite = false, atom } = {}) {
if (atom) atom.enter()

let srcLock = null
Expand All @@ -381,12 +388,12 @@ module.exports = class Core {
try {
srcLock = await state.mutex.lock()

if (this.tree.length > state.tree.length) {
if (this.state.length > state.length) {
throw new Error('Invalid commit: partial commit') // TODO: partial commit in the future if possible
}

if (this.tree.length > treeLength) {
for (const root of this.tree.roots) {
if (this.state.length > treeLength) {
for (const root of this.state.roots) {
const batchRoot = await state.tree.get(root.index)
if (batchRoot.size !== root.size || !b4a.equals(batchRoot.hash, root.hash)) {
throw new Error('Invalid commit: tree conflict')
Expand All @@ -398,11 +405,11 @@ module.exports = class Core {
throw INVALID_OPERATION('Cannot commit without manifest') // easier to assert than upsert
}

if (this.tree.length < length && !signature) {
signature = this.verifier.sign(state.tree.batch(), keyPair)
if (this.state.length < length && !signature) {
signature = this.verifier.sign(state.createTreeBatch(), keyPair)
}

const tree = await this.state._overwrite(state, this.tree.fork, length, treeLength, signature, false, false, atom)
const tree = await this.state._overwrite(state, this.state.fork, length, treeLength, signature, false, false, atom)

// gc blocks from source
if (treeLength < length) {
Expand Down Expand Up @@ -432,8 +439,8 @@ module.exports = class Core {
this.state.onappend(bitfield)

return {
length: this.tree.length,
byteLength: this.tree.byteLength
length: this.state.length,
byteLength: this.state.byteLength
}
} catch (err) {
if (atom) atom.destroy()
Expand Down Expand Up @@ -556,12 +563,12 @@ module.exports = class Core {
}

async checkConflict (proof, from) {
if (this.tree.length < proof.upgrade.length || proof.fork !== this.tree.fork) {
if (this.state.length < proof.upgrade.length || proof.fork !== this.state.fork) {
// out of date this proof - ignore for now
return false
}

const batch = this.tree.verifyFullyRemote(proof)
const batch = this.tree.verifyFullyRemote(proof, this.state)

try {
this._verifyBatchUpgrade(batch, proof.manifest)
Expand Down Expand Up @@ -592,7 +599,8 @@ module.exports = class Core {
}

async verifyReorg (proof) {
const batch = await this.tree.reorg(proof)
const batch = new ReorgBatch(this.tree, this.state)
await this.tree.reorg(proof, batch)
this._verifyBatchUpgrade(batch, proof.manifest)
return batch
}
Expand All @@ -601,9 +609,9 @@ module.exports = class Core {
// We cannot apply "other forks" atm.
// We should probably still try and they are likely super similar for non upgrades
// but this is easy atm (and the above layer will just retry)
if (proof.fork !== this.tree.fork) return false
if (proof.fork !== this.state.fork) return false

const batch = await this.tree.verify(proof)
const batch = await this.tree.verify(proof, this.state)
if (!batch.commitable()) return false

const value = (proof.block && proof.block.value) || null
Expand Down Expand Up @@ -649,7 +657,7 @@ module.exports = class Core {
openSkipBitfield () {
if (this.skipBitfield !== null) return this.skipBitfield
this.skipBitfield = new RemoteBitfield()
const buf = this.bitfield.toBuffer(this.tree.length)
const buf = this.bitfield.toBuffer(this.state.length)
const bitfield = new Uint32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4)
this.skipBitfield.insert(0, bitfield)
return this.skipBitfield
Expand Down
Loading

0 comments on commit 4cffb5b

Please sign in to comment.