Add commitGranularity parameter to syncShapeToTable
samwillis committed Dec 8, 2024
1 parent bfddaaa commit e6ca903
Showing 2 changed files with 191 additions and 112 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-jokes-yawn.md
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite-sync': patch
---

Add a `commitGranularity` option to the `syncShapeToTable` function, enabling the user to choose how often the sync should commit.
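
For context, a minimal usage sketch of the new option (illustrative only: the shape URL, table name, and extension key are assumptions, not part of this commit):

import { PGlite } from '@electric-sql/pglite'
import { electricSync } from '@electric-sql/pglite-sync'

// Register the sync plugin; the extension key chosen here ('sync') is the
// namespace it is exposed under, matching the PGliteWithSync type below.
const pg = await PGlite.create({
  extensions: { sync: electricSync() },
})

// Commit every 100 accumulated messages rather than waiting for the stream
// to be up-to-date, and at most once every 200ms.
await pg.sync.syncShapeToTable({
  shape: { url: 'http://localhost:3000/v1/shape' },
  table: 'todo',
  primaryKey: ['id'],
  commitGranularity: 100,
  commitThrottle: 200,
})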
298 changes: 186 additions & 112 deletions packages/pglite-sync/src/index.ts
@@ -20,6 +20,20 @@ type InsertChangeMessage = ChangeMessage<any> & {
headers: { operation: 'insert' }
}

/**
* The granularity of the commit operation.
* - `up-to-date`: Commit all messages when the `up-to-date` message is received.
* - `transaction`: Commit all messages within transactions inferred from the LSN prefix of the offset.
* - `operation`: Commit each message in its own transaction.
* - `number`: Commit every N messages.
* Note that a commit will always be performed when the `up-to-date` message is received.
*/
export type CommitGranularity =
| 'up-to-date'
| 'transaction'
| 'operation'
| number

export interface SyncShapeToTableOptions {
shape: ShapeStreamOptions
table: string
@@ -28,6 +42,9 @@ export interface SyncShapeToTableOptions
primaryKey: string[]
shapeKey?: ShapeKey
useCopy?: boolean
commitGranularity?: CommitGranularity
commitThrottle?: number
onInitialSync?: () => void
}

export interface ElectricSyncOptions {
@@ -53,6 +70,11 @@ async function createPlugin(

const namespaceObj = {
syncShapeToTable: async (options: SyncShapeToTableOptions) => {
await firstRun()
options = {
commitGranularity: 'up-to-date',
...options,
}
if (shapePerTableLock.has(options.table)) {
throw new Error('Already syncing shape for table ' + options.table)
}
@@ -102,123 +124,165 @@
// or use a separate connection to hold a long transaction
let messageAggregator: ChangeMessage<any>[] = []
let truncateNeeded = false
let lastLSN: string | null = null
let lastCommitAt: number = 0

const commit = async () => {
if (messageAggregator.length === 0 && !truncateNeeded) return
await pg.transaction(async (tx) => {
if (debug) {
console.log('committing message batch', messageAggregator.length)
console.time('commit')
}

// Set the syncing flag to true during this transaction so that
// user-defined triggers on the table are able to choose how to run
// during a sync
await tx.exec(`SET LOCAL ${metadataSchema}.syncing = true;`)

if (truncateNeeded) {
truncateNeeded = false
// TODO: sync into shadow table and reference count
// for now just clear the whole table - will break
// cases with multiple shapes on the same table
await tx.exec(`DELETE FROM ${options.table};`)
if (options.shapeKey) {
await deleteShapeSubscriptionState({
pg: tx,
metadataSchema,
shapeKey: options.shapeKey,
})
}
}

if (doCopy) {
// We can do a `COPY FROM` to insert the initial data
// Split messageAggregator into initial inserts and remaining messages
const initialInserts: InsertChangeMessage[] = []
const remainingMessages: ChangeMessage<any>[] = []
let foundNonInsert = false
for (const message of messageAggregator) {
if (!foundNonInsert && message.headers.operation === 'insert') {
initialInserts.push(message as InsertChangeMessage)
} else {
foundNonInsert = true
remainingMessages.push(message)
}
}
if (initialInserts.length > 0) {
// As `COPY FROM` doesn't trigger a NOTIFY, we pop
// the last insert message and add it to the beginning
// of the remaining messages to be applied after the `COPY FROM`
remainingMessages.unshift(initialInserts.pop()!)
}
messageAggregator = remainingMessages

// Do the `COPY FROM` with initial inserts
if (initialInserts.length > 0) {
await applyMessagesToTableWithCopy({
pg: tx,
table: options.table,
schema: options.schema,
messages: initialInserts as InsertChangeMessage[],
mapColumns: options.mapColumns,
primaryKey: options.primaryKey,
debug,
})
// We don't want to do a `COPY FROM` again after that
doCopy = false
}
}

for (const changeMessage of messageAggregator) {
await applyMessageToTable({
pg: tx,
table: options.table,
schema: options.schema,
message: changeMessage,
mapColumns: options.mapColumns,
primaryKey: options.primaryKey,
debug,
})
}

if (
options.shapeKey &&
messageAggregator.length > 0 &&
stream.shapeHandle !== undefined
) {
await updateShapeSubscriptionState({
pg: tx,
metadataSchema,
shapeKey: options.shapeKey,
shapeId: stream.shapeHandle,
lastOffset:
messageAggregator[messageAggregator.length - 1].offset,
})
}
})
if (debug) console.timeEnd('commit')
messageAggregator = []
// Await a timeout to start a new task and allow other connections to do work
await new Promise((resolve) => setTimeout(resolve, 0))
}

const throttledCommit = async () => {
if (
options.commitThrottle &&
Date.now() - lastCommitAt < options.commitThrottle
) {
return
}
lastCommitAt = Date.now()
await commit()
}
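
// Note on the interplay between the two options (values here are
// illustrative, not from this diff): with commitGranularity: 'operation'
// and commitThrottle: 200, a burst of change messages arriving within the
// 200ms window collapses into fewer, larger commits because
// throttledCommit() returns early; the un-throttled commit() invoked on
// the 'up-to-date' message below still flushes whatever remains.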

stream.subscribe(async (messages) => {
if (debug) console.log('sync messages received', messages)

for (const message of messages) {
  if (isChangeMessage(message)) {
    const newLSN = message.offset.split('_')[0]
    if (newLSN !== lastLSN) {
      // If the LSN has changed and granularity is set to transaction
      // we need to commit the current batch.
      // This is done before we accumulate any more messages as they are
      // part of the next transaction batch.
      if (options.commitGranularity === 'transaction') {
        await throttledCommit()
      }
      lastLSN = newLSN
    }

    // accumulate change messages for committing all at once or in batches
    messageAggregator.push(message)

    if (options.commitGranularity === 'operation') {
      // commit after each operation if granularity is set to operation
      await throttledCommit()
    } else if (typeof options.commitGranularity === 'number') {
      // commit after every N messages if granularity is set to a number
      if (messageAggregator.length >= options.commitGranularity) {
        await throttledCommit()
      }
    }
  } else if (isControlMessage(message)) {
    switch (message.headers.control) {
      case 'must-refetch':
        // mark table as needing truncation before next batch commit
        if (debug) console.log('refetching shape')
        truncateNeeded = true
        messageAggregator = []
        break

      case 'up-to-date':
        // perform all accumulated changes and store stream state
        await commit() // not throttled, we want this to happen ASAP
        if (isNewSubscription && options.onInitialSync) {
          options.onInitialSync()
        }
        break
    }
  }
}
})
@@ -258,7 +322,11 @@ async function createPlugin(
}
}

let firstRunDone = false

const firstRun = async () => {
  if (firstRunDone) return
  firstRunDone = true
await migrateShapeMetadataTables({
pg,
metadataSchema,
@@ -268,19 +336,25 @@
return {
namespaceObj,
close,
}
}

export type SyncNamespaceObj = Awaited<
ReturnType<typeof createPlugin>
>['namespaceObj']

export type PGliteWithSync = PGliteInterface & {
sync: SyncNamespaceObj
}

export function electricSync(options?: ElectricSyncOptions) {
return {
name: 'ElectricSQL Sync',
setup: async (pg: PGliteInterface) => {
const { namespaceObj, close } = await createPlugin(pg, options)
return {
namespaceObj,
close,
}
},
} satisfies Extension