Skip to content

Commit

Permalink
pglite-sync: do initial sync as a copy from
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Oct 16, 2024
1 parent 047d959 commit 96275c6
Showing 1 changed file with 114 additions and 0 deletions.
114 changes: 114 additions & 0 deletions packages/pglite-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ export type MapColumnsFn = (message: ChangeMessage<any>) => Record<string, any>
export type MapColumns = MapColumnsMap | MapColumnsFn
export type ShapeKey = string

type InsertChangeMessage = ChangeMessage<any> & {
headers: { operation: 'insert' }
}

export interface SyncShapeToTableOptions {
shape: ShapeStreamOptions
table: string
Expand Down Expand Up @@ -67,6 +71,12 @@ async function createPlugin(
}
}

// If its a new subscription there is no state to resume from
const isNewSubscription = shapeSubState === null

// If its a new subscription we can do a `COPY FROM` to insert the initial data
let doCopy = isNewSubscription

const aborter = new AbortController()
if (options.shape.signal) {
// we new to have our own aborter to be able to abort the stream
Expand Down Expand Up @@ -135,6 +145,47 @@ async function createPlugin(
}
}

if (doCopy) {
// We can do a `COPY FROM` to insert the initial data
// Split messageAggregator into initial inserts and remaining messages
const initialInserts: InsertChangeMessage[] = []
const remainingMessages: ChangeMessage<any>[] = []
let foundNonInsert = false
for (const message of messageAggregator) {
if (
!foundNonInsert &&
message.headers.operation === 'insert'
) {
initialInserts.push(message as InsertChangeMessage)
} else {
foundNonInsert = true
remainingMessages.push(message)
}
}
if (initialInserts.length > 0) {
// As `COPY FROM` doesn't trigger a NOTIFY, we pop
// the last insert message and and add it to the be beginning
// of the remaining messages to be applied after the `COPY FROM`
remainingMessages.unshift(initialInserts.pop()!)
}
messageAggregator = remainingMessages

// Do the `COPY FROM` with initial inserts
if (initialInserts.length > 0) {
applyMessagesToTableWithCopy({
pg: tx,
table: options.table,
schema: options.schema,
messages: initialInserts as InsertChangeMessage[],
mapColumns: options.mapColumns,
primaryKey: options.primaryKey,
debug,
})
// We don't want to do a `COPY FROM` again after that
doCopy = false
}
}

for (const changeMessage of messageAggregator) {
await applyMessageToTable({
pg: tx,
Expand Down Expand Up @@ -326,6 +377,69 @@ async function applyMessageToTable({
}
}

interface ApplyMessagesToTableWithCopyOptions {
pg: PGliteInterface | Transaction
table: string
schema?: string
messages: InsertChangeMessage[]
mapColumns?: MapColumns
primaryKey: string[]
debug: boolean
}

async function applyMessagesToTableWithCopy({
pg,
table,
schema = 'public',
messages,
mapColumns,
debug,
}: ApplyMessagesToTableWithCopyOptions) {
if (debug) console.log('applying messages with COPY')

// Map the messages to the data to be inserted
const data: Record<string, any>[] = messages.map((message) =>
mapColumns ? doMapColumns(mapColumns, message) : message.value,
)

// Get column names from the first message
const columns = Object.keys(data[0])

// Create CSV data
const csvData = data
.map((message) => {
return columns
.map((column) => {
const value = message[column]
// Escape double quotes and wrap in quotes if necessary
if (
typeof value === 'string' &&
(value.includes(',') || value.includes('"') || value.includes('\n'))
) {
return `"${value.replace(/"/g, '""')}"`
}
return value === null ? '\\N' : value
})
.join(',')
})
.join('\n')

// Perform COPY FROM
await pg.query(
`
COPY "${schema}"."${table}" (${columns.map((c) => `"${c}"`).join(', ')})
FROM '/dev/blob'
WITH (FORMAT csv, NULL '\\N')
`,
[],
{
blob: new Blob([csvData], { type: 'text/csv' }),
},
)

if (debug) console.log(`Inserted ${messages.length} rows using COPY`)
}

interface GetShapeSubscriptionStateOptions {
pg: PGliteInterface | Transaction
metadataSchema: string
Expand Down

0 comments on commit 96275c6

Please sign in to comment.