Skip to content

Commit

Permalink
Merge pull request #208 from ssb-ngi-pointer/pValue
Browse files Browse the repository at this point in the history
pValue
  • Loading branch information
arj03 authored Apr 4, 2022
2 parents 6d62f3a + a35f8c2 commit ac35831
Showing 1 changed file with 66 additions and 50 deletions.
116 changes: 66 additions & 50 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,47 +257,43 @@ module.exports = function (log, indexesPath) {
}
}

function seekMinTimestamp(buffer) {
function seekMinTimestamp(buffer, pValue) {
var p = 0 // note you pass in p!
p = bipf.seekKey(buffer, p, B_TIMESTAMP)
const arrivalTimestamp = bipf.decode(buffer, p)
p = 0
p = bipf.seekKey(buffer, p, B_VALUE)
p = bipf.seekKey(buffer, p, B_TIMESTAMP)
p = bipf.seekKey(buffer, pValue, B_TIMESTAMP)
const declaredTimestamp = bipf.decode(buffer, p)
return Math.min(arrivalTimestamp, declaredTimestamp)
}

function seekSequence(buffer) {
var p = 0 // note you pass in p!
p = bipf.seekKey(buffer, p, B_VALUE)
p = bipf.seekKey(buffer, p, B_SEQUENCE)
function seekSequence(buffer, pValue) {
var p = bipf.seekKey(buffer, pValue, B_SEQUENCE)
return bipf.decode(buffer, p)
}

function updateTimestampIndex(seq, offset, buffer) {
function updateTimestampIndex(seq, offset, buffer, pValue) {
if (seq > indexes['timestamp'].count - 1) {
if (seq > indexes['timestamp'].tarr.length - 1)
growTarrIndex(indexes['timestamp'], Float64Array)

indexes['timestamp'].offset = offset

const timestamp = seekMinTimestamp(buffer)
const timestamp = seekMinTimestamp(buffer, pValue)

indexes['timestamp'].tarr[seq] = timestamp
indexes['timestamp'].count = seq + 1
return true
}
}

function updateSequenceIndex(seq, offset, buffer) {
function updateSequenceIndex(seq, offset, buffer, pValue) {
if (seq > indexes['sequence'].count - 1) {
if (seq > indexes['sequence'].tarr.length - 1)
growTarrIndex(indexes['sequence'], Uint32Array)

indexes['sequence'].offset = offset

const sequence = seekSequence(buffer)
const sequence = seekSequence(buffer, pValue)

indexes['sequence'].tarr[seq] = sequence
indexes['sequence'].count = seq + 1
Expand All @@ -315,8 +311,8 @@ module.exports = function (log, indexesPath) {

const undefinedBipf = bipf.allocAndEncode(undefined)

function checkEqual(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkEqual(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)

if (fieldStart === -1 && opData.value.equals(undefinedBipf)) return true
else return bipf.compare(buffer, fieldStart, opData.value, 0) === 0
Expand All @@ -334,11 +330,13 @@ module.exports = function (log, indexesPath) {
}

function checkComparison(op, buffer) {
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (op.data.indexName === 'timestamp') {
const timestamp = seekMinTimestamp(buffer)
const timestamp = seekMinTimestamp(buffer, pValue)
return compareWithRangeOp(op, timestamp)
} else if (op.data.indexName === 'sequence') {
const sequence = seekSequence(buffer)
const sequence = seekSequence(buffer, pValue)
return compareWithRangeOp(op, sequence)
} else {
console.warn(
Expand All @@ -348,21 +346,21 @@ module.exports = function (log, indexesPath) {
}
}

function checkPredicate(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkPredicate(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
const predicateFn = opData.value
if (fieldStart < 0) return false
const fieldValue = bipf.decode(buffer, fieldStart)
return predicateFn(fieldValue)
}

function checkAbsent(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkAbsent(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
return fieldStart < 0
}

function checkIncludes(opData, buffer) {
const fieldStart = opData.seek(buffer)
function checkIncludes(opData, buffer, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
if (!~fieldStart) return false
const type = bipf.getEncodedType(buffer, fieldStart)

Expand All @@ -378,7 +376,7 @@ module.exports = function (log, indexesPath) {
}
})
return found
} else return checkEqual(opData, buffer)
} else return checkEqual(opData, buffer, pValue)
}

function safeReadUint32(buf, prefixOffset = 0) {
Expand All @@ -400,9 +398,9 @@ module.exports = function (log, indexesPath) {
arr.push(seq)
}

function updatePrefixMapIndex(opData, index, buffer, seq, offset) {
function updatePrefixMapIndex(opData, index, buffer, seq, offset, pValue) {
if (seq > index.count - 1) {
const fieldStart = opData.seek(buffer)
const fieldStart = opData.seek(buffer, 0, pValue)
if (~fieldStart) {
const buf = bipf.slice(buffer, fieldStart)
if (buf.length) {
Expand All @@ -416,11 +414,11 @@ module.exports = function (log, indexesPath) {
}
}

function updatePrefixIndex(opData, index, buffer, seq, offset) {
function updatePrefixIndex(opData, index, buffer, seq, offset, pValue) {
if (seq > index.count - 1) {
if (seq > index.tarr.length - 1) growTarrIndex(index, Uint32Array)

const fieldStart = opData.seek(buffer)
const fieldStart = opData.seek(buffer, 0, pValue)
if (~fieldStart) {
const buf = bipf.slice(buffer, fieldStart)
index.tarr[seq] = buf.length
Expand All @@ -434,19 +432,19 @@ module.exports = function (log, indexesPath) {
}
}

function updateIndexValue(op, index, buffer, seq) {
if (op.type === 'EQUAL' && checkEqual(op.data, buffer))
function updateIndexValue(op, index, buffer, seq, pValue) {
if (op.type === 'EQUAL' && checkEqual(op.data, buffer, pValue))
index.bitset.add(seq)
else if (op.type === 'PREDICATE' && checkPredicate(op.data, buffer))
else if (op.type === 'PREDICATE' && checkPredicate(op.data, buffer, pValue))
index.bitset.add(seq)
else if (op.type === 'ABSENT' && checkAbsent(op.data, buffer))
else if (op.type === 'ABSENT' && checkAbsent(op.data, buffer, pValue))
index.bitset.add(seq)
else if (op.type === 'INCLUDES' && checkIncludes(op.data, buffer))
else if (op.type === 'INCLUDES' && checkIncludes(op.data, buffer, pValue))
index.bitset.add(seq)
}

function updateAllIndexValue(opData, newIndexes, buffer, seq) {
const fieldStart = opData.seek(buffer)
function updateAllIndexValue(opData, newIndexes, buffer, seq, pValue) {
const fieldStart = opData.seek(buffer, 0, pValue)
const value = bipf.decode(buffer, fieldStart)
const indexName = safeFilename(opData.indexType + '_' + value)

Expand Down Expand Up @@ -541,18 +539,20 @@ module.exports = function (log, indexesPath) {
return
}

if (updateTimestampIndex(seq, offset, buffer))
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (updateTimestampIndex(seq, offset, buffer, pValue))
updatedTimestampIndex = true

if (updateSequenceIndex(seq, offset, buffer))
if (updateSequenceIndex(seq, offset, buffer, pValue))
updatedSequenceIndex = true

if (indexNeedsUpdate) {
if (op.data.prefix && op.data.useMap)
updatePrefixMapIndex(op.data, index, buffer, seq, offset)
updatePrefixMapIndex(op.data, index, buffer, seq, offset, pValue)
else if (op.data.prefix)
updatePrefixIndex(op.data, index, buffer, seq, offset)
else updateIndexValue(op, index, buffer, seq)
updatePrefixIndex(op.data, index, buffer, seq, offset, pValue)
else updateIndexValue(op, index, buffer, seq, pValue)
}

if (seq % 1000 === 0) {
Expand Down Expand Up @@ -673,10 +673,12 @@ module.exports = function (log, indexesPath) {
return
}

if (updateTimestampIndex(seq, offset, buffer))
const pValue = bipf.seekKey(buffer, 0, B_VALUE)

if (updateTimestampIndex(seq, offset, buffer, pValue))
updatedTimestampIndex = true

if (updateSequenceIndex(seq, offset, buffer))
if (updateSequenceIndex(seq, offset, buffer, pValue))
updatedSequenceIndex = true

opsMissingIdx.forEach((op) => {
Expand All @@ -686,19 +688,28 @@ module.exports = function (log, indexesPath) {
newIndexes[op.data.indexName],
buffer,
seq,
offset
offset,
pValue
)
else if (op.data.prefix)
updatePrefixIndex(
op.data,
newIndexes[op.data.indexName],
buffer,
seq,
offset
offset,
pValue
)
else if (op.data.indexAll)
updateAllIndexValue(op.data, newIndexes, buffer, seq)
else updateIndexValue(op, newIndexes[op.data.indexName], buffer, seq)
updateAllIndexValue(op.data, newIndexes, buffer, seq, pValue)
else
updateIndexValue(
op,
newIndexes[op.data.indexName],
buffer,
seq,
pValue
)
})

if (seq % 1000 === 0) {
Expand Down Expand Up @@ -871,7 +882,8 @@ module.exports = function (log, indexesPath) {
function checker(value) {
if (!value) return false // deleted

const fieldStart = seek(value)
const pValue = bipf.seekKey(value, 0, B_VALUE)
const fieldStart = seek(value, 0, pValue)

if (target) return bipf.compare(value, fieldStart, target, 0) === 0
else if (~fieldStart) return false
Expand Down Expand Up @@ -1138,13 +1150,17 @@ module.exports = function (log, indexesPath) {
}

function isValueOk(ops, value, isOr) {
const pValue = bipf.seekKey(value, 0, B_VALUE)

for (let i = 0; i < ops.length; ++i) {
const op = ops[i]
let ok = false
if (op.type === 'EQUAL') ok = checkEqual(op.data, value)
else if (op.type === 'PREDICATE') ok = checkPredicate(op.data, value)
else if (op.type === 'ABSENT') ok = checkAbsent(op.data, value)
else if (op.type === 'INCLUDES') ok = checkIncludes(op.data, value)
if (op.type === 'EQUAL') ok = checkEqual(op.data, value, pValue)
else if (op.type === 'PREDICATE')
ok = checkPredicate(op.data, value, pValue)
else if (op.type === 'ABSENT') ok = checkAbsent(op.data, value, pValue)
else if (op.type === 'INCLUDES')
ok = checkIncludes(op.data, value, pValue)
else if (op.type === 'NOT') ok = !isValueOk(op.data, value, false)
else if (op.type === 'AND') ok = isValueOk(op.data, value, false)
else if (op.type === 'OR') ok = isValueOk(op.data, value, true)
Expand Down

0 comments on commit ac35831

Please sign in to comment.