Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pValue #208

Merged
merged 2 commits into from
Apr 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 66 additions & 50 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,47 +257,43 @@ module.exports = function (log, indexesPath) {
}
}

function seekMinTimestamp(buffer) {
function seekMinTimestamp(buffer, pValue) {
var p = 0 // note you pass in p!
p = bipf.seekKey(buffer, p, B_TIMESTAMP)
const arrivalTimestamp = bipf.decode(buffer, p)
p = 0
p = bipf.seekKey(buffer, p, B_VALUE)
p = bipf.seekKey(buffer, p, B_TIMESTAMP)
p = bipf.seekKey(buffer, pValue, B_TIMESTAMP)
const declaredTimestamp = bipf.decode(buffer, p)
return Math.min(arrivalTimestamp, declaredTimestamp)
}

function seekSequence(buffer) {
var p = 0 // note you pass in p!
p = bipf.seekKey(buffer, p, B_VALUE)
p = bipf.seekKey(buffer, p, B_SEQUENCE)
function seekSequence(buffer, pValue) {
var p = bipf.seekKey(buffer, pValue, B_SEQUENCE)
return bipf.decode(buffer, p)
}

function updateTimestampIndex(seq, offset, buffer) {
function updateTimestampIndex(seq, offset, buffer, pValue) {
if (seq > indexes['timestamp'].count - 1) {
if (seq > indexes['timestamp'].tarr.length - 1)
growTarrIndex(indexes['timestamp'], Float64Array)

indexes['timestamp'].offset = offset

const timestamp = seekMinTimestamp(buffer)
const timestamp = seekMinTimestamp(buffer, pValue)

indexes['timestamp'].tarr[seq] = timestamp
indexes['timestamp'].count = seq + 1
return true
}
}

function updateSequenceIndex(seq, offset, buffer) {
function updateSequenceIndex(seq, offset, buffer, pValue) {
if (seq > indexes['sequence'].count - 1) {
if (seq > indexes['sequence'].tarr.length - 1)
growTarrIndex(indexes['sequence'], Uint32Array)

indexes['sequence'].offset = offset

const sequence = seekSequence(buffer)
const sequence = seekSequence(buffer, pValue)

indexes['sequence'].tarr[seq] = sequence
indexes['sequence'].count = seq + 1
Expand All @@ -315,8 +311,8 @@ module.exports = function (log, indexesPath) {

const undefinedBipf = bipf.allocAndEncode(undefined)

function checkEqual(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkEqual(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
staltz marked this conversation as resolved.
Show resolved Hide resolved

if (fieldStart === -1 && opData.value.equals(undefinedBipf)) return true
else return bipf.compare(buffer, fieldStart, opData.value, 0) === 0
Expand All @@ -334,11 +330,13 @@ module.exports = function (log, indexesPath) {
}

function checkComparison(op, buffer) {
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (op.data.indexName === 'timestamp') {
const timestamp = seekMinTimestamp(buffer)
const timestamp = seekMinTimestamp(buffer, pValue)
return compareWithRangeOp(op, timestamp)
} else if (op.data.indexName === 'sequence') {
const sequence = seekSequence(buffer)
const sequence = seekSequence(buffer, pValue)
return compareWithRangeOp(op, sequence)
} else {
console.warn(
Expand All @@ -348,21 +346,21 @@ module.exports = function (log, indexesPath) {
}
}

function checkPredicate(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkPredicate(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
const predicateFn = opData.value
if (fieldStart < 0) return false
const fieldValue = bipf.decode(buffer, fieldStart)
return predicateFn(fieldValue)
}

function checkAbsent(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkAbsent(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
return fieldStart < 0
}

function checkIncludes(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkIncludes(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
if (!~fieldStart) return false
const type = bipf.getEncodedType(buffer, fieldStart)

Expand All @@ -378,7 +376,7 @@ module.exports = function (log, indexesPath) {
}
})
return found
} else return checkEqual(opData, buffer)
} else return checkEqual(opData, buffer, pValue)
}

function safeReadUint32(buf, prefixOffset = 0) {
Expand All @@ -400,9 +398,9 @@ module.exports = function (log, indexesPath) {
arr.push(seq)
}

function updatePrefixMapIndex(opData, index, buffer, seq, offset) {
function updatePrefixMapIndex(opData, index, buffer, seq, offset, pValue) {
if (seq > index.count - 1) {
const fieldStart = opData.seek(buffer)
const fieldStart = opData.seek(buffer, 0, pValue)
if (~fieldStart) {
const buf = bipf.slice(buffer, fieldStart)
if (buf.length) {
Expand All @@ -416,11 +414,11 @@ module.exports = function (log, indexesPath) {
}
}

function updatePrefixIndex(opData, index, buffer, seq, offset) {
function updatePrefixIndex(opData, index, buffer, seq, offset, pValue) {
if (seq > index.count - 1) {
if (seq > index.tarr.length - 1) growTarrIndex(index, Uint32Array)

const fieldStart = opData.seek(buffer)
const fieldStart = opData.seek(buffer, 0, pValue)
if (~fieldStart) {
const buf = bipf.slice(buffer, fieldStart)
index.tarr[seq] = buf.length
Expand All @@ -434,19 +432,19 @@ module.exports = function (log, indexesPath) {
}
}

function updateIndexValue(op, index, buffer, seq) {
if (op.type === 'EQUAL' && checkEqual(op.data, buffer))
function updateIndexValue(op, index, buffer, seq, pValue) {
if (op.type === 'EQUAL' && checkEqual(op.data, buffer, pValue))
index.bitset.add(seq)
else if (op.type === 'PREDICATE' && checkPredicate(op.data, buffer))
else if (op.type === 'PREDICATE' && checkPredicate(op.data, buffer, pValue))
index.bitset.add(seq)
else if (op.type === 'ABSENT' && checkAbsent(op.data, buffer))
else if (op.type === 'ABSENT' && checkAbsent(op.data, buffer, pValue))
index.bitset.add(seq)
else if (op.type === 'INCLUDES' && checkIncludes(op.data, buffer))
else if (op.type === 'INCLUDES' && checkIncludes(op.data, buffer, pValue))
index.bitset.add(seq)
}

function updateAllIndexValue(opData, newIndexes, buffer, seq) {
const fieldStart = opData.seek(buffer)
function updateAllIndexValue(opData, newIndexes, buffer, seq, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
const value = bipf.decode(buffer, fieldStart)
const indexName = safeFilename(opData.indexType + '_' + value)

Expand Down Expand Up @@ -541,18 +539,20 @@ module.exports = function (log, indexesPath) {
return
}

if (updateTimestampIndex(seq, offset, buffer))
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (updateTimestampIndex(seq, offset, buffer, pValue))
updatedTimestampIndex = true

if (updateSequenceIndex(seq, offset, buffer))
if (updateSequenceIndex(seq, offset, buffer, pValue))
updatedSequenceIndex = true

if (indexNeedsUpdate) {
if (op.data.prefix && op.data.useMap)
updatePrefixMapIndex(op.data, index, buffer, seq, offset)
updatePrefixMapIndex(op.data, index, buffer, seq, offset, pValue)
else if (op.data.prefix)
updatePrefixIndex(op.data, index, buffer, seq, offset)
else updateIndexValue(op, index, buffer, seq)
updatePrefixIndex(op.data, index, buffer, seq, offset, pValue)
else updateIndexValue(op, index, buffer, seq, pValue)
}

if (seq % 1000 === 0) {
Expand Down Expand Up @@ -673,10 +673,12 @@ module.exports = function (log, indexesPath) {
return
}

if (updateTimestampIndex(seq, offset, buffer))
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (updateTimestampIndex(seq, offset, buffer, pValue))
updatedTimestampIndex = true

if (updateSequenceIndex(seq, offset, buffer))
if (updateSequenceIndex(seq, offset, buffer, pValue))
updatedSequenceIndex = true

opsMissingIdx.forEach((op) => {
Expand All @@ -686,19 +688,28 @@ module.exports = function (log, indexesPath) {
newIndexes[op.data.indexName],
buffer,
seq,
offset
offset,
pValue
)
else if (op.data.prefix)
updatePrefixIndex(
op.data,
newIndexes[op.data.indexName],
buffer,
seq,
offset
offset,
pValue
)
else if (op.data.indexAll)
updateAllIndexValue(op.data, newIndexes, buffer, seq)
else updateIndexValue(op, newIndexes[op.data.indexName], buffer, seq)
updateAllIndexValue(op.data, newIndexes, buffer, seq, pValue)
else
updateIndexValue(
op,
newIndexes[op.data.indexName],
buffer,
seq,
pValue
)
})

if (seq % 1000 === 0) {
Expand Down Expand Up @@ -871,7 +882,8 @@ module.exports = function (log, indexesPath) {
function checker(value) {
if (!value) return false // deleted

const fieldStart = seek(value)
const pValue = bipf.seekKey(value, 0, B_VALUE)
const fieldStart = seek(value, 0, pValue)

if (target) return bipf.compare(value, fieldStart, target, 0) === 0
else if (~fieldStart) return false
Expand Down Expand Up @@ -1138,13 +1150,17 @@ module.exports = function (log, indexesPath) {
}

function isValueOk(ops, value, isOr) {
const pValue = bipf.seekKey(value, 0, B_VALUE)

for (let i = 0; i < ops.length; ++i) {
const op = ops[i]
let ok = false
if (op.type === 'EQUAL') ok = checkEqual(op.data, value)
else if (op.type === 'PREDICATE') ok = checkPredicate(op.data, value)
else if (op.type === 'ABSENT') ok = checkAbsent(op.data, value)
else if (op.type === 'INCLUDES') ok = checkIncludes(op.data, value)
if (op.type === 'EQUAL') ok = checkEqual(op.data, value, pValue)
else if (op.type === 'PREDICATE')
ok = checkPredicate(op.data, value, pValue)
else if (op.type === 'ABSENT') ok = checkAbsent(op.data, value, pValue)
else if (op.type === 'INCLUDES')
ok = checkIncludes(op.data, value, pValue)
else if (op.type === 'NOT') ok = !isValueOk(op.data, value, false)
else if (op.type === 'AND') ok = isValueOk(op.data, value, false)
else if (op.type === 'OR') ok = isValueOk(op.data, value, true)
Expand Down