diff --git a/demos/linearlite/src/pglite-worker.ts b/demos/linearlite/src/pglite-worker.ts index 06b238ed..ce0b35a3 100644 --- a/demos/linearlite/src/pglite-worker.ts +++ b/demos/linearlite/src/pglite-worker.ts @@ -1,7 +1,7 @@ import { worker } from '@electric-sql/pglite/worker' import { PGlite, Mutex } from '@electric-sql/pglite' import { live, type PGliteWithLive } from '@electric-sql/pglite/live' -import { electricSync } from '@electric-sql/pglite-sync' +import { electricSync, type PGliteWithSync } from '@electric-sql/pglite-sync' import { migrate } from './migrations' import type { IssueChange, CommentChange, ChangeSet } from './utils/changes' @@ -9,6 +9,8 @@ const WRITE_SERVER_URL = import.meta.env.VITE_WRITE_SERVER_URL const ELECTRIC_URL = import.meta.env.VITE_ELECTRIC_URL const APPLY_CHANGES_URL = `${WRITE_SERVER_URL}/apply-changes` +type PGliteWithExtensions = PGliteWithLive & PGliteWithSync + worker({ async init() { const pg = await PGlite.create({ @@ -20,33 +22,118 @@ worker({ live, }, }) + + // Migrate the database to the latest schema await migrate(pg) - await pg.sync.syncShapeToTable({ - shape: { - url: `${ELECTRIC_URL}/v1/shape`, - table: 'issue', - }, - table: 'issue', - primaryKey: ['id'], - shapeKey: 'issues', - }) - await pg.sync.syncShapeToTable({ - shape: { - url: `${ELECTRIC_URL}/v1/shape`, - table: 'comment', - }, - table: 'comment', - primaryKey: ['id'], - shapeKey: 'comments', - }) + + // This waits for the last weeks data to sync to the database + await startSyncToDatabase(pg) + startWritePath(pg) + return pg }, }) +const INITIAL_SYNC_DAYS = 7 +// We can set this to a specific date to sync from, or leave it blank to sync from 30 days ago +// this is used for the demo to sync from a specific date based on what we have in the demo data +const INITIAL_SYNC_FROM_DATE = import.meta.env.VITE_INITIAL_SYNC_FROM_DATE ?? '2024-11-28T00:00:00.000Z' + +async function initialSyncToDatabase(pg: PGliteWithExtensions) { + // We are going to first sync just the last weeks data. + // To make this cache efficient lets sync to the previous Monday that is at least + // 7 days prior to today. + const today = new Date() + const syncFrom = new Date(INITIAL_SYNC_FROM_DATE ?? today) + if (!INITIAL_SYNC_FROM_DATE) { + syncFrom.setDate( + today.getDate() - (INITIAL_SYNC_DAYS + ((today.getDay() + 6) % 7)) + ) + } + + console.log('syncing from', syncFrom.toISOString()) + + const issuesSync = await pg.sync.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'issue', + where: `created >= '${syncFrom.toISOString()}'`, + }, + table: 'issue', + primaryKey: ['id'], + }) + const issueSyncUpToDate = new Promise((resolve, reject) => { + issuesSync.subscribe(() => { + // Subscribe will be called when the sync is up to date + // at which point we can unsubscribe and resolve the promise + console.log('issue sync up to date') + issuesSync.unsubscribe() + resolve() + }, reject) + }) + const commentsSync = await pg.sync.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'comment', + where: `created >= '${syncFrom.toISOString()}'`, + }, + table: 'comment', + primaryKey: ['id'], + }) + const commentSyncUpToDate = new Promise((resolve, reject) => { + commentsSync.subscribe(() => { + // Subscribe will be called when the sync is up to date + // at which point we can unsubscribe and resolve the promise + console.log('comment sync up to date') + commentsSync.unsubscribe() + resolve() + }, reject) + }) + // Wait for both syncs to complete + await Promise.all([issueSyncUpToDate, commentSyncUpToDate]) +} + +async function startSyncToDatabase(pg: PGliteWithExtensions) { + // First sync the last weeks data if the database is empty + const issueCount = await pg.query<{ count: number }>(` + SELECT count(id) as count FROM issue + `) + if (issueCount.rows[0].count === 0) { + console.log('initial sync to database') + await initialSyncToDatabase(pg) + console.log('initial sync to database complete') + } + + // Finally start the full sync + const throttle = 100 // used during initial sync to prevent too many renders + pg.sync.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'issue', + }, + table: 'issue', + primaryKey: ['id'], + shapeKey: 'issues', + commitGranularity: 'transaction', + commitThrottle: throttle, + }) + pg.sync.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'comment', + }, + table: 'comment', + primaryKey: ['id'], + shapeKey: 'comments', + commitGranularity: 'transaction', + commitThrottle: throttle, + }) +} + const syncMutex = new Mutex() -async function startWritePath(pg: PGliteWithLive) { +async function startWritePath(pg: PGliteWithExtensions) { // Use a live query to watch for changes to the local tables that need to be synced pg.live.query<{ issue_count: number @@ -73,7 +160,7 @@ async function startWritePath(pg: PGliteWithLive) { } // Call wrapped in mutex to prevent multiple syncs from happening at the same time -async function doSyncToServer(pg: PGliteWithLive) { +async function doSyncToServer(pg: PGliteWithExtensions) { let issueChanges: IssueChange[] let commentChanges: CommentChange[] await pg.transaction(async (tx) => {