Skip to content

Commit

Permalink
Post review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Dec 9, 2024
1 parent b37985e commit a9143ea
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions packages/pglite-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ async function createPlugin(

const commit = async () => {
if (messageAggregator.length === 0 && !truncateNeeded) return
const shapeHandle = stream.shapeHandle // The shape handle could change while we are committing
await pg.transaction(async (tx) => {
if (debug) {
console.log('committing message batch', messageAggregator.length)
Expand Down Expand Up @@ -240,13 +241,13 @@ async function createPlugin(
if (
options.shapeKey &&
messageAggregator.length > 0 &&
stream.shapeHandle !== undefined
shapeHandle !== undefined
) {
await updateShapeSubscriptionState({
pg: tx,
metadataSchema,
shapeKey: options.shapeKey,
shapeId: stream.shapeHandle,
shapeId: shapeHandle,
lastOffset:
messageAggregator[messageAggregator.length - 1].offset,
})
Expand All @@ -258,8 +259,12 @@ async function createPlugin(
await new Promise((resolve) => setTimeout(resolve, 0))
}

const throttledCommit = async () => {
const throttledCommit = async ({ reset = false }: { reset?: boolean } = {}) => {
const now = Date.now()
if (reset) {
// Reset the last commit time to 0, forcing the next commit to happen immediately
lastCommitAt = 0
}
if (options.commitThrottle && debug)
console.log(
'throttled commit: now:',
Expand Down Expand Up @@ -322,7 +327,7 @@ async function createPlugin(

case 'up-to-date':
// perform all accumulated changes and store stream state
await commit() // not throttled, we want this to happen ASAP
await throttledCommit({ reset: true }) // not throttled, we want this to happen ASAP
if (
isNewSubscription &&
!onInitialSyncCalled &&
Expand Down

0 comments on commit a9143ea

Please sign in to comment.