Skip to content

Commit

Permalink
Handle NaN/+Inf/-Inf values when serialising and deserialising rows in the oplog.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp committed Oct 25, 2023
1 parent 9410203 commit 2c6fa6b
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 36 deletions.
13 changes: 9 additions & 4 deletions clients/typescript/src/satellite/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
OPTYPES,
} from './oplog'
import { difference, union } from '../util/sets'
import { Row } from '../util'
import { RelationsCache, Row } from '../util'

/**
* Merge server-sent operation with local pending oplog to arrive at the same row state the server is at.
Expand All @@ -25,15 +25,20 @@ export function mergeEntries(
localOrigin: string,
local: OplogEntry[],
incomingOrigin: string,
incoming: OplogEntry[]
incoming: OplogEntry[],
relations: RelationsCache
): PendingChanges {
const localTableChanges = localOperationsToTableChanges(
local,
(timestamp: Date) => {
return generateTag(localOrigin, timestamp)
}
},
relations
)
const incomingTableChanges = remoteOperationsToTableChanges(
incoming,
relations
)
const incomingTableChanges = remoteOperationsToTableChanges(incoming)

for (const [tablename, incomingMapping] of Object.entries(
incomingTableChanges
Expand Down
102 changes: 86 additions & 16 deletions clients/typescript/src/satellite/oplog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
SqlValue,
DataTransaction,
DataChange,
Record as Rec,
Relation,
} from '../util/types'
import { union } from '../util/sets'
import { numberToBytes } from '../util/common'
Expand Down Expand Up @@ -115,7 +117,8 @@ export const stringToOpType = (opTypeStr: string): OpType => {
// parsing out the changed columns from the oldRow and the newRow.
export const localEntryToChanges = (
entry: OplogEntry,
tag: Tag
tag: Tag,
relations: RelationsCache
): OplogEntryChanges => {
const result: OplogEntryChanges = {
namespace: entry.namespace,
Expand All @@ -127,8 +130,14 @@ export const localEntryToChanges = (
clearTags: decodeTags(entry.clearTags),
}

const oldRow: Row = entry.oldRow ? JSON.parse(entry.oldRow) : {}
const newRow: Row = entry.newRow ? JSON.parse(entry.newRow) : {}
const relation = relations[entry.tablename]

const oldRow: Row = entry.oldRow
? (deserialiseRow(entry.oldRow, relation) as Row)
: {}
const newRow: Row = entry.newRow
? (deserialiseRow(entry.newRow, relation) as Row)
: {}

const timestamp = new Date(entry.timestamp).getTime()

Expand All @@ -143,9 +152,17 @@ export const localEntryToChanges = (

// Convert an `OplogEntry` to a `ShadowEntryChanges` structure,
// parsing out the changed columns from the oldRow and the newRow.
export const remoteEntryToChanges = (entry: OplogEntry): ShadowEntryChanges => {
const oldRow: Row = entry.oldRow ? JSON.parse(entry.oldRow) : {}
const newRow: Row = entry.newRow ? JSON.parse(entry.newRow) : {}
export const remoteEntryToChanges = (
entry: OplogEntry,
relations: RelationsCache
): ShadowEntryChanges => {
const relation = relations[entry.tablename]
const oldRow: Row = entry.oldRow
? (deserialiseRow(entry.oldRow, relation) as Row)
: {}
const newRow: Row = entry.newRow
? (deserialiseRow(entry.newRow, relation) as Row)
: {}

const result: ShadowEntryChanges = {
namespace: entry.namespace,
Expand Down Expand Up @@ -182,14 +199,16 @@ export const remoteEntryToChanges = (entry: OplogEntry): ShadowEntryChanges => {
*/
export const localOperationsToTableChanges = (
operations: OplogEntry[],
genTag: (timestamp: Date) => Tag
genTag: (timestamp: Date) => Tag,
relations: RelationsCache
): OplogTableChanges => {
const initialValue: OplogTableChanges = {}

return operations.reduce((acc, entry) => {
const entryChanges = localEntryToChanges(
entry,
genTag(new Date(entry.timestamp))
genTag(new Date(entry.timestamp)),
relations
)

// Sort for deterministic key generation.
Expand Down Expand Up @@ -231,12 +250,13 @@ export const localOperationsToTableChanges = (
}

export const remoteOperationsToTableChanges = (
operations: OplogEntry[]
operations: OplogEntry[],
relations: RelationsCache
): PendingChanges => {
const initialValue: PendingChanges = {}

return operations.reduce((acc, entry) => {
const entryChanges = remoteEntryToChanges(entry)
const entryChanges = remoteEntryToChanges(entry, relations)

// Sort for deterministic key generation.
const primaryKeyStr = primaryKeyToStr(entryChanges.primaryKeyCols)
Expand Down Expand Up @@ -264,6 +284,56 @@ export const remoteOperationsToTableChanges = (
}, initialValue)
}

/**
 * Serialises a row (a record of column values) to a JSON string.
 * JSON has no representation for `NaN`, `Infinity`, or `-Infinity`,
 * so those numbers are encoded as the strings `'NaN'`, `'+Inf'`, and
 * `'-Inf'`; `deserialiseRow` performs the inverse mapping for float columns.
 * @param row The row to serialise (may be absent, e.g. when there is no old row).
 */
function serialiseRow(row?: Rec): string {
  // Replacer that maps the special IEEE-754 values to their string encodings;
  // all other values pass through untouched.
  const encodeSpecialNumbers = (_key: string, value: unknown): unknown => {
    if (typeof value !== 'number') return value
    if (Number.isNaN(value)) return 'NaN'
    if (value === Infinity) return '+Inf'
    if (value === -Infinity) return '-Inf'
    return value
  }
  return JSON.stringify(row, encodeSpecialNumbers)
}

/**
 * Deserialises a JSON-encoded row back into a record.
 * The strings `"NaN"`, `"+Inf"`, and `"-Inf"` are transformed back into
 * their numeric equivalents, but only for columns whose type is a float
 * (`FLOAT4`, `FLOAT8`, or `REAL`) — a text column may legitimately contain
 * those strings. Inverse of `serialiseRow`.
 * @param str The JSON string to deserialise.
 * @param rel The relation (column metadata) for the table this row belongs to.
 */
function deserialiseRow(str: string, rel: Pick<Relation, 'columns'>): Rec {
  // Hoisted out of the reviver: the set of float-typed column names.
  // The original scanned `rel.columns` linearly for every key of every row;
  // a precomputed Set makes the per-key check O(1) with identical behavior.
  const floatColumns = new Set(
    rel.columns
      .filter((c) => {
        const type = c.type?.toUpperCase()
        return type === 'FLOAT4' || type === 'FLOAT8' || type === 'REAL'
      })
      .map((c) => c.name)
  )
  return JSON.parse(str, (key, value) => {
    if (floatColumns.has(key) && typeof value === 'string') {
      if (value === 'NaN') {
        return NaN
      } else if (value === '+Inf') {
        return Infinity
      } else if (value === '-Inf') {
        return -Infinity
      }
    }
    return value
  })
}

export const fromTransaction = (
transaction: DataTransaction,
relations: RelationsCache
Expand All @@ -282,8 +352,8 @@ export const fromTransaction = (
namespace: 'main', // TODO: how?
tablename: t.relation.table,
optype: stringToOpType(t.type),
newRow: JSON.stringify(t.record),
oldRow: JSON.stringify(t.oldRecord),
newRow: serialiseRow(t.record),
oldRow: serialiseRow(t.oldRecord),
primaryKey: pk,
rowid: -1, // not required
timestamp: new Date(
Expand Down Expand Up @@ -366,17 +436,17 @@ export const opLogEntryToChange = (
entry: OplogEntry,
relations: RelationsCache
): DataChange => {
const relation = relations[`${entry.tablename}`]

let record, oldRecord
if (entry.newRow != null) {
record = JSON.parse(entry.newRow)
record = deserialiseRow(entry.newRow, relation)
}

if (entry.oldRow != null) {
oldRecord = JSON.parse(entry.oldRow)
oldRecord = deserialiseRow(entry.oldRow, relation)
}

const relation = relations[`${entry.tablename}`]

if (typeof relation === 'undefined') {
throw new Error(`Could not find relation for ${entry.tablename}`)
}
Expand Down
3 changes: 2 additions & 1 deletion clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,8 @@ export class SatelliteProcess implements Satellite {
this._authState!.clientId,
local,
incoming_origin,
incoming
incoming,
this.relations
)

const stmts: Statement[] = []
Expand Down
20 changes: 20 additions & 0 deletions clients/typescript/test/satellite/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,26 @@ export const relations = {
},
],
},
floatTable: {
id: 3,
schema: 'public',
table: 'floatTable',
tableType: 0,
columns: [
{
name: 'id',
type: 'INTEGER',
isNullable: false,
primaryKey: true,
},
{
name: 'value',
type: 'REAL',
isNullable: true,
primaryKey: false,
},
],
},
}

import migrations from '../support/migrations/migrations.js'
Expand Down
118 changes: 111 additions & 7 deletions clients/typescript/test/satellite/merge.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import test from 'ava'
import { mergeEntries } from '../../src/satellite/merge'
import { OplogEntry, primaryKeyToStr } from '../../src/satellite/oplog'
import {
OplogEntry,
fromTransaction,
primaryKeyToStr,
} from '../../src/satellite/oplog'
import {
DEFAULT_LOG_POS,
DataChangeType,
DataTransaction,
} from '../../src/util'
import Long from 'long'
import { relations } from './common'

test('merging entries: local no-op updates should cancel incoming delete', (t) => {
const pk = primaryKeyToStr({ id: 1 })
Expand All @@ -9,7 +20,7 @@ test('merging entries: local no-op updates should cancel incoming delete', (t) =
{
rowid: 0,
namespace: 'main',
tablename: 'public',
tablename: 'parent',
optype: 'UPDATE',
timestamp: '1970-01-02T03:46:41.000Z', // 100001000 as a unix timestamp
primaryKey: pk,
Expand All @@ -23,7 +34,7 @@ test('merging entries: local no-op updates should cancel incoming delete', (t) =
{
rowid: 0,
namespace: 'main',
tablename: 'public',
tablename: 'parent',
optype: 'DELETE',
timestamp: '1970-01-02T03:46:42.000Z', // 100002000 as a unix timestamp
primaryKey: pk,
Expand All @@ -32,10 +43,103 @@ test('merging entries: local no-op updates should cancel incoming delete', (t) =
},
]

const merged = mergeEntries('local', local, 'remote', remote)
const merged = mergeEntries('local', local, 'remote', remote, relations)

// Merge should resolve into the UPSERT for this row, since the remote DELETE didn't observe this local update
t.like(merged, { 'main.public': { [pk]: { optype: 'UPSERT' } } })
t.deepEqual(merged['main.public'][pk].tags, ['local@100001000'])
t.deepEqual(merged['main.public'][pk].fullRow, { id: 1, value: 'TEST' })
t.like(merged, { 'main.parent': { [pk]: { optype: 'UPSERT' } } })
t.deepEqual(merged['main.parent'][pk].tags, ['local@100001000'])
t.deepEqual(merged['main.parent'][pk].fullRow, { id: 1, value: 'TEST' })
})

test('merge can handle infinity values', (t) => {
  const pk = primaryKeyToStr({ id: 1 })

  // Convert an ISO timestamp string into a Long commit timestamp.
  const toCommitTimestamp = (timestamp: string): Long =>
    Long.UZERO.add(new Date(timestamp).getTime())

  // Build a transaction containing a single INSERT into `floatTable`.
  const makeTx = (commitTime: string, value: number): DataTransaction => ({
    lsn: DEFAULT_LOG_POS,
    commit_timestamp: toCommitTimestamp(commitTime),
    changes: [
      {
        relation: relations.floatTable,
        type: DataChangeType.INSERT,
        record: { id: 1, value },
        tags: [],
      },
    ],
  })

  const tx1 = makeTx('1970-01-02T03:46:41.000Z', +Infinity)
  const tx2 = makeTx('1970-01-02T03:46:42.000Z', -Infinity)

  // We go through `fromTransaction` on purpose
  // in order to also test serialisation/deserialisation of the rows.
  const entry1: OplogEntry[] = fromTransaction(tx1, relations)
  const entry2: OplogEntry[] = fromTransaction(tx2, relations)

  const merged = mergeEntries('local', entry1, 'remote', entry2, relations)

  // tx2 should win because tx1 and tx2 happened concurrently
  // but the timestamp of tx2 > tx1.
  t.like(merged, { 'main.floatTable': { [pk]: { optype: 'UPSERT' } } })
  t.deepEqual(merged['main.floatTable'][pk].fullRow, {
    id: 1,
    value: -Infinity,
  })
})

test('merge can handle NaN values', (t) => {
  const pk = primaryKeyToStr({ id: 1 })

  // Convert an ISO timestamp string into a Long commit timestamp.
  const toCommitTimestamp = (timestamp: string): Long =>
    Long.UZERO.add(new Date(timestamp).getTime())

  // Build a transaction containing a single INSERT into `floatTable`.
  const makeTx = (commitTime: string, value: number): DataTransaction => ({
    lsn: DEFAULT_LOG_POS,
    commit_timestamp: toCommitTimestamp(commitTime),
    changes: [
      {
        relation: relations.floatTable,
        type: DataChangeType.INSERT,
        record: { id: 1, value },
        tags: [],
      },
    ],
  })

  const tx1 = makeTx('1970-01-02T03:46:41.000Z', 5.0)
  const tx2 = makeTx('1970-01-02T03:46:42.000Z', NaN)

  // We go through `fromTransaction` on purpose
  // in order to also test serialisation/deserialisation of the rows.
  const entry1: OplogEntry[] = fromTransaction(tx1, relations)
  const entry2: OplogEntry[] = fromTransaction(tx2, relations)

  const merged = mergeEntries('local', entry1, 'remote', entry2, relations)

  // tx2 should win because tx1 and tx2 happened concurrently
  // but the timestamp of tx2 > tx1.
  t.like(merged, { 'main.floatTable': { [pk]: { optype: 'UPSERT' } } })
  t.deepEqual(merged['main.floatTable'][pk].fullRow, { id: 1, value: NaN })
})
Loading

0 comments on commit 2c6fa6b

Please sign in to comment.