From 5cfbed1177db542c9bbefd4f8fd9667431e64c59 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 25 Jul 2024 23:49:39 +0100 Subject: [PATCH] Live Query API (#104) * WIP Live Query API * Incremental live queries * Update readme * add live query test * add live query test --- packages/pglite/README.md | 121 +++++ packages/pglite/examples/live-changes.html | 54 +++ .../pglite/examples/live-incremental.html | 54 +++ packages/pglite/examples/live.html | 49 ++ packages/pglite/package.json | 1 + packages/pglite/src/live/index.ts | 444 ++++++++++++++++++ packages/pglite/src/live/interface.ts | 80 ++++ packages/pglite/tests/live.test.js | 370 +++++++++++++++ packages/pglite/tsup.config.ts | 1 + 9 files changed, 1174 insertions(+) create mode 100644 packages/pglite/examples/live-changes.html create mode 100644 packages/pglite/examples/live-incremental.html create mode 100644 packages/pglite/examples/live.html create mode 100644 packages/pglite/src/live/index.ts create mode 100644 packages/pglite/src/live/interface.ts create mode 100644 packages/pglite/tests/live.test.js diff --git a/packages/pglite/README.md b/packages/pglite/README.md index 0164a375..9074749f 100644 --- a/packages/pglite/README.md +++ b/packages/pglite/README.md @@ -359,6 +359,127 @@ PGlite supports the pl/pgsql procedural language extension, this is included and In future we plan to support additional extensions, see the [roadmap](#roadmap). +## Live Queries + +The "live" extension enables you to subscribe to a query and receve updated results when the underlying tables change. + +To use the extension it needs adding to the PGlite instance when creating it: + +```ts +import { PGlite } from "@electric-sql/pglite"; +import { live } from "@electric-sql/pglite/live"; + +const pg = new PGlite({ + extensions: { + live, + }, +}); +``` + +There are three methods on the `live` namespace: +- `live.query()` for basic live queries. With less machinery in PG it's quicker for small results sets and narrow rows. +- `live.incrementalQuery()` for incremental queries. It materialises the full result set on each update from only the changes emitted by the `live.changes` api. Perfect for feeding into React and good performance for large result sets and wide rows. +- `live.changes()` a lower level API that emits the changes (insert/update/delete) that can then be mapped to mutations in a UI or other datastore. + +### live.query() + +This is very similar to a standard query, but takes an additional callback that receives the results whenever they change: + +```js +const ret = pg.live.query("SELECT * FROM test ORDER BY rand;", [], (res) => { + // res is the same as a standard query result object +}); +``` + +The returned value from the call is an object with this interface: + +```ts +interface LiveQueryReturn { + initialResults: Results; + unsubscribe: () => Promise; + refresh: () => Promise; +} +``` + +- `initialResults` is the initial results set (also sent to the callback +- `unsubscribe` allow you to unsubscribe from the live query +- `refresh` allows you to force a refresh of the query + +Internally it watches for the tables that the query depends on, and reruns the query whenever they are changed. + +### live.incrementalQuery() + +Similar to above, but maintains a temporary table inside of Postgres of the previous state. When the tables it depends on change the query is re-run and diffed with the last state. Only the changes from the last version of the query are copied from WASM into JS. + +It requires an additional `key` argument, the name of a column (often a PK) to key the diff on. + +```ts +const ret = pg.live.incrementalQuery( + "SELECT * FROM test ORDER BY rand;", [], "id", + (res) => { + // res is the same as a standard query result object + } +); +``` + +The returned value is of the same type as the `query` method above. + +### live.changes() + +A lower level API which is the backend for the `incrementalQuery`, it emits the change that have happened. It requires a `key` to key the diff on: + +```ts +const ret = pg.live.changes( + "SELECT * FROM test ORDER BY rand;", [], "id", + (res) => { + // res is a change result object + } +); +``` + +the returned value from the call is defined by this interface: + +```ts +interface LiveChangesReturn { + fields: { name: string; dataTypeID: number }[]; + initialChanges: Array>; + unsubscribe: () => Promise; + refresh: () => Promise; +} +``` + +The results passed to the callback are array of `Change` objects: + +```ts +type ChangeInsert = { + __changed_columns__: string[]; + __op__: "INSERT"; + __after__: number; +} & T; + +type ChangeDelete = { + __changed_columns__: string[]; + __op__: "DELETE"; + __after__: undefined; +} & T; + +type ChangeUpdate = { + __changed_columns__: string[]; + __op__: "UPDATE"; + __after__: number; +} & T; + +type Change = ChangeInsert | ChangeDelete | ChangeUpdate; +``` + +Each `Change` includes the new values along with: + +- `__changed_columns__` the columns names that were changes +- `__op__` the operation that is required to update the state (`INSERT`, `UPDATE`, `DELETE`) +- `__after__` the `key` of the row that this row should be after, it will be included in `__changed_columns__` if it has been changed. + +This API can be used to implement very efficient in-place DOM updates. + ## ORM support. - Drizzle ORM supports PGlite, see [their docs here](https://orm.drizzle.team/docs/get-started-postgresql#pglite). diff --git a/packages/pglite/examples/live-changes.html b/packages/pglite/examples/live-changes.html new file mode 100644 index 00000000..b53465e6 --- /dev/null +++ b/packages/pglite/examples/live-changes.html @@ -0,0 +1,54 @@ + +

+
diff --git a/packages/pglite/examples/live-incremental.html b/packages/pglite/examples/live-incremental.html
new file mode 100644
index 00000000..104a35c8
--- /dev/null
+++ b/packages/pglite/examples/live-incremental.html
@@ -0,0 +1,54 @@
+
+
+ diff --git a/packages/pglite/examples/live.html b/packages/pglite/examples/live.html new file mode 100644 index 00000000..c4b8e542 --- /dev/null +++ b/packages/pglite/examples/live.html @@ -0,0 +1,49 @@ + +
+ diff --git a/packages/pglite/package.json b/packages/pglite/package.json index 54192348..cd5a17ec 100644 --- a/packages/pglite/package.json +++ b/packages/pglite/package.json @@ -19,6 +19,7 @@ "main": "dist/index.js", "exports": { ".": "./dist/index.js", + "./live": "./dist/live/index.js", "./worker": "./dist/worker/index.js", "./vector": "./dist/vector/index.js" }, diff --git a/packages/pglite/src/live/index.ts b/packages/pglite/src/live/index.ts new file mode 100644 index 00000000..7a68281f --- /dev/null +++ b/packages/pglite/src/live/index.ts @@ -0,0 +1,444 @@ +import type { + Extension, + PGliteInterface, + Results, + Transaction, +} from "../interface"; +import type { + LiveNamespace, + LiveQueryReturn, + LiveChangesReturn, + Change, +} from "./interface"; + +const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { + // Counter use to generate unique IDs for live queries + // This is used to create temporary views and so are scoped to the current connection + let liveQueryCounter = 0; + + // The notify triggers are only ever added and never removed + // Keep track of which triggers have been added to avoid adding them multiple times + const tableNotifyTriggersAdded = new Set(); + + const namespaceObj: LiveNamespace = { + async query( + query: string, + params: any[] | undefined | null, + callback: (results: Results) => void + ) { + const id = liveQueryCounter++; + + let results: Results; + let tables: { table_name: string; schema_name: string }[]; + + await pg.transaction(async (tx) => { + // Create a temporary view with the query + await tx.query( + `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`, + params ?? [] + ); + + // Get the tables used in the view and add triggers to notify when they change + tables = await getTablesForView(tx, `live_query_${id}_view`); + await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded); + + // Create prepared statement to get the results + await tx.exec(` + PREPARE live_query_${id}_get AS + SELECT * FROM live_query_${id}_view; + `); + + // Get the initial results + results = await tx.query(`EXECUTE live_query_${id}_get;`); + }); + + // Function to refresh the query + const refresh = async () => { + results = await pg.query(`EXECUTE live_query_${id}_get;`); + callback(results); + }; + + // Setup the listeners + const unsubList: Array<() => Promise> = []; + for (const table of tables!) { + const unsub = await pg.listen( + `table_change__${table.schema_name}__${table.table_name}`, + async () => { + refresh(); + } + ); + unsubList.push(unsub); + } + + // Function to unsubscribe from the query + const unsubscribe = async () => { + for (const unsub of unsubList) { + await unsub(); + } + await pg.exec(` + DROP VIEW IF EXISTS live_query_${id}_view; + DEALLOCATE live_query_${id}_get; + `); + }; + + // Run the callback with the initial results + callback(results!); + + // Return the initial results + return { + initialResults: results!, + unsubscribe, + refresh, + } satisfies LiveQueryReturn; + }, + + async changes( + query: string, + params: any[] | undefined | null, + key: string, + callback: (changes: Array>) => void + ) { + const id = liveQueryCounter++; + let tables: { table_name: string; schema_name: string }[]; + let stateSwitch: 1 | 2 = 1; + let changes: Results>; + + await pg.transaction(async (tx) => { + // Create a temporary view with the query + await tx.query( + `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`, + params ?? [] + ); + + // Get the tables used in the view and add triggers to notify when they change + tables = await getTablesForView(tx, `live_query_${id}_view`); + await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded); + + // Get the columns of the view + const columns = [ + ...( + await tx.query(` + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = 'live_query_${id}_view' + `) + ).rows, + { column_name: "__after__", data_type: "integer" }, + ]; + + // Init state tables as empty temp table + await tx.exec(` + CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL); + CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL); + `); + + // Create Diff views and prepared statements + for (const curr of [1, 2]) { + const prev = curr === 1 ? 2 : 1; + await tx.exec(` + PREPARE live_query_${id}_diff${curr} AS + WITH + prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}), + curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}), + data_diff AS ( + -- INSERT operations: Include all columns + SELECT + 'INSERT' AS __op__, + ${columns + .map( + ({ column_name }) => + `curr."${column_name}" AS "${column_name}"` + ) + .join(",\n")}, + ARRAY[]::text[] AS __changed_columns__ + FROM curr + LEFT JOIN prev ON curr.${key} = prev.${key} + WHERE prev.${key} IS NULL + UNION ALL + -- DELETE operations: Include only the primary key + SELECT + 'DELETE' AS __op__, + ${columns + .map(({ column_name, data_type }) => { + if (column_name === key) { + return `prev."${column_name}" AS "${column_name}"`; + } else { + return `NULL::${data_type} AS "${column_name}"`; + } + }) + .join(",\n")}, + ARRAY[]::text[] AS __changed_columns__ + FROM prev + LEFT JOIN curr ON prev.${key} = curr.${key} + WHERE curr.${key} IS NULL + UNION ALL + -- UPDATE operations: Include only changed columns + SELECT + 'UPDATE' AS __op__, + ${columns + .map(({ column_name, data_type }) => + column_name === key + ? `curr."${column_name}" AS "${column_name}"` + : `CASE + WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" + THEN curr."${column_name}" + ELSE NULL::${data_type} + END AS "${column_name}"` + ) + .join(",\n")}, + ARRAY(SELECT unnest FROM unnest(ARRAY[${columns + .filter(({ column_name }) => column_name !== key) + .map( + ({ column_name }) => + `CASE + WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" + THEN '${column_name}' + ELSE NULL + END` + ) + .join( + ", " + )}]) WHERE unnest IS NOT NULL) AS __changed_columns__ + FROM curr + INNER JOIN prev ON curr.${key} = prev.${key} + WHERE NOT (curr IS NOT DISTINCT FROM prev) + ) + SELECT * FROM data_diff; + `); + } + }); + + const refresh = async () => { + await pg.transaction(async (tx) => { + // Populate the state table + await tx.exec(` + DELETE FROM live_query_${id}_state${stateSwitch}; + INSERT INTO live_query_${id}_state${stateSwitch} + SELECT * FROM live_query_${id}_view; + `); + + // Get the changes + changes = await tx.query( + `EXECUTE live_query_${id}_diff${stateSwitch};` + ); + }); + + // Switch state + stateSwitch = stateSwitch === 1 ? 2 : 1; + + callback(changes!.rows); + }; + + // Setup the listeners + const unsubList: Array<() => Promise> = []; + for (const table of tables!) { + const unsub = await pg.listen( + `table_change__${table.schema_name}__${table.table_name}`, + async () => { + refresh(); + } + ); + unsubList.push(unsub); + } + + // Function to unsubscribe from the query + const unsubscribe = async () => { + for (const unsub of unsubList) { + await unsub(); + } + await pg.exec(` + DROP VIEW IF EXISTS live_query_${id}_view; + DROP TABLE IF EXISTS live_query_${id}_state1; + DROP TABLE IF EXISTS live_query_${id}_state2; + DEALLOCATE live_query_${id}_diff1; + DEALLOCATE live_query_${id}_diff2; + `); + }; + + // Run the callback with the initial changes + await refresh(); + + // Fields + const fields = changes!.fields.filter( + (field) => + !["__after__", "__op__", "__changed_columns__"].includes(field.name) + ); + + // Return the initial results + return { + fields, + initialChanges: changes!.rows, + unsubscribe, + refresh, + } satisfies LiveChangesReturn; + }, + + async incrementalQuery( + query: string, + params: any[] | undefined | null, + key: string, + callback: (results: Results>) => void + ) { + const rowsMap: Map = new Map(); + const afterMap: Map = new Map(); + let lastRows: Change[] = []; + let firstRun = true; + + const { fields, unsubscribe, refresh } = await namespaceObj.changes( + query, + params, + key, + (changes) => { + // Process the changes + for (const change of changes) { + const { + __op__: op, + __changed_columns__: changedColumns, + ...obj + } = change as typeof change & { [key: string]: any }; + switch (op) { + case "INSERT": + rowsMap.set(obj[key], obj); + afterMap.set(obj.__after__, obj[key]); + break; + case "DELETE": + const oldObj = rowsMap.get(obj[key]); + rowsMap.delete(obj[key]); + afterMap.delete(oldObj.__after__); + break; + case "UPDATE": + const newObj = { ...(rowsMap.get(obj[key]) ?? {}) }; + for (const columnName of changedColumns) { + newObj[columnName] = obj[columnName]; + if (columnName === "__after__") { + afterMap.set(obj.__after__, obj[key]); + } + } + rowsMap.set(obj[key], newObj); + break; + } + } + + // Get the rows in order + const rows: Change[] = []; + let lastKey: any = null; + while (true) { + const nextKey = afterMap.get(lastKey); + const obj = rowsMap.get(nextKey); + if (!obj) { + break; + } + rows.push(obj); + lastKey = nextKey; + } + lastRows = rows; + + // Run the callback + if (!firstRun) { + callback({ + rows, + fields, + }); + } + } + ); + + firstRun = false; + callback({ + rows: lastRows, + fields, + }); + + return { + initialResults: { + rows: lastRows, + fields, + }, + unsubscribe, + refresh, + } satisfies LiveQueryReturn; + }, + }; + + return { + namespaceObj, + }; +}; + +export const live = { + name: "Live Queries", + setup, +} satisfies Extension; + +/** + * Get a list of all the tables used in a view + * @param tx a transaction or or PGlite instance + * @param viewName the name of the view + * @returns list of tables used in the view + */ +async function getTablesForView( + tx: Transaction | PGliteInterface, + viewName: string +): Promise<{ table_name: string; schema_name: string }[]> { + return ( + await tx.query<{ + table_name: string; + schema_name: string; + }>( + ` + SELECT DISTINCT + cl.relname AS table_name, + n.nspname AS schema_name + FROM pg_rewrite r + JOIN pg_depend d ON r.oid = d.objid + JOIN pg_class cl ON d.refobjid = cl.oid + JOIN pg_namespace n ON cl.relnamespace = n.oid + WHERE + r.ev_class = ( + SELECT oid FROM pg_class WHERE relname = $1 AND relkind = 'v' + ) + AND d.deptype = 'n'; + `, + [viewName] + ) + ).rows.filter((row) => row.table_name !== viewName); +} + +/** + * Add triggers to tables to notify when they change + * @param tx a transaction or PGlite instance + * @param tables list of tables to add triggers to + */ +async function addNotifyTriggersToTables( + tx: Transaction | PGliteInterface, + tables: { table_name: string; schema_name: string }[], + tableNotifyTriggersAdded: Set +) { + const triggers = tables + .filter( + (table) => + !tableNotifyTriggersAdded.has( + `${table.schema_name}_${table.table_name}` + ) + ) + .map((table) => { + return ` + CREATE OR REPLACE FUNCTION _notify_${table.schema_name}_${table.table_name}() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('table_change__${table.schema_name}__${table.table_name}', ''); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + CREATE OR REPLACE TRIGGER _notify_trigger_${table.schema_name}_${table.table_name} + AFTER INSERT OR UPDATE OR DELETE ON ${table.schema_name}.${table.table_name} + FOR EACH STATEMENT EXECUTE FUNCTION _notify_${table.schema_name}_${table.table_name}(); + `; + }) + .join("\n"); + if (triggers.trim() !== "") { + await tx.exec(triggers); + } + tables.map((table) => + tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`) + ); +} diff --git a/packages/pglite/src/live/interface.ts b/packages/pglite/src/live/interface.ts new file mode 100644 index 00000000..9bbb7814 --- /dev/null +++ b/packages/pglite/src/live/interface.ts @@ -0,0 +1,80 @@ +import type { Results } from "../interface"; + +export interface LiveNamespace { + /** + * Create a live query + * @param query - The query to run + * @param params - The parameters to pass to the query + * @param callback - A callback to run when the query is updated + * @returns A promise that resolves to an object with the initial results, + * an unsubscribe function, and a refresh function + */ + query( + query: string, + params: any[] | undefined | null, + callback: (results: Results) => void, + ): Promise>; + + /** + * Create a live query that returns the changes to the query results + * @param query - The query to run + * @param params - The parameters to pass to the query + * @param callback - A callback to run when the query is updated + * @returns A promise that resolves to an object with the initial changes, + * an unsubscribe function, and a refresh function + */ + changes( + query: string, + params: any[] | undefined | null, + key: string, + callback: (changes: Array>) => void, + ): Promise>; + + /** + * Create a live query with incremental updates + * @param query - The query to run + * @param params - The parameters to pass to the query + * @param callback - A callback to run when the query is updated + * @returns A promise that resolves to an object with the initial results, + * an unsubscribe function, and a refresh function + */ + incrementalQuery( + query: string, + params: any[] | undefined | null, + key: string, + callback: (results: Results>) => void, + ): Promise>>; +} + +export interface LiveQueryReturn { + initialResults: Results; + unsubscribe: () => Promise; + refresh: () => Promise; +} + +export interface LiveChangesReturn { + fields: { name: string; dataTypeID: number }[]; + initialChanges: Array>; + unsubscribe: () => Promise; + refresh: () => Promise; +} + +export type ChangeInsert = { + __changed_columns__: string[]; + __op__: "INSERT"; + __after__: number; +} & T; + +export type ChangeDelete = {} & { + __changed_columns__: string[]; + __op__: "DELETE"; + __after__: undefined; +} & T; + +export type ChangeUpdate = {} & { + __changed_columns__: string[]; + __op__: "UPDATE"; + __after__: number; +} & T; + +export type Change = ChangeInsert | ChangeDelete | ChangeUpdate; diff --git a/packages/pglite/tests/live.test.js b/packages/pglite/tests/live.test.js new file mode 100644 index 00000000..e3fd37ea --- /dev/null +++ b/packages/pglite/tests/live.test.js @@ -0,0 +1,370 @@ +import test from "ava"; +import { PGlite } from "../dist/index.js"; +import { live } from "../dist/live/index.js"; + +test.serial("basic live query", async (t) => { + const db = new PGlite({ + extensions: { live }, + }); + + await db.exec(` + CREATE TABLE IF NOT EXISTS test ( + id SERIAL PRIMARY KEY, + number INT + ); + `); + + await db.exec(` + INSERT INTO test (number) + SELECT i*10 FROM generate_series(1, 5) i; + `); + + let updatedResults; + const eventTarget = new EventTarget(); + + const { initialResults, unsubscribe } = await db.live.query( + "SELECT * FROM test ORDER BY number;", + [], + (result) => { + updatedResults = result; + eventTarget.dispatchEvent(new Event("change")); + } + ); + + t.deepEqual(initialResults.rows, [ + { id: 1, number: 10 }, + { id: 2, number: 20 }, + { id: 3, number: 30 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]); + + db.exec("INSERT INTO test (number) VALUES (25);"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10 }, + { id: 2, number: 20 }, + { id: 6, number: 25 }, + { id: 3, number: 30 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]); + + db.exec("DELETE FROM test WHERE id = 6;"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10 }, + { id: 2, number: 20 }, + { id: 3, number: 30 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]); + + db.exec("UPDATE test SET number = 15 WHERE id = 3;"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10 }, + { id: 3, number: 15 }, + { id: 2, number: 20 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]); + + unsubscribe(); + + db.exec("INSERT INTO test (number) VALUES (35);"); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10 }, + { id: 3, number: 15 }, + { id: 2, number: 20 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]); + +}); + +test.serial("basic live incremental query", async (t) => { + const db = new PGlite({ + extensions: { live }, + }); + + await db.exec(` + CREATE TABLE IF NOT EXISTS test ( + id SERIAL PRIMARY KEY, + number INT + ); + `); + + await db.exec(` + INSERT INTO test (number) + SELECT i*10 FROM generate_series(1, 5) i; + `); + + let updatedResults; + const eventTarget = new EventTarget(); + + const { initialResults, unsubscribe } = await db.live.incrementalQuery( + "SELECT * FROM test ORDER BY number;", + [], + "id", + (result) => { + updatedResults = result; + eventTarget.dispatchEvent(new Event("change")); + } + ); + + t.deepEqual(initialResults.rows, [ + { id: 1, number: 10, __after__: null }, + { id: 2, number: 20, __after__: 1 }, + { id: 3, number: 30, __after__: 2 }, + { id: 4, number: 40, __after__: 3 }, + { id: 5, number: 50, __after__: 4 }, + ]); + + await db.exec("INSERT INTO test (number) VALUES (25);"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10, __after__: null }, + { id: 2, number: 20, __after__: 1 }, + { id: 6, number: 25, __after__: 2 }, + { id: 3, number: 30, __after__: 6 }, + { id: 4, number: 40, __after__: 3 }, + { id: 5, number: 50, __after__: 4 }, + ]); + + await db.exec("DELETE FROM test WHERE id = 6;"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10, __after__: null }, + { id: 2, number: 20, __after__: 1 }, + { id: 3, number: 30, __after__: 2 }, + { id: 4, number: 40, __after__: 3 }, + { id: 5, number: 50, __after__: 4 }, + ]); + + await db.exec("UPDATE test SET number = 15 WHERE id = 3;"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10, __after__: null }, + { id: 3, number: 15, __after__: 1 }, + { id: 2, number: 20, __after__: 3 }, + { id: 4, number: 40, __after__: 2 }, + { id: 5, number: 50, __after__: 4 }, + ]); + + unsubscribe(); + + await db.exec("INSERT INTO test (number) VALUES (35);"); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + t.deepEqual(updatedResults.rows, [ + { id: 1, number: 10, __after__: null }, + { id: 3, number: 15, __after__: 1 }, + { id: 2, number: 20, __after__: 3 }, + { id: 4, number: 40, __after__: 2 }, + { id: 5, number: 50, __after__: 4 }, + ]); +}); + +test.serial("basic live changes", async (t) => { + const db = new PGlite({ + extensions: { live }, + }); + + await db.exec(` + CREATE TABLE IF NOT EXISTS test ( + id SERIAL PRIMARY KEY, + number INT + ); + `); + + await db.exec(` + INSERT INTO test (number) + SELECT i*10 FROM generate_series(1, 5) i; + `); + + let updatedChanges; + const eventTarget = new EventTarget(); + + const { initialChanges, unsubscribe } = await db.live.changes( + "SELECT * FROM test ORDER BY number;", + [], + "id", + (changes) => { + updatedChanges = changes; + eventTarget.dispatchEvent(new Event("change")); + } + ); + + t.deepEqual(initialChanges, [ + { + __op__: "INSERT", + id: 1, + number: 10, + __after__: null, + __changed_columns__: [], + }, + { + __op__: "INSERT", + id: 2, + number: 20, + __after__: 1, + __changed_columns__: [], + }, + { + __op__: "INSERT", + id: 3, + number: 30, + __after__: 2, + __changed_columns__: [], + }, + { + __op__: "INSERT", + id: 4, + number: 40, + __after__: 3, + __changed_columns__: [], + }, + { + __op__: "INSERT", + id: 5, + number: 50, + __after__: 4, + __changed_columns__: [], + }, + ]); + + db.exec("INSERT INTO test (number) VALUES (25);"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedChanges, [ + { + __op__: "INSERT", + id: 6, + number: 25, + __after__: 2, + __changed_columns__: [], + }, + { + __after__: 6, + __changed_columns__: ["__after__"], + __op__: "UPDATE", + id: 3, + number: null, + }, + ]); + + db.exec("DELETE FROM test WHERE id = 6;"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedChanges, [ + { + __op__: "DELETE", + id: 6, + number: null, + __after__: null, + __changed_columns__: [], + }, + { + __after__: 2, + __changed_columns__: ["__after__"], + __op__: "UPDATE", + id: 3, + number: null, + }, + ]); + + db.exec("UPDATE test SET number = 15 WHERE id = 3;"); + + await new Promise((resolve) => + eventTarget.addEventListener("change", resolve, { once: true }) + ); + + t.deepEqual(updatedChanges, [ + { + id: 2, + __after__: 3, + __changed_columns__: ["__after__"], + __op__: "UPDATE", + number: null, + }, + { + id: 3, + __after__: 1, + __changed_columns__: ["number", "__after__"], + __op__: "UPDATE", + number: 15, + }, + { + id: 4, + __after__: 2, + __changed_columns__: ["__after__"], + __op__: "UPDATE", + number: null, + }, + ]); + + unsubscribe(); + + db.exec("INSERT INTO test (number) VALUES (35);"); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + t.deepEqual(updatedChanges, [ + { + id: 2, + __after__: 3, + __changed_columns__: ["__after__"], + __op__: "UPDATE", + number: null, + }, + { + id: 3, + __after__: 1, + __changed_columns__: ["number", "__after__"], + __op__: "UPDATE", + number: 15, + }, + { + id: 4, + __after__: 2, + __changed_columns__: ["__after__"], + __op__: "UPDATE", + number: null, + }, + ]); +}); diff --git a/packages/pglite/tsup.config.ts b/packages/pglite/tsup.config.ts index c3de8fcb..3b3c1b16 100644 --- a/packages/pglite/tsup.config.ts +++ b/packages/pglite/tsup.config.ts @@ -17,6 +17,7 @@ const replaceAssertPlugin = { const entryPoints = [ "src/index.ts", + 'src/live/index.ts', "src/worker/index.ts", "src/worker/process.ts", "src/vector/index.ts",