diff --git a/.changeset/happy-jokes-yawn.md b/.changeset/happy-jokes-yawn.md new file mode 100644 index 000000000..6756f0f48 --- /dev/null +++ b/.changeset/happy-jokes-yawn.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite-sync': patch +--- + +Add options for the `commitGranularity` parameter in the `syncShapeToTable` function, enabling the user to choose how often the sync should commit. diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 8c9b27d49..df5d46b08 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -20,6 +20,20 @@ type InsertChangeMessage = ChangeMessage & { headers: { operation: 'insert' } } +/** + * The granularity of the commit operation. + * - `up-to-date`: Commit all messages when the `up-to-date` message is received. + * - `transaction`: Commit all messages within transactions inferred from the LSN prefix of the offset. + * - `operation`: Commit each message in its own transaction. + * - `number`: Commit every N messages. + * Note a commit will always be performed on the `up-to-date` message. + */ +export type CommitGranularity = + | 'up-to-date' + | 'transaction' + | 'operation' + | number + export interface SyncShapeToTableOptions { shape: ShapeStreamOptions table: string @@ -28,6 +42,9 @@ export interface SyncShapeToTableOptions { primaryKey: string[] shapeKey?: ShapeKey useCopy?: boolean + commitGranularity?: CommitGranularity + commitThrottle?: number + onInitialSync?: () => void } export interface ElectricSyncOptions { @@ -53,6 +70,11 @@ async function createPlugin( const namespaceObj = { syncShapeToTable: async (options: SyncShapeToTableOptions) => { + await firstRun() + options = { + commitGranularity: 'up-to-date', + ...options, + } if (shapePerTableLock.has(options.table)) { throw new Error('Already syncing shape for table ' + options.table) } @@ -102,123 +124,165 @@ async function createPlugin( // or use a separate connection to hold a long transaction let messageAggregator: ChangeMessage[] = [] let truncateNeeded = false + let lastLSN: string | null = null + let lastCommitAt: number = 0 + + const commit = async () => { + if (messageAggregator.length === 0 && !truncateNeeded) return + await pg.transaction(async (tx) => { + if (debug) { + console.log('committing message batch', messageAggregator.length) + console.time('commit') + } + + // Set the syncing flag to true during this transaction so that + // user defined triggers on the table are able to chose how to run + // during a sync + tx.exec(`SET LOCAL ${metadataSchema}.syncing = true;`) + + if (truncateNeeded) { + truncateNeeded = false + // TODO: sync into shadow table and reference count + // for now just clear the whole table - will break + // cases with multiple shapes on the same table + await tx.exec(`DELETE FROM ${options.table};`) + if (options.shapeKey) { + await deleteShapeSubscriptionState({ + pg: tx, + metadataSchema, + shapeKey: options.shapeKey, + }) + } + } + + if (doCopy) { + // We can do a `COPY FROM` to insert the initial data + // Split messageAggregator into initial inserts and remaining messages + const initialInserts: InsertChangeMessage[] = [] + const remainingMessages: ChangeMessage[] = [] + let foundNonInsert = false + for (const message of messageAggregator) { + if (!foundNonInsert && message.headers.operation === 'insert') { + initialInserts.push(message as InsertChangeMessage) + } else { + foundNonInsert = true + remainingMessages.push(message) + } + } + if (initialInserts.length > 0) { + // As `COPY FROM` doesn't trigger a NOTIFY, we pop + // the last insert message and and add it to the be beginning + // of the remaining messages to be applied after the `COPY FROM` + remainingMessages.unshift(initialInserts.pop()!) + } + messageAggregator = remainingMessages + + // Do the `COPY FROM` with initial inserts + if (initialInserts.length > 0) { + applyMessagesToTableWithCopy({ + pg: tx, + table: options.table, + schema: options.schema, + messages: initialInserts as InsertChangeMessage[], + mapColumns: options.mapColumns, + primaryKey: options.primaryKey, + debug, + }) + // We don't want to do a `COPY FROM` again after that + doCopy = false + } + } + + for (const changeMessage of messageAggregator) { + await applyMessageToTable({ + pg: tx, + table: options.table, + schema: options.schema, + message: changeMessage, + mapColumns: options.mapColumns, + primaryKey: options.primaryKey, + debug, + }) + } + + if ( + options.shapeKey && + messageAggregator.length > 0 && + stream.shapeHandle !== undefined + ) { + await updateShapeSubscriptionState({ + pg: tx, + metadataSchema, + shapeKey: options.shapeKey, + shapeId: stream.shapeHandle, + lastOffset: + messageAggregator[messageAggregator.length - 1].offset, + }) + } + }) + if (debug) console.timeEnd('commit') + messageAggregator = [] + // Await a timeout to start a new task and allow other connections to do work + await new Promise((resolve) => setTimeout(resolve, 0)) + } + + const throttledCommit = async () => { + if ( + options.commitThrottle && + Date.now() - lastCommitAt < options.commitThrottle + ) { + return + } + lastCommitAt = Date.now() + await commit() + } stream.subscribe(async (messages) => { if (debug) console.log('sync messages received', messages) for (const message of messages) { - // accumulate change messages for committing all at once if (isChangeMessage(message)) { - messageAggregator.push(message) - continue - } - - // perform actual DB operations upon receiving control messages - if (!isControlMessage(message)) continue - switch (message.headers.control) { - // mark table as needing truncation before next batch commit - case 'must-refetch': - if (debug) console.log('refetching shape') - truncateNeeded = true - messageAggregator = [] - - break - - // perform all accumulated changes and store stream state - case 'up-to-date': - await pg.transaction(async (tx) => { - if (debug) console.log('up-to-date, committing all messages') - - // Set the syncing flag to true during this transaction so that - // user defined triggers on the table are able to chose how to run - // during a sync - tx.exec(`SET LOCAL ${metadataSchema}.syncing = true;`) - - if (truncateNeeded) { - truncateNeeded = false - // TODO: sync into shadow table and reference count - // for now just clear the whole table - will break - // cases with multiple shapes on the same table - await tx.exec(`DELETE FROM ${options.table};`) - if (options.shapeKey) { - await deleteShapeSubscriptionState({ - pg: tx, - metadataSchema, - shapeKey: options.shapeKey, - }) - } - } - - if (doCopy) { - // We can do a `COPY FROM` to insert the initial data - // Split messageAggregator into initial inserts and remaining messages - const initialInserts: InsertChangeMessage[] = [] - const remainingMessages: ChangeMessage[] = [] - let foundNonInsert = false - for (const message of messageAggregator) { - if ( - !foundNonInsert && - message.headers.operation === 'insert' - ) { - initialInserts.push(message as InsertChangeMessage) - } else { - foundNonInsert = true - remainingMessages.push(message) - } - } - if (initialInserts.length > 0) { - // As `COPY FROM` doesn't trigger a NOTIFY, we pop - // the last insert message and and add it to the be beginning - // of the remaining messages to be applied after the `COPY FROM` - remainingMessages.unshift(initialInserts.pop()!) - } - messageAggregator = remainingMessages - - // Do the `COPY FROM` with initial inserts - if (initialInserts.length > 0) { - applyMessagesToTableWithCopy({ - pg: tx, - table: options.table, - schema: options.schema, - messages: initialInserts as InsertChangeMessage[], - mapColumns: options.mapColumns, - primaryKey: options.primaryKey, - debug, - }) - // We don't want to do a `COPY FROM` again after that - doCopy = false - } - } + const newLSN = message.offset.split('_')[0] + if (newLSN !== lastLSN) { + // If the LSN has changed and granularity is set to transaction + // we need to commit the current batch. + // This is done before we accumulate any more messages as they are + // part of the next transaction batch. + if (options.commitGranularity === 'transaction') { + await throttledCommit() + } + lastLSN = newLSN + } - for (const changeMessage of messageAggregator) { - await applyMessageToTable({ - pg: tx, - table: options.table, - schema: options.schema, - message: changeMessage, - mapColumns: options.mapColumns, - primaryKey: options.primaryKey, - debug, - }) - } + // accumulate change messages for committing all at once or in batches + messageAggregator.push(message) - if ( - options.shapeKey && - messageAggregator.length > 0 && - stream.shapeHandle !== undefined - ) { - await updateShapeSubscriptionState({ - pg: tx, - metadataSchema, - shapeKey: options.shapeKey, - shapeId: stream.shapeHandle, - lastOffset: - messageAggregator[messageAggregator.length - 1].offset, - }) + if (options.commitGranularity === 'operation') { + // commit after each operation if granularity is set to operation + await throttledCommit() + } else if (typeof options.commitGranularity === 'number') { + // commit after every N messages if granularity is set to a number + if (messageAggregator.length >= options.commitGranularity) { + await throttledCommit() + } + } + } else if (isControlMessage(message)) { + switch (message.headers.control) { + case 'must-refetch': + // mark table as needing truncation before next batch commit + if (debug) console.log('refetching shape') + truncateNeeded = true + messageAggregator = [] + break + + case 'up-to-date': + // perform all accumulated changes and store stream state + await commit() // not throttled, we want this to happen ASAP + if (isNewSubscription && options.onInitialSync) { + options.onInitialSync() } - }) - messageAggregator = [] - break + break + } } } }) @@ -258,7 +322,11 @@ async function createPlugin( } } - const init = async () => { + let firstRunDone = false + + const firstRun = async () => { + if (firstRunDone) return + firstRunDone = true await migrateShapeMetadataTables({ pg, metadataSchema, @@ -268,19 +336,25 @@ async function createPlugin( return { namespaceObj, close, - init, } } +export type SyncNamespaceObj = Awaited< + ReturnType +>['namespaceObj'] + +export type PGliteWithSync = PGliteInterface & { + sync: SyncNamespaceObj +} + export function electricSync(options?: ElectricSyncOptions) { return { name: 'ElectricSQL Sync', setup: async (pg: PGliteInterface) => { - const { namespaceObj, close, init } = await createPlugin(pg, options) + const { namespaceObj, close } = await createPlugin(pg, options) return { namespaceObj, close, - init, } }, } satisfies Extension