From c3ff9a456a1d23847fbb95bb17aea9fb72325257 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 29 Jul 2024 17:05:57 +0100 Subject: [PATCH] Multi tab live.changes and live.incrementalQuery --- packages/pglite/examples/worker.html | 4 +- packages/pglite/src/live/index.ts | 263 +++++++++++++++----------- packages/pglite/src/live/interface.ts | 10 +- packages/pglite/src/worker/index.ts | 58 +++--- 4 files changed, 196 insertions(+), 139 deletions(-) diff --git a/packages/pglite/examples/worker.html b/packages/pglite/examples/worker.html index e4a28478..f258c3b6 100644 --- a/packages/pglite/examples/worker.html +++ b/packages/pglite/examples/worker.html @@ -56,9 +56,11 @@ const btnClear = document.querySelector("#clear"); btnClear.addEventListener("click", clearData); - pg.live.query( + // pg.live.query( + pg.live.incrementalQuery( `SELECT * FROM test`, [], + 'id', (data) => { const output = document.getElementById("output"); output.textContent = JSON.stringify(data.rows, null, 2); diff --git a/packages/pglite/src/live/index.ts b/packages/pglite/src/live/index.ts index 62ff9787..4b8ce368 100644 --- a/packages/pglite/src/live/index.ts +++ b/packages/pglite/src/live/index.ts @@ -63,6 +63,8 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { if ( msg == `prepared statement "live_query_${id}_get" does not exist` ) { + // If the prepared statement does not exist, reset and try again + // This can happen if using the multi-tab worker if (count > MAX_RETRIES) { throw e; } @@ -121,130 +123,163 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { let stateSwitch: 1 | 2 = 1; let changes: Results>; - await pg.transaction(async (tx) => { - // Create a temporary view with the query - await tx.query( - `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`, - params ?? [], - ); + const init = async () => { + await pg.transaction(async (tx) => { + // Create a temporary view with the query + await tx.query( + `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`, + params ?? [], + ); - // Get the tables used in the view and add triggers to notify when they change - tables = await getTablesForView(tx, `live_query_${id}_view`); - await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded); - - // Get the columns of the view - const columns = [ - ...( - await tx.query(` - SELECT column_name, data_type - FROM information_schema.columns - WHERE table_name = 'live_query_${id}_view' - `) - ).rows, - { column_name: "__after__", data_type: "integer" }, - ]; - - // Init state tables as empty temp table - await tx.exec(` - CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL); - CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL); - `); + // Get the tables used in the view and add triggers to notify when they change + tables = await getTablesForView(tx, `live_query_${id}_view`); + await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded); - // Create Diff views and prepared statements - for (const curr of [1, 2]) { - const prev = curr === 1 ? 2 : 1; + // Get the columns of the view + const columns = [ + ...( + await tx.query(` + SELECT column_name, data_type, udt_name + FROM information_schema.columns + WHERE table_name = 'live_query_${id}_view' + `) + ).rows, + { column_name: "__after__", data_type: "integer" }, + ]; + + // Init state tables as empty temp table await tx.exec(` - PREPARE live_query_${id}_diff${curr} AS - WITH - prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}), - curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}), - data_diff AS ( - -- INSERT operations: Include all columns - SELECT - 'INSERT' AS __op__, - ${columns - .map( - ({ column_name }) => - `curr."${column_name}" AS "${column_name}"`, - ) - .join(",\n")}, - ARRAY[]::text[] AS __changed_columns__ - FROM curr - LEFT JOIN prev ON curr.${key} = prev.${key} - WHERE prev.${key} IS NULL - UNION ALL - -- DELETE operations: Include only the primary key - SELECT - 'DELETE' AS __op__, - ${columns - .map(({ column_name, data_type }) => { - if (column_name === key) { - return `prev."${column_name}" AS "${column_name}"`; - } else { - return `NULL::${data_type} AS "${column_name}"`; - } - }) - .join(",\n")}, - ARRAY[]::text[] AS __changed_columns__ - FROM prev - LEFT JOIN curr ON prev.${key} = curr.${key} - WHERE curr.${key} IS NULL - UNION ALL - -- UPDATE operations: Include only changed columns - SELECT - 'UPDATE' AS __op__, - ${columns - .map(({ column_name, data_type }) => - column_name === key - ? `curr."${column_name}" AS "${column_name}"` - : `CASE - WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" - THEN curr."${column_name}" - ELSE NULL::${data_type} - END AS "${column_name}"`, - ) - .join(",\n")}, - ARRAY(SELECT unnest FROM unnest(ARRAY[${columns - .filter(({ column_name }) => column_name !== key) + CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL); + CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL); + `); + + // Create Diff views and prepared statements + for (const curr of [1, 2]) { + const prev = curr === 1 ? 2 : 1; + await tx.exec(` + PREPARE live_query_${id}_diff${curr} AS + WITH + prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}), + curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}), + data_diff AS ( + -- INSERT operations: Include all columns + SELECT + 'INSERT' AS __op__, + ${columns .map( ({ column_name }) => - `CASE - WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" - THEN '${column_name}' - ELSE NULL - END`, + `curr."${column_name}" AS "${column_name}"`, ) - .join( - ", ", - )}]) WHERE unnest IS NOT NULL) AS __changed_columns__ - FROM curr - INNER JOIN prev ON curr.${key} = prev.${key} - WHERE NOT (curr IS NOT DISTINCT FROM prev) - ) - SELECT * FROM data_diff; - `); - } - }); + .join(",\n")}, + ARRAY[]::text[] AS __changed_columns__ + FROM curr + LEFT JOIN prev ON curr.${key} = prev.${key} + WHERE prev.${key} IS NULL + UNION ALL + -- DELETE operations: Include only the primary key + SELECT + 'DELETE' AS __op__, + ${columns + .map(({ column_name, data_type, udt_name }) => { + if (column_name === key) { + return `prev."${column_name}" AS "${column_name}"`; + } else { + return `NULL::${data_type == "USER-DEFINED" ? udt_name : data_type} AS "${column_name}"`; + } + }) + .join(",\n")}, + ARRAY[]::text[] AS __changed_columns__ + FROM prev + LEFT JOIN curr ON prev.${key} = curr.${key} + WHERE curr.${key} IS NULL + UNION ALL + -- UPDATE operations: Include only changed columns + SELECT + 'UPDATE' AS __op__, + ${columns + .map(({ column_name, data_type, udt_name }) => + column_name === key + ? `curr."${column_name}" AS "${column_name}"` + : `CASE + WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" + THEN curr."${column_name}" + ELSE NULL::${data_type == "USER-DEFINED" ? udt_name : data_type} + END AS "${column_name}"`, + ) + .join(",\n")}, + ARRAY(SELECT unnest FROM unnest(ARRAY[${columns + .filter(({ column_name }) => column_name !== key) + .map( + ({ column_name }) => + `CASE + WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" + THEN '${column_name}' + ELSE NULL + END`, + ) + .join( + ", ", + )}]) WHERE unnest IS NOT NULL) AS __changed_columns__ + FROM curr + INNER JOIN prev ON curr.${key} = prev.${key} + WHERE NOT (curr IS NOT DISTINCT FROM prev) + ) + SELECT * FROM data_diff; + `); + } + }); + }; - const refresh = async () => { - await pg.transaction(async (tx) => { - // Populate the state table - await tx.exec(` - DELETE FROM live_query_${id}_state${stateSwitch}; - INSERT INTO live_query_${id}_state${stateSwitch} - SELECT * FROM live_query_${id}_view; - `); + await init(); - // Get the changes - changes = await tx.query( - `EXECUTE live_query_${id}_diff${stateSwitch};`, - ); - }); + const refresh = async () => { + let reset = false; + for (let i = 0; i < 5; i++) { + try { + await pg.transaction(async (tx) => { + // Populate the state table + await tx.exec(` + DELETE FROM live_query_${id}_state${stateSwitch}; + INSERT INTO live_query_${id}_state${stateSwitch} + SELECT * FROM live_query_${id}_view; + `); + + // Get the changes + changes = await tx.query( + `EXECUTE live_query_${id}_diff${stateSwitch};`, + ); + }); + break; + } catch (e) { + const msg = (e as Error).message; + if ( + msg == + `relation "live_query_${id}_state${stateSwitch}" does not exist` + ) { + // If the state table does not exist, reset and try again + // This can happen if using the multi-tab worker + reset = true; + await init(); + continue; + } else { + throw e; + } + } + } // Switch state stateSwitch = stateSwitch === 1 ? 2 : 1; - callback(changes!.rows); + callback([ + ...(reset + ? [ + { + __op__: "RESET" as const, + }, + ] + : []), + ...changes!.rows, + ]); }; // Setup the listeners @@ -315,6 +350,10 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { ...obj } = change as typeof change & { [key: string]: any }; switch (op) { + case "RESET": + rowsMap.clear(); + afterMap.clear(); + break; case "INSERT": rowsMap.set(obj[key], obj); afterMap.set(obj.__after__, obj[key]); diff --git a/packages/pglite/src/live/interface.ts b/packages/pglite/src/live/interface.ts index 9bbb7814..6c3a3661 100644 --- a/packages/pglite/src/live/interface.ts +++ b/packages/pglite/src/live/interface.ts @@ -77,4 +77,12 @@ export type ChangeUpdate = {} & { __after__: number; } & T; -export type Change = ChangeInsert | ChangeDelete | ChangeUpdate; +export type ChangeReset = { + __op__: "RESET"; +} & T; + +export type Change = + | ChangeInsert + | ChangeDelete + | ChangeUpdate + | ChangeReset; diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts index 065f301f..ceccc9c3 100644 --- a/packages/pglite/src/worker/index.ts +++ b/packages/pglite/src/worker/index.ts @@ -317,29 +317,36 @@ export class PGliteWorker implements PGliteInterface { ): Promise { await this.waitReady; const txId = await this.#rpc("transactionStart"); - const ret = await callback({ - query: async (query, params, options) => { - return await this.#rpc( - "transactionQuery", - txId, - query, - params, - options, - ); - }, - exec: async (query, options) => { - return (await this.#rpc( - "transactionExec", - txId, - query, - options, - )) as any; - }, - rollback: async () => { - await this.#rpc("transactionRollback", txId); - }, - closed: false, - } as Transaction); + let ret: T | undefined; + try { + ret = await callback({ + query: async (query, params, options) => { + return await this.#rpc( + "transactionQuery", + txId, + query, + params, + options, + ); + }, + exec: async (query, options) => { + return (await this.#rpc( + "transactionExec", + txId, + query, + options, + )) as any; + }, + rollback: async () => { + await this.#rpc("transactionRollback", txId); + }, + closed: false, + } as Transaction); + } catch (error) { + console.log("Transaction error!!!!!!"); + await this.#rpc("transactionRollback", txId); + throw error; + } await this.#rpc("transactionCommit", txId); return ret; } @@ -656,8 +663,9 @@ function makeWorkerApi(db: PGliteInterface) { if (!transactions.has(id)) { throw new Error("No transaction"); } - const tx = (await transactions.get(id)!).tx; - await tx.rollback(); + const tx = await transactions.get(id)!; + await tx.tx.rollback(); + tx.reject(new Error("Transaction rolled back")); transactions.delete(id); }, async execProtocol(message: Uint8Array) {