diff --git a/packages/pglite/examples/worker.html b/packages/pglite/examples/worker.html
index e4a28478..f258c3b6 100644
--- a/packages/pglite/examples/worker.html
+++ b/packages/pglite/examples/worker.html
@@ -56,9 +56,11 @@
const btnClear = document.querySelector("#clear");
btnClear.addEventListener("click", clearData);
- pg.live.query(
+ // pg.live.query(
+ pg.live.incrementalQuery(
`SELECT * FROM test`,
[],
+ 'id',
(data) => {
const output = document.getElementById("output");
output.textContent = JSON.stringify(data.rows, null, 2);
diff --git a/packages/pglite/src/live/index.ts b/packages/pglite/src/live/index.ts
index 62ff9787..4b8ce368 100644
--- a/packages/pglite/src/live/index.ts
+++ b/packages/pglite/src/live/index.ts
@@ -63,6 +63,8 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
if (
msg == `prepared statement "live_query_${id}_get" does not exist`
) {
+ // If the prepared statement does not exist, reset and try again
+ // This can happen if using the multi-tab worker
if (count > MAX_RETRIES) {
throw e;
}
@@ -121,130 +123,163 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
let stateSwitch: 1 | 2 = 1;
let changes: Results>;
- await pg.transaction(async (tx) => {
- // Create a temporary view with the query
- await tx.query(
- `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
- params ?? [],
- );
+ const init = async () => {
+ await pg.transaction(async (tx) => {
+ // Create a temporary view with the query
+ await tx.query(
+ `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
+ params ?? [],
+ );
- // Get the tables used in the view and add triggers to notify when they change
- tables = await getTablesForView(tx, `live_query_${id}_view`);
- await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
-
- // Get the columns of the view
- const columns = [
- ...(
- await tx.query(`
- SELECT column_name, data_type
- FROM information_schema.columns
- WHERE table_name = 'live_query_${id}_view'
- `)
- ).rows,
- { column_name: "__after__", data_type: "integer" },
- ];
-
- // Init state tables as empty temp table
- await tx.exec(`
- CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL);
- CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL);
- `);
+ // Get the tables used in the view and add triggers to notify when they change
+ tables = await getTablesForView(tx, `live_query_${id}_view`);
+ await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
- // Create Diff views and prepared statements
- for (const curr of [1, 2]) {
- const prev = curr === 1 ? 2 : 1;
+ // Get the columns of the view
+ const columns = [
+ ...(
+ await tx.query(`
+ SELECT column_name, data_type, udt_name
+ FROM information_schema.columns
+ WHERE table_name = 'live_query_${id}_view'
+ `)
+ ).rows,
+ { column_name: "__after__", data_type: "integer" },
+ ];
+
+ // Init state tables as empty temp table
await tx.exec(`
- PREPARE live_query_${id}_diff${curr} AS
- WITH
- prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
- curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
- data_diff AS (
- -- INSERT operations: Include all columns
- SELECT
- 'INSERT' AS __op__,
- ${columns
- .map(
- ({ column_name }) =>
- `curr."${column_name}" AS "${column_name}"`,
- )
- .join(",\n")},
- ARRAY[]::text[] AS __changed_columns__
- FROM curr
- LEFT JOIN prev ON curr.${key} = prev.${key}
- WHERE prev.${key} IS NULL
- UNION ALL
- -- DELETE operations: Include only the primary key
- SELECT
- 'DELETE' AS __op__,
- ${columns
- .map(({ column_name, data_type }) => {
- if (column_name === key) {
- return `prev."${column_name}" AS "${column_name}"`;
- } else {
- return `NULL::${data_type} AS "${column_name}"`;
- }
- })
- .join(",\n")},
- ARRAY[]::text[] AS __changed_columns__
- FROM prev
- LEFT JOIN curr ON prev.${key} = curr.${key}
- WHERE curr.${key} IS NULL
- UNION ALL
- -- UPDATE operations: Include only changed columns
- SELECT
- 'UPDATE' AS __op__,
- ${columns
- .map(({ column_name, data_type }) =>
- column_name === key
- ? `curr."${column_name}" AS "${column_name}"`
- : `CASE
- WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
- THEN curr."${column_name}"
- ELSE NULL::${data_type}
- END AS "${column_name}"`,
- )
- .join(",\n")},
- ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
- .filter(({ column_name }) => column_name !== key)
+ CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL);
+ CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL);
+ `);
+
+ // Create Diff views and prepared statements
+ for (const curr of [1, 2]) {
+ const prev = curr === 1 ? 2 : 1;
+ await tx.exec(`
+ PREPARE live_query_${id}_diff${curr} AS
+ WITH
+ prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
+ curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
+ data_diff AS (
+ -- INSERT operations: Include all columns
+ SELECT
+ 'INSERT' AS __op__,
+ ${columns
.map(
({ column_name }) =>
- `CASE
- WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
- THEN '${column_name}'
- ELSE NULL
- END`,
+ `curr."${column_name}" AS "${column_name}"`,
)
- .join(
- ", ",
- )}]) WHERE unnest IS NOT NULL) AS __changed_columns__
- FROM curr
- INNER JOIN prev ON curr.${key} = prev.${key}
- WHERE NOT (curr IS NOT DISTINCT FROM prev)
- )
- SELECT * FROM data_diff;
- `);
- }
- });
+ .join(",\n")},
+ ARRAY[]::text[] AS __changed_columns__
+ FROM curr
+ LEFT JOIN prev ON curr.${key} = prev.${key}
+ WHERE prev.${key} IS NULL
+ UNION ALL
+ -- DELETE operations: Include only the primary key
+ SELECT
+ 'DELETE' AS __op__,
+ ${columns
+ .map(({ column_name, data_type, udt_name }) => {
+ if (column_name === key) {
+ return `prev."${column_name}" AS "${column_name}"`;
+ } else {
+ return `NULL::${data_type == "USER-DEFINED" ? udt_name : data_type} AS "${column_name}"`;
+ }
+ })
+ .join(",\n")},
+ ARRAY[]::text[] AS __changed_columns__
+ FROM prev
+ LEFT JOIN curr ON prev.${key} = curr.${key}
+ WHERE curr.${key} IS NULL
+ UNION ALL
+ -- UPDATE operations: Include only changed columns
+ SELECT
+ 'UPDATE' AS __op__,
+ ${columns
+ .map(({ column_name, data_type, udt_name }) =>
+ column_name === key
+ ? `curr."${column_name}" AS "${column_name}"`
+ : `CASE
+ WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
+ THEN curr."${column_name}"
+ ELSE NULL::${data_type == "USER-DEFINED" ? udt_name : data_type}
+ END AS "${column_name}"`,
+ )
+ .join(",\n")},
+ ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
+ .filter(({ column_name }) => column_name !== key)
+ .map(
+ ({ column_name }) =>
+ `CASE
+ WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
+ THEN '${column_name}'
+ ELSE NULL
+ END`,
+ )
+ .join(
+ ", ",
+ )}]) WHERE unnest IS NOT NULL) AS __changed_columns__
+ FROM curr
+ INNER JOIN prev ON curr.${key} = prev.${key}
+ WHERE NOT (curr IS NOT DISTINCT FROM prev)
+ )
+ SELECT * FROM data_diff;
+ `);
+ }
+ });
+ };
- const refresh = async () => {
- await pg.transaction(async (tx) => {
- // Populate the state table
- await tx.exec(`
- DELETE FROM live_query_${id}_state${stateSwitch};
- INSERT INTO live_query_${id}_state${stateSwitch}
- SELECT * FROM live_query_${id}_view;
- `);
+ await init();
- // Get the changes
- changes = await tx.query(
- `EXECUTE live_query_${id}_diff${stateSwitch};`,
- );
- });
+ const refresh = async () => {
+ let reset = false;
+ for (let i = 0; i < 5; i++) {
+ try {
+ await pg.transaction(async (tx) => {
+ // Populate the state table
+ await tx.exec(`
+ DELETE FROM live_query_${id}_state${stateSwitch};
+ INSERT INTO live_query_${id}_state${stateSwitch}
+ SELECT * FROM live_query_${id}_view;
+ `);
+
+ // Get the changes
+ changes = await tx.query(
+ `EXECUTE live_query_${id}_diff${stateSwitch};`,
+ );
+ });
+ break;
+ } catch (e) {
+ const msg = (e as Error).message;
+ if (
+ msg ==
+ `relation "live_query_${id}_state${stateSwitch}" does not exist`
+ ) {
+ // If the state table does not exist, reset and try again
+ // This can happen if using the multi-tab worker
+ reset = true;
+ await init();
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ }
// Switch state
stateSwitch = stateSwitch === 1 ? 2 : 1;
- callback(changes!.rows);
+ callback([
+ ...(reset
+ ? [
+ {
+ __op__: "RESET" as const,
+ },
+ ]
+ : []),
+ ...changes!.rows,
+ ]);
};
// Setup the listeners
@@ -315,6 +350,10 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
...obj
} = change as typeof change & { [key: string]: any };
switch (op) {
+ case "RESET":
+ rowsMap.clear();
+ afterMap.clear();
+ break;
case "INSERT":
rowsMap.set(obj[key], obj);
afterMap.set(obj.__after__, obj[key]);
diff --git a/packages/pglite/src/live/interface.ts b/packages/pglite/src/live/interface.ts
index 9bbb7814..6c3a3661 100644
--- a/packages/pglite/src/live/interface.ts
+++ b/packages/pglite/src/live/interface.ts
@@ -77,4 +77,12 @@ export type ChangeUpdate = {} & {
__after__: number;
} & T;
-export type Change = ChangeInsert | ChangeDelete | ChangeUpdate;
+export type ChangeReset = {
+ __op__: "RESET";
+} & T;
+
+export type Change =
+ | ChangeInsert
+ | ChangeDelete
+ | ChangeUpdate
+ | ChangeReset;
diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts
index 065f301f..ceccc9c3 100644
--- a/packages/pglite/src/worker/index.ts
+++ b/packages/pglite/src/worker/index.ts
@@ -317,29 +317,36 @@ export class PGliteWorker implements PGliteInterface {
): Promise {
await this.waitReady;
const txId = await this.#rpc("transactionStart");
- const ret = await callback({
- query: async (query, params, options) => {
- return await this.#rpc(
- "transactionQuery",
- txId,
- query,
- params,
- options,
- );
- },
- exec: async (query, options) => {
- return (await this.#rpc(
- "transactionExec",
- txId,
- query,
- options,
- )) as any;
- },
- rollback: async () => {
- await this.#rpc("transactionRollback", txId);
- },
- closed: false,
- } as Transaction);
+ let ret: T | undefined;
+ try {
+ ret = await callback({
+ query: async (query, params, options) => {
+ return await this.#rpc(
+ "transactionQuery",
+ txId,
+ query,
+ params,
+ options,
+ );
+ },
+ exec: async (query, options) => {
+ return (await this.#rpc(
+ "transactionExec",
+ txId,
+ query,
+ options,
+ )) as any;
+ },
+ rollback: async () => {
+ await this.#rpc("transactionRollback", txId);
+ },
+ closed: false,
+ } as Transaction);
+ } catch (error) {
+ console.log("Transaction error!!!!!!");
+ await this.#rpc("transactionRollback", txId);
+ throw error;
+ }
await this.#rpc("transactionCommit", txId);
return ret;
}
@@ -656,8 +663,9 @@ function makeWorkerApi(db: PGliteInterface) {
if (!transactions.has(id)) {
throw new Error("No transaction");
}
- const tx = (await transactions.get(id)!).tx;
- await tx.rollback();
+ const tx = await transactions.get(id)!;
+ await tx.tx.rollback();
+ tx.reject(new Error("Transaction rolled back"));
transactions.delete(id);
},
async execProtocol(message: Uint8Array) {