Skip to content

Commit

Permalink
split out initial sync
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Nov 28, 2024
1 parent 827a23e commit a28c961
Showing 1 changed file with 108 additions and 21 deletions.
129 changes: 108 additions & 21 deletions demos/linearlite/src/pglite-worker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { worker } from '@electric-sql/pglite/worker'
import { PGlite, Mutex } from '@electric-sql/pglite'
import { live, type PGliteWithLive } from '@electric-sql/pglite/live'
import { electricSync } from '@electric-sql/pglite-sync'
import { electricSync, type PGliteWithSync } from '@electric-sql/pglite-sync'
import { migrate } from './migrations'
import type { IssueChange, CommentChange, ChangeSet } from './utils/changes'

const WRITE_SERVER_URL = import.meta.env.VITE_WRITE_SERVER_URL
const ELECTRIC_URL = import.meta.env.VITE_ELECTRIC_URL
const APPLY_CHANGES_URL = `${WRITE_SERVER_URL}/apply-changes`

type PGliteWithExtensions = PGliteWithLive & PGliteWithSync

worker({
async init() {
const pg = await PGlite.create({
Expand All @@ -20,33 +22,118 @@ worker({
live,
},
})

// Migrate the database to the latest schema
await migrate(pg)
await pg.sync.syncShapeToTable({
shape: {
url: `${ELECTRIC_URL}/v1/shape`,
table: 'issue',
},
table: 'issue',
primaryKey: ['id'],
shapeKey: 'issues',
})
await pg.sync.syncShapeToTable({
shape: {
url: `${ELECTRIC_URL}/v1/shape`,
table: 'comment',
},
table: 'comment',
primaryKey: ['id'],
shapeKey: 'comments',
})

// This waits for the last weeks data to sync to the database
await startSyncToDatabase(pg)

startWritePath(pg)

return pg
},
})

// Number of days of history targeted by the initial (partial) sync.
const INITIAL_SYNC_DAYS = 7
// Explicit lower bound for the initial sync window, read from the env.
// Used by the demo to sync from a fixed date matching the demo data.
// NOTE(review): the comment previously said "30 days ago" but
// INITIAL_SYNC_DAYS is 7; also, because of the hard-coded fallback below,
// this value is never blank, so the previous-Monday fallback in
// initialSyncToDatabase is effectively dead unless the default is removed
// — confirm intent.
const INITIAL_SYNC_FROM_DATE = import.meta.env.VITE_INITIAL_SYNC_FROM_DATE ?? '2024-11-28T00:00:00.000Z'

// Compute the lower bound for the initial sync window.
// If `fromDate` is a non-empty date string it is used directly; otherwise
// fall back to the previous Monday that is at least `days` days before
// `today`, so the generated `where` clause (and therefore the shape) is
// stable for a whole week of clients and stays cache friendly.
// Fix: the previous version did `new Date(fromDate ?? today)`, so an empty
// string produced an Invalid Date and `.toISOString()` threw a RangeError;
// any falsy value now takes the previous-Monday fallback instead.
function computeInitialSyncFrom(
  today: Date,
  fromDate: string | undefined,
  days: number
): Date {
  if (fromDate) {
    return new Date(fromDate)
  }
  const syncFrom = new Date(today)
  // getDay(): Sunday = 0 … Saturday = 6, so (getDay() + 6) % 7 is the
  // number of days since the most recent Monday.
  syncFrom.setDate(today.getDate() - (days + ((today.getDay() + 6) % 7)))
  return syncFrom
}

// Sync one shape into its local table and resolve once the initial data is
// fully applied — the first `subscribe` callback fires when the shape is up
// to date, at which point we unsubscribe and resolve.
async function syncShapeUpToDate(
  pg: PGliteWithExtensions,
  table: string,
  where: string
): Promise<void> {
  const shapeSync = await pg.sync.syncShapeToTable({
    shape: {
      url: `${ELECTRIC_URL}/v1/shape`,
      table,
      where,
    },
    table,
    primaryKey: ['id'],
  })
  return new Promise<void>((resolve, reject) => {
    shapeSync.subscribe(() => {
      // Subscribe will be called when the sync is up to date
      // at which point we can unsubscribe and resolve the promise
      console.log(`${table} sync up to date`)
      shapeSync.unsubscribe()
      resolve()
    }, reject)
  })
}

// One-off bounded sync of recent issues and comments, used to get a usable
// database quickly before the full, ongoing sync starts.
async function initialSyncToDatabase(pg: PGliteWithExtensions) {
  const syncFrom = computeInitialSyncFrom(
    new Date(),
    INITIAL_SYNC_FROM_DATE,
    INITIAL_SYNC_DAYS
  )

  console.log('syncing from', syncFrom.toISOString())

  const where = `created >= '${syncFrom.toISOString()}'`
  // Start both shape syncs and wait until each reports up to date.
  await Promise.all([
    syncShapeUpToDate(pg, 'issue', where),
    syncShapeUpToDate(pg, 'comment', where),
  ])
}

// Bring the local database up to date and keep it there.
// If the database is empty, first run a bounded initial sync of recent data
// so the UI becomes usable quickly, then start the full, ongoing shape
// syncs for both tables.
async function startSyncToDatabase(pg: PGliteWithExtensions) {
  // First sync the last weeks data if the database is empty
  const issueCount = await pg.query<{ count: number }>(`
    SELECT count(id) as count FROM issue
  `)
  // count() is a Postgres int8 — coerce defensively in case the driver
  // surfaces it as a string/bigint rather than a number.
  // NOTE(review): confirm how PGlite parses int8 result columns.
  if (Number(issueCount.rows[0].count) === 0) {
    console.log('initial sync to database')
    await initialSyncToDatabase(pg)
    console.log('initial sync to database complete')
  }

  // Finally start the full sync
  const throttle = 100 // used during initial sync to prevent too many renders

  // Start the ongoing, resumable sync for one table. These run for the
  // lifetime of the worker, so we deliberately do not await them; failures
  // are logged rather than left as unhandled rejections.
  const startFullSync = (table: string, shapeKey: string) =>
    pg.sync
      .syncShapeToTable({
        shape: {
          url: `${ELECTRIC_URL}/v1/shape`,
          table,
        },
        table,
        primaryKey: ['id'],
        shapeKey,
        commitGranularity: 'transaction',
        commitThrottle: throttle,
      })
      .catch((err) => console.error(`${table} sync failed to start`, err))

  void startFullSync('issue', 'issues')
  void startFullSync('comment', 'comments')
}

const syncMutex = new Mutex()

async function startWritePath(pg: PGliteWithLive) {
async function startWritePath(pg: PGliteWithExtensions) {
// Use a live query to watch for changes to the local tables that need to be synced
pg.live.query<{
issue_count: number
Expand All @@ -73,7 +160,7 @@ async function startWritePath(pg: PGliteWithLive) {
}

// Call wrapped in mutex to prevent multiple syncs from happening at the same time
async function doSyncToServer(pg: PGliteWithLive) {
async function doSyncToServer(pg: PGliteWithExtensions) {
let issueChanges: IssueChange[]
let commentChanges: CommentChange[]
await pg.transaction(async (tx) => {
Expand Down

0 comments on commit a28c961

Please sign in to comment.