From bbea896709ef1416b8a14615a3d98b321bac4e72 Mon Sep 17 00:00:00 2001 From: James Arthur Date: Fri, 6 Dec 2024 21:33:34 +0100 Subject: [PATCH] examples: upgrade the write-patterns merge logic. --- .../patterns/3-shared-persistent/README.md | 9 +++ .../patterns/3-shared-persistent/index.tsx | 73 +++++++++++------ .../patterns/4-through-the-db/README.md | 18 ++--- .../4-through-the-db/local-schema.sql | 81 ++++++++++++++----- .../patterns/4-through-the-db/sync.ts | 37 +++++---- examples/write-patterns/shared/app/client.ts | 19 ++--- examples/write-patterns/shared/backend/api.js | 47 ++++++----- .../shared/migrations/02-add-write-id.sql | 11 +++ website/docs/guides/writes.md | 12 +-- 9 files changed, 192 insertions(+), 115 deletions(-) create mode 100644 examples/write-patterns/shared/migrations/02-add-write-id.sql diff --git a/examples/write-patterns/patterns/3-shared-persistent/README.md b/examples/write-patterns/patterns/3-shared-persistent/README.md index 9bbb97b6e3..8b1951dcdf 100644 --- a/examples/write-patterns/patterns/3-shared-persistent/README.md +++ b/examples/write-patterns/patterns/3-shared-persistent/README.md @@ -28,6 +28,15 @@ Combining data on-read makes local reads slightly slower. Writes are still made via an API. This can often be helpful and pragmatic, allowing you to [re-use your existing API](https://electric-sql.com/blog/2024/11/21/local-first-with-your-existing-api). However, you may want to avoid running an API and leverage [through the DB sync](../4-through-the-db) for a purer local-first approach. +## Implementation notes + +The merge logic in the `matchWrite` function supports rebasing local optimistic state on concurrent updates from other users. + +This differs from the previous optimistic state example, in that it matches inserts and updates on the `write_id`, rather than the `id`. 
This means that concurrent updates to the same row will not +clear the optimistic state, which allows it to be rebased on changes made concurrently to the same data by other users. + +Note that we still match deletes by `id`, because delete operations can't update the `write_id` column. If you'd like to support revertable concurrent deletes, you can use soft deletes (which are obviously actually updates). + ## How to run See the [How to run](../../README.md#how-to-run) section in the example README. diff --git a/examples/write-patterns/patterns/3-shared-persistent/index.tsx b/examples/write-patterns/patterns/3-shared-persistent/index.tsx index 52d4ee2220..4c3c5b6f45 100644 --- a/examples/write-patterns/patterns/3-shared-persistent/index.tsx +++ b/examples/write-patterns/patterns/3-shared-persistent/index.tsx @@ -22,14 +22,14 @@ type PartialTodo = Partial & { id: string } -type Write = { - key: string +type LocalWrite = { + id: string operation: Operation value: PartialTodo } // Define a shared, persistent, reactive store for local optimistic state. -const optimisticState = proxyMap( +const optimisticState = proxyMap( JSON.parse(localStorage.getItem(KEY) || '[]') ) subscribe(optimisticState, () => { @@ -39,15 +39,16 @@ subscribe(optimisticState, () => { /* * Add a local write to the optimistic state */ -function addLocalWrite(operation: Operation, value: PartialTodo): Write { - const key = uuidv4() - const write: Write = { - key, +function addLocalWrite(operation: Operation, value: PartialTodo): LocalWrite { + const id = uuidv4() + + const write: LocalWrite = { + id, operation, value, } - optimisticState.set(key, write) + optimisticState.set(id, write) return write } @@ -56,29 +57,50 @@ function addLocalWrite(operation: Operation, value: PartialTodo): Write { * Subscribe to the shape `stream` until the local write syncs back through it. * At which point, delete the local write from the optimistic state. 
*/ -async function matchWrite(stream: ShapeStream, write: Write) { - const { key, operation, value } = write +async function matchWrite( + stream: ShapeStream, + write: LocalWrite +): Promise { + const { operation, value } = write + + const matchFn = + operation === 'delete' + ? matchBy('id', value.id) + : matchBy('write_id', write.id) try { - await matchStream(stream, [operation], matchBy('id', value.id)) + await matchStream(stream, [operation], matchFn) } catch (_err) { return } - optimisticState.delete(key) + optimisticState.delete(write.id) } /* * Make an HTTP request to send the write to the API server. * If the request fails, delete the local write from the optimistic state. + * If it succeeds, return the `txid` of the write from the response data. */ -async function sendRequest(path: string, method: string, write: Write) { - const { key, value } = write +async function sendRequest( + path: string, + method: string, + { id, value }: LocalWrite +): Promise { + const data = { + ...value, + write_id: id, + } + let response: Response | undefined try { - await api.request(path, method, value) - } catch (_err) { - optimisticState.delete(key) + response = await api.request(path, method, data) + } catch (err) { + // ignore + } + + if (response === undefined || !response.ok) { + optimisticState.delete(id) } } @@ -95,15 +117,16 @@ export default function SharedPersistent() { timestamptz: (value: string) => new Date(value), }, }) + const sorted = data ? data.sort((a, b) => +a.created_at - +b.created_at) : [] // Get the local optimistic state. - const writes = useSnapshot>(optimisticState) + const localWrites = useSnapshot>(optimisticState) // Merge the synced state with the local state. 
- const todos = writes + const todos = localWrites .values() - .reduce((synced: Todo[], { operation, value }: Write) => { + .reduce((synced: Todo[], { operation, value }: LocalWrite) => { switch (operation) { case 'insert': return synced.some((todo) => todo.id === value.id) @@ -140,7 +163,6 @@ export default function SharedPersistent() { startTransition(async () => { const write = addLocalWrite('insert', data) - const fetchPromise = sendRequest(path, 'POST', write) const syncPromise = matchWrite(stream, write) @@ -155,13 +177,12 @@ export default function SharedPersistent() { const path = `/todos/${id}` const data = { - id: id, + id, completed: !completed, } startTransition(async () => { const write = addLocalWrite('update', data) - const fetchPromise = sendRequest(path, 'PUT', write) const syncPromise = matchWrite(stream, write) @@ -175,10 +196,12 @@ export default function SharedPersistent() { const { id } = todo const path = `/todos/${id}` + const data = { + id, + } startTransition(async () => { - const write = addLocalWrite('delete', { id }) - + const write = addLocalWrite('delete', data) const fetchPromise = sendRequest(path, 'DELETE', write) const syncPromise = matchWrite(stream, write) diff --git a/examples/write-patterns/patterns/4-through-the-db/README.md b/examples/write-patterns/patterns/4-through-the-db/README.md index c08bb5438e..726e396243 100644 --- a/examples/write-patterns/patterns/4-through-the-db/README.md +++ b/examples/write-patterns/patterns/4-through-the-db/README.md @@ -36,23 +36,15 @@ Good use-cases include: Using a local embedded database adds a relatively-heavy dependency to your app. The shadow table and trigger machinery complicate your client side schema definition. -## Complexities - -### 1. Merge logic - -The entrypoint in the code for merge logic is the very blunt `delete_local_on_synced_trigger` defined in the [`./local-schema.sql`](./local-schema.sql). 
The current implementation just wipes any local state for a row when any insert, updater or delete to that row syncs in from the server. - -This approach works and is simple to reason about. However, it won't preserve local changes on top of concurrent changes by other users (or tabs or devices). More sophisticated implementations could do more sophisticated merge logic here. Such as rebasing the local changes on the new server state. This typically involved maintaining more bookkeeping info and having more complex triggers. - -### 2. Rollbacks - Syncing changes in the background complicates any potential rollback handling. In the [shared persistent optimistic state](../../3-shared-persistent) pattern, you can detect a write being rejected by the server whilst still in context, handling user input. With through the database sync, this context is harder to reconstruct. -In this example implementation, we implement an extremely blunt rollback strategy of clearing all local state and writes in the event of any write being rejected by the server. +## Implementation notes + +The merge logic in the `delete_local_on_synced_insert_and_update_trigger` in [`./local-schema.sql`](./local-schema.sql) supports rebasing local optimistic state on concurrent updates from other users. -You may want to implement a more nuanced strategy and, for example, provide information to the user about what is happening and / or minimise data loss by only clearing local-state that's causally dependent on a rejected write. This opens the door to a lot of complexity that may best be addressed by using an existing framework. +The rollback strategy in the `rollback` method of the `ChangeLogSynchronizer` in [`./sync.ts`](./sync.ts) is very naive: clearing all local state and writes in the event of any write being rejected by the server. You may want to implement a more nuanced strategy. 
For example, to provide information to the user about what is happening and / or minimise data loss by only clearing local-state that's causally dependent on a rejected write. -See the [Writes guide](https://electric-sql.com/docs/guides/writes) for more information and links to [existing frameworks](https://electric-sql.com/docs/guides/writes#tools). +This opens the door to a lot of complexity that may best be addressed by using an existing framework. See the [Writes guide](https://electric-sql.com/docs/guides/writes) for more information and links to [existing frameworks](https://electric-sql.com/docs/guides/writes#tools). ## How to run diff --git a/examples/write-patterns/patterns/4-through-the-db/local-schema.sql b/examples/write-patterns/patterns/4-through-the-db/local-schema.sql index 7d9b8bebcd..d20a2133e1 100644 --- a/examples/write-patterns/patterns/4-through-the-db/local-schema.sql +++ b/examples/write-patterns/patterns/4-through-the-db/local-schema.sql @@ -10,7 +10,9 @@ CREATE TABLE IF NOT EXISTS todos_synced ( id UUID PRIMARY KEY, title TEXT NOT NULL, completed BOOLEAN NOT NULL, - created_at TIMESTAMP WITH TIME ZONE NOT NULL + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + -- Bookkeeping column. + write_id UUID ); -- The `todos_local` table for local optimistic state. @@ -19,8 +21,10 @@ CREATE TABLE IF NOT EXISTS todos_local ( title TEXT, completed BOOLEAN, created_at TIMESTAMP WITH TIME ZONE, + -- Bookkeeping columns. changed_columns TEXT[], - is_deleted BOOLEAN DEFAULT FALSE + is_deleted BOOLEAN NOT NULL DEFAULT FALSE, + write_id UUID NOT NULL ); -- The `todos` view to combine the two tables on read. @@ -47,22 +51,41 @@ CREATE OR REPLACE VIEW todos AS ON synced.id = local.id WHERE local.id IS NULL OR local.is_deleted = FALSE; --- A trigger to automatically remove local optimistic state when the --- corresponding row syncs over the replication stream. This is a blunt --- merge strategy. 
More sophisticated apps can implement more -- sophisticated merge / rebase strategies. -CREATE OR REPLACE FUNCTION delete_local_on_sync_trigger() +-- Triggers to automatically remove local optimistic state when the corresponding +-- row syncs over the replication stream. Match on `write_id`, to allow local +-- state to be rebased on concurrent changes to the same row. +CREATE OR REPLACE FUNCTION delete_local_on_synced_insert_and_update_trigger() RETURNS TRIGGER AS $$ BEGIN - DELETE FROM todos_local WHERE id = OLD.id; + DELETE FROM todos_local + WHERE id = NEW.id + AND write_id IS NOT NULL + AND write_id = NEW.write_id; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE OR REPLACE TRIGGER delete_local_on_sync -AFTER INSERT OR UPDATE OR DELETE ON todos_synced +-- N.b.: deletes can be concurrent, but can't update the `write_id` and aren't +-- revertable (once a row is deleted, it would be re-created with an insert), +-- so it's safe to just match on ID. You could implement revertable concurrent +-- deletes using soft deletes (which are actually updates). +CREATE OR REPLACE FUNCTION delete_local_on_synced_delete_trigger() +RETURNS TRIGGER AS $$ +BEGIN + DELETE FROM todos_local WHERE id = OLD.id; + RETURN OLD; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER delete_local_on_synced_insert_and_update +AFTER INSERT OR UPDATE ON todos_synced +FOR EACH ROW +EXECUTE FUNCTION delete_local_on_synced_insert_and_update_trigger(); + +CREATE OR REPLACE TRIGGER delete_local_on_synced_delete +AFTER DELETE ON todos_synced FOR EACH ROW -EXECUTE FUNCTION delete_local_on_sync_trigger(); +EXECUTE FUNCTION delete_local_on_synced_delete_trigger(); -- The local `changes` table for capturing and persisting a log -- of local write operations that we want to sync to the server. 
@@ -70,6 +93,7 @@ CREATE TABLE IF NOT EXISTS changes ( id BIGSERIAL PRIMARY KEY, operation TEXT NOT NULL, value JSONB NOT NULL, + write_id UUID NOT NULL, transaction_id XID8 NOT NULL ); @@ -80,6 +104,8 @@ CREATE TABLE IF NOT EXISTS changes ( -- The insert trigger CREATE OR REPLACE FUNCTION todos_insert_trigger() RETURNS TRIGGER AS $$ +DECLARE + local_write_id UUID := gen_random_uuid(); BEGIN IF EXISTS (SELECT 1 FROM todos_synced WHERE id = NEW.id) THEN RAISE EXCEPTION 'Cannot insert: id already exists in the synced table'; @@ -94,20 +120,23 @@ BEGIN title, completed, created_at, - changed_columns + changed_columns, + write_id ) VALUES ( NEW.id, NEW.title, NEW.completed, NEW.created_at, - ARRAY['title', 'completed', 'created_at'] + ARRAY['title', 'completed', 'created_at'], + local_write_id ); -- Record the write operation in the change log. INSERT INTO changes ( operation, value, + write_id, transaction_id ) VALUES ( @@ -118,6 +147,7 @@ BEGIN 'completed', NEW.completed, 'created_at', NEW.created_at ), + local_write_id, pg_current_xact_id() ); @@ -132,6 +162,7 @@ DECLARE synced todos_synced%ROWTYPE; local todos_local%ROWTYPE; changed_cols TEXT[] := '{}'; + local_write_id UUID := gen_random_uuid(); BEGIN -- Fetch the corresponding rows from the synced and local tables SELECT * INTO synced FROM todos_synced WHERE id = NEW.id; @@ -155,14 +186,16 @@ BEGIN title, completed, created_at, - changed_columns + changed_columns, + write_id ) VALUES ( NEW.id, NEW.title, NEW.completed, NEW.created_at, - changed_cols + changed_cols, + local_write_id ); -- Otherwise, if the row is already in the local table, update it and adjust @@ -206,7 +239,8 @@ BEGIN THEN COALESCE(NEW.created_at, local.created_at) IS DISTINCT FROM synced.created_at END ) - ) + ), + write_id = local_write_id WHERE id = NEW.id; END IF; @@ -214,6 +248,7 @@ BEGIN INSERT INTO changes ( operation, value, + write_id, transaction_id ) VALUES ( @@ -226,6 +261,7 @@ BEGIN 'created_at', NEW.created_at ) ), + 
local_write_id, pg_current_xact_id() ); @@ -236,21 +272,26 @@ $$ LANGUAGE plpgsql; -- The delete trigger CREATE OR REPLACE FUNCTION todos_delete_trigger() RETURNS TRIGGER AS $$ +DECLARE + local_write_id UUID := gen_random_uuid(); BEGIN -- Upsert a soft-deletion record in the local table. IF EXISTS (SELECT 1 FROM todos_local WHERE id = OLD.id) THEN UPDATE todos_local SET - is_deleted = TRUE + is_deleted = TRUE, + write_id = local_write_id WHERE id = OLD.id; ELSE INSERT INTO todos_local ( id, - is_deleted + is_deleted, + write_id ) VALUES ( OLD.id, - TRUE + TRUE, + local_write_id ); END IF; @@ -258,6 +299,7 @@ BEGIN INSERT INTO changes ( operation, value, + write_id, transaction_id ) VALUES ( @@ -265,6 +307,7 @@ BEGIN jsonb_build_object( 'id', OLD.id ), + local_write_id, pg_current_xact_id() ); diff --git a/examples/write-patterns/patterns/4-through-the-db/sync.ts b/examples/write-patterns/patterns/4-through-the-db/sync.ts index 25bc4e2adb..98b832a776 100644 --- a/examples/write-patterns/patterns/4-through-the-db/sync.ts +++ b/examples/write-patterns/patterns/4-through-the-db/sync.ts @@ -3,9 +3,8 @@ import { type PGliteWithLive } from '@electric-sql/pglite/live' import api from '../../shared/app/client' -type TransactionId = string - type Change = { + id: number operation: Operation value: { id: string @@ -13,7 +12,8 @@ type Change = { completed?: boolean created_at?: Date } - transaction_id: TransactionId + write_id: string + transaction_id: string } type SendResult = 'accepted' | 'rejected' | 'retry' @@ -24,7 +24,7 @@ type SendResult = 'accepted' | 'rejected' | 'retry' */ export default class ChangeLogSynchronizer { #db: PGliteWithLive - #position: TransactionId + #position: number #hasChangedWhileProcessing: boolean = false #shouldContinue: boolean = true @@ -33,7 +33,7 @@ export default class ChangeLogSynchronizer { #abortController?: AbortController #unsubscribe?: () => Promise - constructor(db: PGliteWithLive, position = '0') { + constructor(db: PGliteWithLive, 
position = 0) { this.#db = db this.#position = position } @@ -59,13 +59,14 @@ export default class ChangeLogSynchronizer { return } + this.#status = 'processing' + this.process() } // Process the changes by fetching them and posting them to the server. // If the changes are accepted then proceed, otherwise rollback or retry. async process(): Promise { - this.#status === 'processing' this.#hasChangedWhileProcessing = false const { changes, position } = await this.query() @@ -95,22 +96,20 @@ export default class ChangeLogSynchronizer { return await this.process() } - this.#status === 'idle' + this.#status = 'idle' } /* * Fetch the current batch of changes */ - async query(): Promise<{ changes: Change[]; position: TransactionId }> { + async query(): Promise<{ changes: Change[]; position: number }> { const { rows } = await this.#db.sql` SELECT * from changes - WHERE transaction_id > ${this.#position} - ORDER BY - transaction_id asc, - id asc + WHERE id > ${this.#position} + ORDER BY id asc ` - const position = rows.length ? rows.at(-1)!.transaction_id : this.#position + const position = rows.length ? rows.at(-1)!.id : this.#position return { changes: rows, @@ -126,7 +125,7 @@ export default class ChangeLogSynchronizer { const groups = Object.groupBy(changes, (x) => x.transaction_id) const sorted = Object.entries(groups).sort((a, b) => - b[0].localeCompare(a[0]) + a[0].localeCompare(b[0]) ) const transactions = sorted.map(([transaction_id, changes]) => { return { @@ -137,7 +136,7 @@ export default class ChangeLogSynchronizer { const signal = this.#abortController?.signal - let response: Response + let response: Response | undefined try { response = await api.request(path, 'POST', transactions, signal) } catch (_err) { @@ -148,17 +147,17 @@ export default class ChangeLogSynchronizer { return 'retry' } - if (response instanceof Response) { - return response.status < 500 ? 
'rejected' : 'retry' + if (response.ok) { + return 'accepted' } - return 'accepted' + return response.status < 500 ? 'rejected' : 'retry' } /* * Proceed by clearing the processed changes and moving the position forward. */ - async proceed(position: TransactionId): Promise { + async proceed(position: number): Promise { await this.#db.sql` DELETE from changes WHERE id <= ${position} diff --git a/examples/write-patterns/shared/app/client.ts b/examples/write-patterns/shared/app/client.ts index 5db70c4306..967cca7419 100644 --- a/examples/write-patterns/shared/app/client.ts +++ b/examples/write-patterns/shared/app/client.ts @@ -17,7 +17,7 @@ async function retryFetch( url: string, options: RequestOptions, retryCount: number -) { +): Promise { if (retryCount > maxRetries) { return } @@ -35,18 +35,11 @@ async function resilientFetch( url: string, options: RequestOptions, retryCount: number -) { +): Promise { try { - const response = await fetch(url, options) - - if (response.ok) { - return await response.json() - } - - return response - - // Could also retry here if you want to be resilient - // to 4xx and 5xx responses as well as network errors + // Could also check the status and retry before returning if you want to be + // resilient to 4xx and 5xx responses as well as network errors + return await fetch(url, options) } catch (err) { return await retryFetch(url, options, retryCount + 1) } @@ -57,7 +50,7 @@ async function request( method: string, data?: object, signal?: AbortSignal -) { +): Promise { const url = `${API_URL}${path}` const options: RequestOptions = { diff --git a/examples/write-patterns/shared/backend/api.js b/examples/write-patterns/shared/backend/api.js index bd202ec0bd..6b4f4a8798 100644 --- a/examples/write-patterns/shared/backend/api.js +++ b/examples/write-patterns/shared/backend/api.js @@ -22,49 +22,55 @@ const idSchema = z.string().uuid() const createSchema = z.object({ id: z.string().uuid(), title: z.string(), - created_at: z.string() + 
created_at: z.string(), + write_id: z.string().optional() }) const updateSchema = z.object({ - completed: z.boolean() + completed: z.boolean(), + write_id: z.string().optional() +}) +const deleteSchema = z.object({ + write_id: z.string().optional() }) // Define functions to create, update and delete todos // using the `db` client. -const createTodo = async (id, title, created_at) => { +const createTodo = async (id, title, created_at, write_id) => { const sql = ` - INSERT INTO todos (id, title, completed, created_at) - VALUES ($1, $2, false, $3) + INSERT INTO todos (id, title, completed, created_at, write_id) + VALUES ($1, $2, false, $3, $4) ` const params = [ id, title, - created_at + created_at, + write_id || null ] - return await db.query(sql, params) + await db.query(sql, params) } -const updateTodo = async (id, completed) => { +const updateTodo = async (id, completed, write_id) => { const sql = ` - UPDATE todos SET completed = $1 - WHERE id::text = $2 + UPDATE todos SET completed = $1, write_id = $2 + WHERE id = $3 ` const params = [ completed ? '1' : '0', + write_id || null, id ] - return await db.query(sql, params) + await db.query(sql, params) } const deleteTodo = async (id) => { const sql = `DELETE from todos where id = $1` const params = [id] - - return await db.query(sql, params) + await db.query(sql, params) } // Expose the shared REST API to create, update and delete todos. 
@@ -79,7 +85,7 @@ app.post(`/todos`, async (req, res) => { } try { - await createTodo(data.id, data.title, data.created_at) + await createTodo(data.id, data.title, data.created_at, data.write_id) } catch (err) { return res.status(500).json({ errors: err }) @@ -99,7 +105,7 @@ app.put(`/todos/:id`, async (req, res) => { } try { - await updateTodo(id, data.completed) + await updateTodo(id, data.completed, data.write_id) } catch (err) { return res.status(500).json({ errors: err }) @@ -140,8 +146,9 @@ const transactionsSchema = z.array( id: z.string().uuid(), title: z.string().optional(), completed: z.boolean().optional(), - created_at: z.string().optional() - }) + created_at: z.string().optional(), + }), + write_id: z.string() }) ) }) @@ -160,14 +167,14 @@ app.post(`/changes`, async (req, res) => { await db.query('BEGIN') data.forEach((tx) => { - tx.changes.forEach(({operation, value}) => { + tx.changes.forEach(({operation, value, write_id}) => { switch (operation) { case 'insert': - createTodo(value.id, value.title, value.created_at) + createTodo(value.id, value.title, value.created_at, write_id) break case 'update': - updateTodo(value.id, value.completed) + updateTodo(value.id, value.completed, write_id) break case 'delete': diff --git a/examples/write-patterns/shared/migrations/02-add-write-id.sql b/examples/write-patterns/shared/migrations/02-add-write-id.sql new file mode 100644 index 0000000000..489d39be46 --- /dev/null +++ b/examples/write-patterns/shared/migrations/02-add-write-id.sql @@ -0,0 +1,11 @@ +-- Add an optional `write_id` field to the table. +-- +-- This is not necessary for simpler patterns but provides an option for more +-- advanced patterns to match on when monitoring the Electric replication +-- stream in order to invalidate local state. +-- +-- Matching on a per-operation update key and not, say, just the row `id`, +-- allows you to rebase local optimistic state on top of concurrent changes to +-- the same row made by other users. 
(Because you only clear the local state +-- when *your* local write syncs through, not anyone else's). +ALTER TABLE todos ADD COLUMN write_id UUID; diff --git a/website/docs/guides/writes.md b/website/docs/guides/writes.md index 52426d9b0c..cd183a20eb 100644 --- a/website/docs/guides/writes.md +++ b/website/docs/guides/writes.md @@ -161,6 +161,8 @@ Combining data on-read makes local reads slightly slower. Whilst a persistent lo #### Implementation notes +The merge logic in the `matchWrite` function differs from the previous optimistic state example in that it supports rebasing local optimistic state on concurrent updates from other users. + The entrypoint for handling rollbacks has the local write context available. So it's able to rollback individual writes, rather than wiping the whole local state. Because it has the shared store available, it would also be possible to extend this to implement more sophisticated strategies. Such as also removing other local writes that causally depended-on or were related-to the rejected write. @@ -231,11 +233,9 @@ Syncing changes in the background complicates any potential [rollback handling]( #### Implementation notes -The entrypoint in the code for [merge logic](#merge-logic) is the very blunt `delete_local_on_synced_trigger` defined in the [`./local-schema.sql`](https://github.com/electric-sql/electric/blog/main/examples/write-patterns/patterns/4-through-the-db/local-schema.sql). The current implementation just wipes any local state for a row when any insert, updater or delete to that row syncs in from the server. - -This approach works and is simple to reason about. However, it won't preserve local changes on top of concurrent changes by other users (or tabs or devices). More sophisticated implementations could do more sophisticated merge logic here. This typically involves maintaining more bookkeeping info and having more complex triggers. 
+The [merge logic](#merge-logic) in the `delete_local_on_synced_insert_and_update_trigger` in [`./local-schema.sql`](https://github.com/electric-sql/electric/blob/main/examples/write-patterns/patterns/4-through-the-db/local-schema.sql) supports rebasing local optimistic state on concurrent updates from other users. -The example also implements a very blunt rollback strategy of clearing all local state and writes in the event of any write being rejected by the server. You may want to implement a more nuanced strategy and, for example, provide information to the user about what is happening and / or minimise data loss by only clearing local-state that's causally dependent on a rejected write. +The rollback strategy in the `rollback` method of the `ChangeLogSynchronizer` in [`./sync.ts`](https://github.com/electric-sql/electric/blob/main/examples/write-patterns/patterns/4-through-the-db/sync.ts) is very naive: clearing all local state and writes in the event of any write being rejected by the server. You may want to implement a more nuanced strategy. For example, to provide information to the user about what is happening and / or minimise data loss by only clearing local-state that's causally dependent on a rejected write. This opens the door to a lot of complexity that may best be addressed by [using an existing framework](#framework) or one of the [simpler patterns](#patterns). @@ -252,9 +252,9 @@ There are two key complexities introduced by handling offline writes or local writes with sync. ### Merge logic -When a change syncs in over the Electric replication stream, the application has to decide how to handle any overlapping optimistic state. This can be complicated by concurrency, when changes syncing in may be made by other users (or devices, or even tabs). In these cases, it may be necessary to rebase the local state on the synced state, rather than just naively clearing the local state. 
+When a change syncs in over the Electric replication stream, the application has to decide how to handle any overlapping optimistic state. This can be complicated by concurrency, when changes syncing in may be made by other users (or devices, or even tabs). The third and fourth examples both demonstrate approaches to rebasing the local state on the synced state, rather than just naively clearing the local state, in order to preserve local changes. -[Linearlite](https://github.com/electric-sql/electric/blog/main/examples/linearlite) is an example of through-the-DB sync with sophisticated merge logic. +[Linearlite](https://github.com/electric-sql/electric/blob/main/examples/linearlite) is another example of through-the-DB sync with more sophisticated merge logic. ### Rollbacks