diff --git a/examples/write-patterns/patterns/3-combine-on-read/index.tsx b/examples/write-patterns/patterns/3-combine-on-read/index.tsx index 8889989134..cee009057b 100644 --- a/examples/write-patterns/patterns/3-combine-on-read/index.tsx +++ b/examples/write-patterns/patterns/3-combine-on-read/index.tsx @@ -1,16 +1,15 @@ import React, { useState } from 'react' import { v4 as uuidv4 } from 'uuid' -import { PGlite } from '@electric-sql/pglite' -import { PGliteWithLive, live } from '@electric-sql/pglite/live' import { PGliteProvider, useLiveQuery, usePGlite, } from '@electric-sql/pglite-react' -import { electricSync } from '@electric-sql/pglite-sync' import api from '../../shared/app/client' +import pglite from '../../shared/app/db' + import localSchemaMigrations from './local-schema.sql?raw' const ELECTRIC_URL = import.meta.env.ELECTRIC_URL || 'http://localhost:3000' @@ -22,14 +21,9 @@ type Todo = { created_at: Date } -const pglite: PGliteWithLive = await PGlite.create({ - extensions: { - electric: electricSync(), - live, - }, -}) await pglite.exec(localSchemaMigrations) +// This starts the read path sync using Electric. 
await pglite.electric.syncShapeToTable({ shape: { url: `${ELECTRIC_URL}/v1/shape`, diff --git a/examples/write-patterns/patterns/4-through-the-db/index.tsx b/examples/write-patterns/patterns/4-through-the-db/index.tsx new file mode 100644 index 0000000000..ca072b1d8c --- /dev/null +++ b/examples/write-patterns/patterns/4-through-the-db/index.tsx @@ -0,0 +1,147 @@ +import React from 'react' +import { v4 as uuidv4 } from 'uuid' + +import { + PGliteProvider, + useLiveQuery, + usePGlite, +} from '@electric-sql/pglite-react' + +import pglite from '../../shared/app/db' + +import SyncChanges from './sync' +import localSchemaMigrations from './local-schema.sql?raw' + +const ELECTRIC_URL = import.meta.env.ELECTRIC_URL || 'http://localhost:3000' + +type Todo = { + id: string + title: string + completed: boolean + created_at: Date +} + +// Note that the resources defined in the schema for this pattern +// are all suffixed with `p4_`. +await pglite.exec(localSchemaMigrations) + +// This starts the read path sync using Electric. +await pglite.electric.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'todos', + }, + shapeKey: 'todos', + table: 'p4_todos_synced', + primaryKey: ['id'], +}) + +// This starts the write path sync of changes captured in the triggers from +// writes to the local DB. 
+const syncChanges = new SyncChanges(pglite) +syncChanges.start() + +export default function Wrapper() { + return ( + + + + ) +} + +function ThroughTheDB() { + const db = usePGlite() + const results = useLiveQuery('SELECT * FROM p4_todos ORDER BY created_at') + + async function createTodo(event: React.FormEvent) { + event.preventDefault() + + const form = event.target as HTMLFormElement + const formData = new FormData(form) + const title = formData.get('todo') as string + + await db.sql` + INSERT INTO p4_todos ( + id, + title, + completed, + created_at + ) + VALUES ( + ${uuidv4()}, + ${title}, + ${false}, + ${new Date()} + ) + ` + + form.reset() + } + + async function updateTodo(todo: Todo) { + const { id, completed } = todo + + await db.sql` + UPDATE p4_todos + SET completed = ${!completed} + WHERE id = ${id} + ` + } + + async function deleteTodo(event: React.MouseEvent, todo: Todo) { + event.preventDefault() + + await db.sql` + DELETE FROM p4_todos + WHERE id = ${todo.id} + ` + } + + if (results === undefined) { + return
Loading …
+ } + + const todos = results.rows + + // The template below the heading is identical to the other patterns. + + // prettier-ignore + return ( +
+

+ + 4. Through the DB sync + +

+ +
+ + +
+
+ ) +} diff --git a/examples/write-patterns/patterns/4-through-the-db/local-schema.sql b/examples/write-patterns/patterns/4-through-the-db/local-schema.sql index 25c739ea05..0a7de2c099 100644 --- a/examples/write-patterns/patterns/4-through-the-db/local-schema.sql +++ b/examples/write-patterns/patterns/4-through-the-db/local-schema.sql @@ -1,15 +1,18 @@ -- This is the local database schema for PGlite. --- The `todos_synced` table for immutable, synced state from the server. -CREATE TABLE IF NOT EXISTS todos_synced ( +-- Note that the resources are prefixed by a `p4` namespace (standing for pattern 4) +-- in order to avoid clashing with the resources defined in pattern 3. + +-- The `p4_todos_synced` table for immutable, synced state from the server. +CREATE TABLE IF NOT EXISTS p4_todos_synced ( id UUID PRIMARY KEY, title TEXT NOT NULL, completed BOOLEAN NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL ); --- The `todos_local` table for local optimistic state. -CREATE TABLE IF NOT EXISTS todos_local ( +-- The `p4_todos_local` table for local optimistic state. +CREATE TABLE IF NOT EXISTS p4_todos_local ( id UUID PRIMARY KEY, title TEXT, completed BOOLEAN, @@ -18,8 +21,8 @@ CREATE TABLE IF NOT EXISTS todos_local ( is_deleted BOOLEAN DEFAULT FALSE ); --- The `todos` view to combine the two tables on read. -CREATE OR REPLACE VIEW todos AS +-- The `p4_todos` view to combine the two tables on read. +CREATE OR REPLACE VIEW p4_todos AS SELECT COALESCE(local.id, synced.id) AS id, CASE @@ -37,28 +40,28 @@ CREATE OR REPLACE VIEW todos AS THEN local.created_at ELSE synced.created_at END AS created_at - FROM todos_synced AS synced - FULL OUTER JOIN todos_local AS local + FROM p4_todos_synced AS synced + FULL OUTER JOIN p4_todos_local AS local ON synced.id = local.id WHERE local.id IS NULL OR local.is_deleted = FALSE; -- A trigger to automatically remove local optimistic state. 
-CREATE OR REPLACE FUNCTION delete_local_on_sync_trigger() +CREATE OR REPLACE FUNCTION p4_delete_local_on_sync_trigger() RETURNS TRIGGER AS $$ BEGIN - DELETE FROM todos_local WHERE id = OLD.id; + DELETE FROM p4_todos_local WHERE id = OLD.id; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE OR REPLACE TRIGGER delete_local_on_sync -AFTER INSERT OR UPDATE OR DELETE ON todos_synced +CREATE OR REPLACE TRIGGER p4_delete_local_on_sync +AFTER INSERT OR UPDATE OR DELETE ON p4_todos_synced FOR EACH ROW -EXECUTE FUNCTION delete_local_on_sync_trigger(); +EXECUTE FUNCTION p4_delete_local_on_sync_trigger(); -- The local `changes` table for capturing and persisting a log -- of local write operations that we want to sync to the server. -CREATE TABLE IF NOT EXISTS changes ( +CREATE TABLE IF NOT EXISTS p4_changes ( id BIGSERIAL PRIMARY KEY, operation TEXT NOT NULL, value JSONB NOT NULL, @@ -70,17 +73,17 @@ CREATE TABLE IF NOT EXISTS changes ( -- 2. to capture write operations and write change messages into the -- The insert trigger -CREATE OR REPLACE FUNCTION todos_insert_trigger() +CREATE OR REPLACE FUNCTION p4_todos_insert_trigger() RETURNS TRIGGER AS $$ BEGIN - IF EXISTS (SELECT 1 FROM todos_synced WHERE id = NEW.id) THEN + IF EXISTS (SELECT 1 FROM p4_todos_synced WHERE id = NEW.id) THEN RAISE EXCEPTION 'Cannot insert: id already exists in the synced table'; END IF; - IF EXISTS (SELECT 1 FROM todos_local WHERE id = NEW.id) THEN + IF EXISTS (SELECT 1 FROM p4_todos_local WHERE id = NEW.id) THEN RAISE EXCEPTION 'Cannot insert: id already exists in the local table'; END IF; - INSERT INTO todos_local ( + INSERT INTO p4_todos_local ( id, title, completed, @@ -95,7 +98,7 @@ BEGIN ARRAY['title', 'completed', 'created_at'] ); - INSERT INTO changes ( + INSERT INTO p4_changes ( operation, value, transaction_id @@ -116,16 +119,16 @@ END; $$ LANGUAGE plpgsql; -- The update trigger -CREATE OR REPLACE FUNCTION todos_update_trigger() +CREATE OR REPLACE FUNCTION p4_todos_update_trigger() RETURNS 
TRIGGER AS $$ DECLARE - synced todos_synced%ROWTYPE; - local todos_local%ROWTYPE; + synced p4_todos_synced%ROWTYPE; + local p4_todos_local%ROWTYPE; changed_cols TEXT[] := '{}'; BEGIN -- Fetch the corresponding rows from the synced and local tables - SELECT * INTO synced FROM todos_synced WHERE id = NEW.id; - SELECT * INTO local FROM todos_local WHERE id = NEW.id; + SELECT * INTO synced FROM p4_todos_synced WHERE id = NEW.id; + SELECT * INTO local FROM p4_todos_local WHERE id = NEW.id; -- If the row is not present in the local table, insert it IF NOT FOUND THEN @@ -140,7 +143,7 @@ BEGIN changed_cols := array_append(changed_cols, 'created_at'); END IF; - INSERT INTO todos_local ( + INSERT INTO p4_todos_local ( id, title, completed, @@ -158,7 +161,7 @@ BEGIN -- Otherwise, if the row is already in the local table, update it and adjust -- the changed_columns ELSE - UPDATE todos_local + UPDATE p4_todos_local SET title = CASE @@ -196,19 +199,18 @@ BEGIN THEN COALESCE(NEW.created_at, local.created_at) IS DISTINCT FROM synced.created_at END ) - ), - synced_at = NULL + ) WHERE id = NEW.id; END IF; - INSERT INTO changes ( + INSERT INTO p4_changes ( operation, value, transaction_id ) VALUES ( 'update', - json_strip_nulls( + jsonb_strip_nulls( jsonb_build_object( 'id', NEW.id, 'title', NEW.title, @@ -224,29 +226,26 @@ END; $$ LANGUAGE plpgsql; -- The delete trigger -CREATE OR REPLACE FUNCTION todos_delete_trigger() +CREATE OR REPLACE FUNCTION p4_todos_delete_trigger() RETURNS TRIGGER AS $$ BEGIN - IF EXISTS (SELECT 1 FROM todos_local WHERE id = OLD.id) THEN - UPDATE todos_local + IF EXISTS (SELECT 1 FROM p4_todos_local WHERE id = OLD.id) THEN + UPDATE p4_todos_local SET - is_deleted = TRUE, - synced_at = NULL + is_deleted = TRUE WHERE id = OLD.id; ELSE - INSERT INTO todos_local ( + INSERT INTO p4_todos_local ( id, - is_deleted, - synced_at + is_deleted ) VALUES ( OLD.id, - TRUE, - NULL + TRUE ); END IF; - INSERT INTO changes ( + INSERT INTO p4_changes ( operation, value, 
transaction_id @@ -254,7 +253,7 @@ BEGIN VALUES ( 'delete', jsonb_build_object( - 'id', NEW.id + 'id', OLD.id ), pg_current_xact_id() ); @@ -263,17 +262,30 @@ BEGIN END; $$ LANGUAGE plpgsql; -CREATE OR REPLACE TRIGGER todos_insert -INSTEAD OF INSERT ON todos +CREATE OR REPLACE TRIGGER p4_todos_insert +INSTEAD OF INSERT ON p4_todos +FOR EACH ROW +EXECUTE FUNCTION p4_todos_insert_trigger(); + +CREATE OR REPLACE TRIGGER p4_todos_update +INSTEAD OF UPDATE ON p4_todos FOR EACH ROW -EXECUTE FUNCTION todos_insert_trigger(); +EXECUTE FUNCTION p4_todos_update_trigger(); -CREATE OR REPLACE TRIGGER todos_update -INSTEAD OF UPDATE ON todos +CREATE OR REPLACE TRIGGER p4_todos_delete +INSTEAD OF DELETE ON p4_todos FOR EACH ROW -EXECUTE FUNCTION todos_update_trigger(); +EXECUTE FUNCTION p4_todos_delete_trigger(); + +CREATE OR REPLACE FUNCTION p4_changes_notify_trigger() +RETURNS TRIGGER AS $$ +BEGIN + NOTIFY p4_changes; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; -CREATE OR REPLACE TRIGGER todos_delete -INSTEAD OF DELETE ON todos +CREATE OR REPLACE TRIGGER p4_changes_notify +AFTER INSERT ON p4_changes FOR EACH ROW -EXECUTE FUNCTION todos_delete_trigger(); +EXECUTE FUNCTION p4_changes_notify_trigger(); diff --git a/examples/write-patterns/patterns/4-through-the-db/sync.ts b/examples/write-patterns/patterns/4-through-the-db/sync.ts new file mode 100644 index 0000000000..8334f8623b --- /dev/null +++ b/examples/write-patterns/patterns/4-through-the-db/sync.ts @@ -0,0 +1,201 @@ +import { type Operation } from '@electric-sql/client' +import { type PGliteWithLive } from '@electric-sql/pglite/live' + +import api from '../../shared/app/client' + +type TransactionId = string + +type Change = { + operation: Operation + value: { + id: string + title?: string + completed?: boolean + created_at?: Date + } + transaction_id: TransactionId +} + +type SendResult = + 'accepted' | + 'rejected' | + 'retry' + +/* + * Minimal, naive synchronization utility, just to illustrate the pattern of + * `listen` 
to `changes` and `POST` them to the api server. + */ +export default class LocalChangeSynchronizer { + #db: PGliteWithLive + #position: TransactionId + + #status: 'idle' | 'processing' = 'idle' + #hasChangedWhileProcessing: boolean = false + + #unsubscribe?: () => Promise + #shouldContinue: boolean = true + + constructor(db: PGliteWithLive, position = '0') { + console.log('new LocalChangeSynchronizer', db) + + this.#db = db + this.#position = position + } + + /* + * Start by listening for notifications. + */ + async start(): Promise { + console.log('start') + + this.#unsubscribe = await this.#db.listen('p4_changes', this.handle.bind(this)) + } + + /* + * On notify, either kick off processing or note down that there were changes + * so we can process them straightaway on the next loop. + */ + async handle(): Promise { + console.log('handle') + + if (this.#status === 'processing') { + this.#hasChangedWhileProcessing = true + + return + } + + this.process() + } + + // Process the changes by fetching them and posting them to the server. + // If the changes are accepted then proceed, otherwise rollback or retry. 
+ async process(): Promise<void> { + console.log('process', this.#position) + + this.#status = 'processing' + this.#hasChangedWhileProcessing = false + + const { changes, position } = await this.query() + + if (changes.length) { + const result: SendResult = await this.send(changes) + + switch (result) { + case 'accepted': + await this.proceed(position) + + break; + + case 'rejected': + await this.rollback() + + break; + + case 'retry': + this.#hasChangedWhileProcessing = true + + break; + } + } + + if (this.#hasChangedWhileProcessing && this.#shouldContinue) { + return await this.process() + } + + this.#status = 'idle' + } + + /* + * Fetch the current batch of changes + */ + async query(): Promise<{ changes: Change[], position: TransactionId}> { + console.log('query') + + const { rows } = await this.#db.sql` + SELECT * from p4_changes + WHERE transaction_id > ${this.#position} + ORDER BY + transaction_id asc, + id asc + ` + + console.log('rows', rows) + + const position = rows.length + ? rows.at(-1)!.transaction_id + : this.#position + + return { + changes: rows, + position + } + } + + /* + * Send the current batch of changes to the server, grouped by transaction. + */ + async send(changes: Change[]): Promise<SendResult> { + console.log('send', changes) + + const path = '/changes' + + const groups = Object.groupBy(changes, x => x.transaction_id) + const sorted = Object.entries(groups).sort((a,b) => a[0].localeCompare(b[0])) + const transactions = sorted.map(([transaction_id, changes]) => { + return { + id: transaction_id, + changes: changes + } + }) + + const response = await api.request(path, 'POST', transactions) + + if (response === undefined) { + return 'retry' + } + + if (response instanceof Response) { + return response.status < 500 ? 'rejected' : 'retry' + } + + return 'accepted' + } + + /* + * Proceed by clearing the processed changes and moving the position forward.
+ */ + async proceed(position: TransactionId): Promise<void> { + console.log('proceed', position) + + await this.#db.sql` + DELETE from p4_changes + WHERE transaction_id <= ${position} + ` + + this.#position = position + } + + /* + * Rollback with an extremely naive strategy: if any write is rejected, simply + * wipe the entire local state. + */ + async rollback(): Promise<void> { + console.log('rollback') + + await this.#db.transaction(async (tx) => { + await tx.sql`DELETE from p4_changes` + await tx.sql`DELETE from p4_todos_local` + }) + } + + /* + * Stop synchronizing + */ + async stop(): Promise<void> { + this.#shouldContinue = false + + if (this.#unsubscribe !== undefined) { + await this.#unsubscribe() + } + } +} diff --git a/examples/write-patterns/patterns/index.ts b/examples/write-patterns/patterns/index.ts index e90488e18f..120e96e3a8 100644 --- a/examples/write-patterns/patterns/index.ts +++ b/examples/write-patterns/patterns/index.ts @@ -1,4 +1,4 @@ export { default as OnlineWrites } from './1-online-writes' export { default as OptimisticState } from './2-optimistic-state' export { default as CombineOnRead } from './3-combine-on-read' -// export { default as ThroughTheDB } from './4-through-the-db' +export { default as ThroughTheDB } from './4-through-the-db' diff --git a/examples/write-patterns/shared/app/App.tsx b/examples/write-patterns/shared/app/App.tsx index bb06b8303b..6c55878b92 100644 --- a/examples/write-patterns/shared/app/App.tsx +++ b/examples/write-patterns/shared/app/App.tsx @@ -1,6 +1,11 @@ import './style.css' -import { CombineOnRead, OnlineWrites, OptimisticState } from '../../patterns' +import { + CombineOnRead, + OnlineWrites, + OptimisticState, + ThroughTheDB +} from '../../patterns' const App = () => { return ( @@ -8,7 +13,7 @@ const App = () => {
+ ) } diff --git a/examples/write-patterns/shared/app/client.ts b/examples/write-patterns/shared/app/client.ts index 7500579930..ac5d5dd84b 100644 --- a/examples/write-patterns/shared/app/client.ts +++ b/examples/write-patterns/shared/app/client.ts @@ -42,6 +42,8 @@ async function resilientFetch( return await response.json() } + return response + // Could also retry here if you want to be resilient // to 4xx and 5xx responses as well as network errors } catch (err) { diff --git a/examples/write-patterns/shared/app/db.ts b/examples/write-patterns/shared/app/db.ts new file mode 100644 index 0000000000..79ae25620b --- /dev/null +++ b/examples/write-patterns/shared/app/db.ts @@ -0,0 +1,12 @@ +import { PGlite } from '@electric-sql/pglite' +import { PGliteWithLive, live } from '@electric-sql/pglite/live' +import { electricSync } from '@electric-sql/pglite-sync' + +const pglite: PGliteWithLive = await PGlite.create({ + extensions: { + electric: electricSync(), + live, + }, +}) + +export default pglite \ No newline at end of file diff --git a/examples/write-patterns/shared/app/style.css b/examples/write-patterns/shared/app/style.css index 49c2bdc03d..d77fc923dc 100644 --- a/examples/write-patterns/shared/app/style.css +++ b/examples/write-patterns/shared/app/style.css @@ -31,7 +31,7 @@ code { background-color: #262628; border-radius:10px; - min-height: calc(50vh - 80px); + min-height: calc(50vh - 115px); background-image: url(/shared/app/icons/icon.svg); background-position: 17px 23px;