Skip to content

Commit

Permalink
bug(satellite): potential race condition in _performSnapshot() (#575)
Browse files Browse the repository at this point in the history
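Fixes an issue with `_performSnapshot()` where the `performingSnapshot` assertion flag was not reset when an exception was thrown, so every later snapshot call failed the single-call assertion.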
  • Loading branch information
balegas authored Oct 30, 2023
1 parent ff27bc7 commit d60e9ce
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 71 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilled-cameras-swim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

handle exceptions properly in \_performSnapshot
137 changes: 71 additions & 66 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,7 @@ export class SatelliteProcess implements Satellite {
}

// Perform a snapshot and notify which data actually changed.
// It is not safe to call this function concurrently. Consider
// using a wrapped version
// It is not safe to call concurrently. Use mutexSnapshot.
async _performSnapshot(): Promise<Date> {
// assert a single call at a time
if (this.performingSnapshot) {
Expand All @@ -839,26 +838,27 @@ export class SatelliteProcess implements Satellite {
this.performingSnapshot = true
}

const oplog = this.opts.oplogTable
const shadow = this.opts.shadowTable
const timestamp = new Date()
const newTag = this._generateTag(timestamp)

/*
* IMPORTANT!
*
* The following queries make use of a documented but rare SQLite behaviour that allows selecting bare column
* on aggregate queries: https://sqlite.org/lang_select.html#bare_columns_in_an_aggregate_query
*
* In short, when a query has a `GROUP BY` clause with a single `min()` or `max()` present in SELECT/HAVING,
* then the "bare" columns (i.e. those not mentioned in a `GROUP BY` clause) are definitely the ones from the
* row that satisfied that `min`/`max` function. We make use of it here to find first/last operations in the
* oplog that touch a particular row.
*/

// Update the timestamps on all "new" entries - they have been added but timestamp is still `NULL`
const q1: Statement = {
sql: `
try {
const oplog = this.opts.oplogTable
const shadow = this.opts.shadowTable
const timestamp = new Date()
const newTag = this._generateTag(timestamp)

/*
* IMPORTANT!
*
* The following queries make use of a documented but rare SQLite behaviour that allows selecting bare column
* on aggregate queries: https://sqlite.org/lang_select.html#bare_columns_in_an_aggregate_query
*
* In short, when a query has a `GROUP BY` clause with a single `min()` or `max()` present in SELECT/HAVING,
* then the "bare" columns (i.e. those not mentioned in a `GROUP BY` clause) are definitely the ones from the
* row that satisfied that `min`/`max` function. We make use of it here to find first/last operations in the
* oplog that touch a particular row.
*/

// Update the timestamps on all "new" entries - they have been added but timestamp is still `NULL`
const q1: Statement = {
sql: `
UPDATE ${oplog} SET timestamp = ?
WHERE rowid in (
SELECT rowid FROM ${oplog}
Expand All @@ -867,12 +867,12 @@ export class SatelliteProcess implements Satellite {
)
RETURNING *
`,
args: [timestamp.toISOString()],
}
args: [timestamp.toISOString()],
}

// For each first oplog entry per element, set `clearTags` array to previous tags from the shadow table
const q2: Statement = {
sql: `
// For each first oplog entry per element, set `clearTags` array to previous tags from the shadow table
const q2: Statement = {
sql: `
UPDATE ${oplog}
SET clearTags = updates.tags
FROM (
Expand All @@ -887,26 +887,26 @@ export class SatelliteProcess implements Satellite {
) AS updates
WHERE updates.op_rowid = ${oplog}.rowid
`,
args: [timestamp.toISOString()],
}
args: [timestamp.toISOString()],
}

// For each affected shadow row, set new tag array, unless the last oplog operation was a DELETE
const q3: Statement = {
sql: `
// For each affected shadow row, set new tag array, unless the last oplog operation was a DELETE
const q3: Statement = {
sql: `
INSERT OR REPLACE INTO ${shadow} (namespace, tablename, primaryKey, tags)
SELECT namespace, tablename, primaryKey, ?
FROM ${oplog} AS op
WHERE timestamp = ?
GROUP BY namespace, tablename, primaryKey
HAVING rowid = max(rowid) AND optype != 'DELETE'
`,
args: [encodeTags([newTag]), timestamp.toISOString()],
}
args: [encodeTags([newTag]), timestamp.toISOString()],
}

// And finally delete any shadow rows where the last oplog operation was a `DELETE`
// We do an inner join in a CTE instead of a `WHERE EXISTS (...)` since this is not reliant on re-executing a query per every row in shadow table, but uses a PK join instead.
const q4: Statement = {
sql: `
// And finally delete any shadow rows where the last oplog operation was a `DELETE`
// We do an inner join in a CTE instead of a `WHERE EXISTS (...)` since this is not reliant on re-executing a query per every row in shadow table, but uses a PK join instead.
const q4: Statement = {
sql: `
WITH _to_be_deleted (rowid) AS (
SELECT shadow.rowid
FROM ${oplog} AS op
Expand All @@ -920,40 +920,45 @@ export class SatelliteProcess implements Satellite {
DELETE FROM ${shadow}
WHERE rowid IN _to_be_deleted
`,
args: [timestamp.toISOString()],
}
args: [timestamp.toISOString()],
}

// Execute the four queries above in a transaction, returning the results from the first query
// We're dropping down to this transaction interface because `runInTransaction` doesn't allow queries
const oplogEntries = (await this.adapter.transaction<OplogEntry[]>(
(tx, setResult) => {
tx.query(q1, (tx, res) => {
if (res.length > 0)
tx.run(q2, (tx) =>
tx.run(q3, (tx) =>
tx.run(q4, () => setResult(res as unknown as OplogEntry[]))
// Execute the four queries above in a transaction, returning the results from the first query
// We're dropping down to this transaction interface because `runInTransaction` doesn't allow queries
const oplogEntries = (await this.adapter.transaction<OplogEntry[]>(
(tx, setResult) => {
tx.query(q1, (tx, res) => {
if (res.length > 0)
tx.run(q2, (tx) =>
tx.run(q3, (tx) =>
tx.run(q4, () => setResult(res as unknown as OplogEntry[]))
)
)
)
else {
setResult([])
}
})
}
)) as OplogEntry[]
else {
setResult([])
}
})
}
)) as OplogEntry[]

if (oplogEntries.length > 0) this._notifyChanges(oplogEntries)
if (oplogEntries.length > 0) this._notifyChanges(oplogEntries)

if (!this.client.isClosed()) {
const enqueued = this.client.getLastSentLsn()
const enqueuedLogPos = bytesToNumber(enqueued)
if (!this.client.isClosed()) {
const enqueued = this.client.getLastSentLsn()
const enqueuedLogPos = bytesToNumber(enqueued)

// TODO: handle case where pending oplog is large
await this._getEntries(enqueuedLogPos).then((missing) =>
this._replicateSnapshotChanges(missing)
)
// TODO: handle case where pending oplog is large
await this._getEntries(enqueuedLogPos).then((missing) =>
this._replicateSnapshotChanges(missing)
)
}
return timestamp
} catch (e: any) {
Log.error(`error performing snapshot: ${e}`)
throw e
} finally {
this.performingSnapshot = false
}
this.performingSnapshot = false
return timestamp
}

async _notifyChanges(results: OplogEntry[]): Promise<void> {
Expand Down
24 changes: 19 additions & 5 deletions clients/typescript/test/satellite/process.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1783,12 +1783,26 @@ test.serial('connection backoff success', async (t) => {
)
})

// TODO: implement reconnect protocol
// test('resume out of window clears subscriptions and clears oplog after ack', async (t) => {})
// test('not possible to subscribe while oplog is not pushed', async (t) => {})
// test('process restart loads previous subscriptions', async (t) => {})
// test('oplog messages allowed between SatSubsRep and SatSubsDataBegin', async (t) => {})

// Regression: when a snapshot fails, the error must reach the caller and a
// subsequent snapshot must still be able to run.
test('(regression) performSnapshot handles exceptions gracefully', async (t) => {
  const { adapter, runMigrations, satellite, authState } = t.context
  await runMigrations()
  await satellite._setAuthState(authState)

  const error = 'FAKE TRANSACTION'

  // Replace the adapter's transaction entry point with one that always throws.
  const txnFn = adapter.transaction
  adapter.transaction = () => {
    throw new Error(error)
  }

  try {
    // The first snapshot must reject with the injected error; a call that
    // resolves here fails the test instead of passing silently.
    await t.throwsAsync(() => satellite._performSnapshot(), { message: error })
  } finally {
    // Always put the real implementation back, even if the assertion above
    // fails, so the stub never outlives this step.
    adapter.transaction = txnFn
  }

  // With the real adapter restored, a second snapshot must complete.
  await satellite._performSnapshot()
  t.pass()
})

0 comments on commit d60e9ce

Please sign in to comment.