Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use bipf 1.6.0 seekKeyCached #209

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 53 additions & 74 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ module.exports = function (log, indexesPath) {
else waiting.push(cb)
}

const B_TIMESTAMP = Buffer.from('timestamp')
const B_SEQUENCE = Buffer.from('sequence')
const B_VALUE = Buffer.from('value')

function loadIndexes(cb) {
function parseIndexes(err, files) {
push(
Expand Down Expand Up @@ -257,43 +253,46 @@ module.exports = function (log, indexesPath) {
}
}

function seekMinTimestamp(buffer, pValue) {
var p = 0 // note you pass in p!
p = bipf.seekKey(buffer, p, B_TIMESTAMP)
const arrivalTimestamp = bipf.decode(buffer, p)
p = bipf.seekKey(buffer, pValue, B_TIMESTAMP)
const declaredTimestamp = bipf.decode(buffer, p)
const B_TIMESTAMP = Buffer.from('timestamp')

function seekMinTimestamp(buffer) {
const pTimestamp = bipf.seekKey(buffer, 0, B_TIMESTAMP)
const arrivalTimestamp = bipf.decode(buffer, pTimestamp)
const pValue = bipf.seekKeyCached(buffer, 0, 'value')
const pValueTimestamp = bipf.seekKeyCached(buffer, pValue, 'timestamp')
const declaredTimestamp = bipf.decode(buffer, pValueTimestamp)
return Math.min(arrivalTimestamp, declaredTimestamp)
}

function seekSequence(buffer, pValue) {
var p = bipf.seekKey(buffer, pValue, B_SEQUENCE)
function seekSequence(buffer) {
const pValue = bipf.seekKeyCached(buffer, 0, 'value')
const p = bipf.seekKeyCached(buffer, pValue, 'sequence')
return bipf.decode(buffer, p)
}

function updateTimestampIndex(seq, offset, buffer, pValue) {
function updateTimestampIndex(seq, offset, buffer) {
if (seq > indexes['timestamp'].count - 1) {
if (seq > indexes['timestamp'].tarr.length - 1)
growTarrIndex(indexes['timestamp'], Float64Array)

indexes['timestamp'].offset = offset

const timestamp = seekMinTimestamp(buffer, pValue)
const timestamp = seekMinTimestamp(buffer)

indexes['timestamp'].tarr[seq] = timestamp
indexes['timestamp'].count = seq + 1
return true
}
}

function updateSequenceIndex(seq, offset, buffer, pValue) {
function updateSequenceIndex(seq, offset, buffer) {
if (seq > indexes['sequence'].count - 1) {
if (seq > indexes['sequence'].tarr.length - 1)
growTarrIndex(indexes['sequence'], Uint32Array)

indexes['sequence'].offset = offset

const sequence = seekSequence(buffer, pValue)
const sequence = seekSequence(buffer)

indexes['sequence'].tarr[seq] = sequence
indexes['sequence'].count = seq + 1
Expand All @@ -311,8 +310,8 @@ module.exports = function (log, indexesPath) {

const undefinedBipf = bipf.allocAndEncode(undefined)

function checkEqual(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
function checkEqual(opData, buffer) {
const fieldStart = opData.seek(buffer)

if (fieldStart === -1 && opData.value.equals(undefinedBipf)) return true
else return bipf.compare(buffer, fieldStart, opData.value, 0) === 0
Expand All @@ -330,13 +329,11 @@ module.exports = function (log, indexesPath) {
}

function checkComparison(op, buffer) {
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (op.data.indexName === 'timestamp') {
const timestamp = seekMinTimestamp(buffer, pValue)
const timestamp = seekMinTimestamp(buffer)
return compareWithRangeOp(op, timestamp)
} else if (op.data.indexName === 'sequence') {
const sequence = seekSequence(buffer, pValue)
const sequence = seekSequence(buffer)
return compareWithRangeOp(op, sequence)
} else {
console.warn(
Expand All @@ -346,21 +343,21 @@ module.exports = function (log, indexesPath) {
}
}

function checkPredicate(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
function checkPredicate(opData, buffer) {
const fieldStart = opData.seek(buffer)
const predicateFn = opData.value
if (fieldStart < 0) return false
const fieldValue = bipf.decode(buffer, fieldStart)
return predicateFn(fieldValue)
}

function checkAbsent(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
function checkAbsent(opData, buffer) {
const fieldStart = opData.seek(buffer)
return fieldStart < 0
}

function checkIncludes(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
function checkIncludes(opData, buffer) {
const fieldStart = opData.seek(buffer)
if (!~fieldStart) return false
const type = bipf.getEncodedType(buffer, fieldStart)

Expand All @@ -376,7 +373,7 @@ module.exports = function (log, indexesPath) {
}
})
return found
} else return checkEqual(opData, buffer, pValue)
} else return checkEqual(opData, buffer)
}

function safeReadUint32(buf, prefixOffset = 0) {
Expand All @@ -398,9 +395,9 @@ module.exports = function (log, indexesPath) {
arr.push(seq)
}

function updatePrefixMapIndex(opData, index, buffer, seq, offset, pValue) {
function updatePrefixMapIndex(opData, index, buffer, seq, offset) {
if (seq > index.count - 1) {
const fieldStart = opData.seek(buffer, 0, pValue)
const fieldStart = opData.seek(buffer)
if (~fieldStart) {
const buf = bipf.slice(buffer, fieldStart)
if (buf.length) {
Expand All @@ -414,11 +411,11 @@ module.exports = function (log, indexesPath) {
}
}

function updatePrefixIndex(opData, index, buffer, seq, offset, pValue) {
function updatePrefixIndex(opData, index, buffer, seq, offset) {
if (seq > index.count - 1) {
if (seq > index.tarr.length - 1) growTarrIndex(index, Uint32Array)

const fieldStart = opData.seek(buffer, 0, pValue)
const fieldStart = opData.seek(buffer)
if (~fieldStart) {
const buf = bipf.slice(buffer, fieldStart)
index.tarr[seq] = buf.length
Expand All @@ -432,19 +429,19 @@ module.exports = function (log, indexesPath) {
}
}

function updateIndexValue(op, index, buffer, seq, pValue) {
if (op.type === 'EQUAL' && checkEqual(op.data, buffer, pValue))
function updateIndexValue(op, index, buffer, seq) {
if (op.type === 'EQUAL' && checkEqual(op.data, buffer))
index.bitset.add(seq)
else if (op.type === 'PREDICATE' && checkPredicate(op.data, buffer, pValue))
else if (op.type === 'PREDICATE' && checkPredicate(op.data, buffer))
index.bitset.add(seq)
else if (op.type === 'ABSENT' && checkAbsent(op.data, buffer, pValue))
else if (op.type === 'ABSENT' && checkAbsent(op.data, buffer))
index.bitset.add(seq)
else if (op.type === 'INCLUDES' && checkIncludes(op.data, buffer, pValue))
else if (op.type === 'INCLUDES' && checkIncludes(op.data, buffer))
index.bitset.add(seq)
}

function updateAllIndexValue(opData, newIndexes, buffer, seq, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
function updateAllIndexValue(opData, newIndexes, buffer, seq) {
const fieldStart = opData.seek(buffer)
const value = bipf.decode(buffer, fieldStart)
const indexName = safeFilename(opData.indexType + '_' + value)

Expand Down Expand Up @@ -539,20 +536,18 @@ module.exports = function (log, indexesPath) {
return
}

const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (updateTimestampIndex(seq, offset, buffer, pValue))
if (updateTimestampIndex(seq, offset, buffer))
updatedTimestampIndex = true

if (updateSequenceIndex(seq, offset, buffer, pValue))
if (updateSequenceIndex(seq, offset, buffer))
updatedSequenceIndex = true

if (indexNeedsUpdate) {
if (op.data.prefix && op.data.useMap)
updatePrefixMapIndex(op.data, index, buffer, seq, offset, pValue)
updatePrefixMapIndex(op.data, index, buffer, seq, offset)
else if (op.data.prefix)
updatePrefixIndex(op.data, index, buffer, seq, offset, pValue)
else updateIndexValue(op, index, buffer, seq, pValue)
updatePrefixIndex(op.data, index, buffer, seq, offset)
else updateIndexValue(op, index, buffer, seq)
}

if (seq % 1000 === 0) {
Expand Down Expand Up @@ -673,12 +668,10 @@ module.exports = function (log, indexesPath) {
return
}

const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (updateTimestampIndex(seq, offset, buffer, pValue))
if (updateTimestampIndex(seq, offset, buffer))
updatedTimestampIndex = true

if (updateSequenceIndex(seq, offset, buffer, pValue))
if (updateSequenceIndex(seq, offset, buffer))
updatedSequenceIndex = true

opsMissingIdx.forEach((op) => {
Expand All @@ -688,28 +681,19 @@ module.exports = function (log, indexesPath) {
newIndexes[op.data.indexName],
buffer,
seq,
offset,
pValue
offset
)
else if (op.data.prefix)
updatePrefixIndex(
op.data,
newIndexes[op.data.indexName],
buffer,
seq,
offset,
pValue
offset
)
else if (op.data.indexAll)
updateAllIndexValue(op.data, newIndexes, buffer, seq, pValue)
else
updateIndexValue(
op,
newIndexes[op.data.indexName],
buffer,
seq,
pValue
)
updateAllIndexValue(op.data, newIndexes, buffer, seq)
else updateIndexValue(op, newIndexes[op.data.indexName], buffer, seq)
})

if (seq % 1000 === 0) {
Expand Down Expand Up @@ -882,8 +866,7 @@ module.exports = function (log, indexesPath) {
function checker(value) {
if (!value) return false // deleted

const pValue = bipf.seekKey(value, 0, B_VALUE)
const fieldStart = seek(value, 0, pValue)
const fieldStart = seek(value)

if (target) return bipf.compare(value, fieldStart, target, 0) === 0
else if (~fieldStart) return false
Expand Down Expand Up @@ -1150,17 +1133,13 @@ module.exports = function (log, indexesPath) {
}

function isValueOk(ops, value, isOr) {
const pValue = bipf.seekKey(value, 0, B_VALUE)

for (let i = 0; i < ops.length; ++i) {
const op = ops[i]
let ok = false
if (op.type === 'EQUAL') ok = checkEqual(op.data, value, pValue)
else if (op.type === 'PREDICATE')
ok = checkPredicate(op.data, value, pValue)
else if (op.type === 'ABSENT') ok = checkAbsent(op.data, value, pValue)
else if (op.type === 'INCLUDES')
ok = checkIncludes(op.data, value, pValue)
if (op.type === 'EQUAL') ok = checkEqual(op.data, value)
else if (op.type === 'PREDICATE') ok = checkPredicate(op.data, value)
else if (op.type === 'ABSENT') ok = checkAbsent(op.data, value)
else if (op.type === 'INCLUDES') ok = checkIncludes(op.data, value)
else if (op.type === 'NOT') ok = !isValueOk(op.data, value, false)
else if (op.type === 'AND') ok = isValueOk(op.data, value, false)
else if (op.type === 'OR') ok = isValueOk(op.data, value, true)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"dependencies": {
"atomic-file-rw": "^0.2.1",
"binary-search-bounds": "^2.0.4",
"bipf": "^1.5.4",
"bipf": "^1.6.1",
"crc": "3.6.0",
"debug": "^4.2.0",
"fastpriorityqueue": "^0.7.1",
Expand Down
Loading