Skip to content

Commit

Permalink
Multi tab live.changes and live.incrementalQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Jul 29, 2024
1 parent 7ddac83 commit c3ff9a4
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 139 deletions.
4 changes: 3 additions & 1 deletion packages/pglite/examples/worker.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@
const btnClear = document.querySelector("#clear");
btnClear.addEventListener("click", clearData);

pg.live.query(
// pg.live.query(
pg.live.incrementalQuery(
`SELECT * FROM test`,
[],
'id',
(data) => {
const output = document.getElementById("output");
output.textContent = JSON.stringify(data.rows, null, 2);
Expand Down
263 changes: 151 additions & 112 deletions packages/pglite/src/live/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
if (
msg == `prepared statement "live_query_${id}_get" does not exist`
) {
// If the prepared statement does not exist, reset and try again
// This can happen if using the multi-tab worker
if (count > MAX_RETRIES) {
throw e;
}
Expand Down Expand Up @@ -121,130 +123,163 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
let stateSwitch: 1 | 2 = 1;
let changes: Results<Change<T>>;

await pg.transaction(async (tx) => {
// Create a temporary view with the query
await tx.query(
`CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
params ?? [],
);
const init = async () => {
await pg.transaction(async (tx) => {
// Create a temporary view with the query
await tx.query(
`CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
params ?? [],
);

// Get the tables used in the view and add triggers to notify when they change
tables = await getTablesForView(tx, `live_query_${id}_view`);
await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);

// Get the columns of the view
const columns = [
...(
await tx.query<any>(`
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'live_query_${id}_view'
`)
).rows,
{ column_name: "__after__", data_type: "integer" },
];

// Init state tables as empty temp table
await tx.exec(`
CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL);
CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL);
`);
// Get the tables used in the view and add triggers to notify when they change
tables = await getTablesForView(tx, `live_query_${id}_view`);
await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);

// Create Diff views and prepared statements
for (const curr of [1, 2]) {
const prev = curr === 1 ? 2 : 1;
// Get the columns of the view
const columns = [
...(
await tx.query<any>(`
SELECT column_name, data_type, udt_name
FROM information_schema.columns
WHERE table_name = 'live_query_${id}_view'
`)
).rows,
{ column_name: "__after__", data_type: "integer" },
];

// Init state tables as empty temp table
await tx.exec(`
PREPARE live_query_${id}_diff${curr} AS
WITH
prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
data_diff AS (
-- INSERT operations: Include all columns
SELECT
'INSERT' AS __op__,
${columns
.map(
({ column_name }) =>
`curr."${column_name}" AS "${column_name}"`,
)
.join(",\n")},
ARRAY[]::text[] AS __changed_columns__
FROM curr
LEFT JOIN prev ON curr.${key} = prev.${key}
WHERE prev.${key} IS NULL
UNION ALL
-- DELETE operations: Include only the primary key
SELECT
'DELETE' AS __op__,
${columns
.map(({ column_name, data_type }) => {
if (column_name === key) {
return `prev."${column_name}" AS "${column_name}"`;
} else {
return `NULL::${data_type} AS "${column_name}"`;
}
})
.join(",\n")},
ARRAY[]::text[] AS __changed_columns__
FROM prev
LEFT JOIN curr ON prev.${key} = curr.${key}
WHERE curr.${key} IS NULL
UNION ALL
-- UPDATE operations: Include only changed columns
SELECT
'UPDATE' AS __op__,
${columns
.map(({ column_name, data_type }) =>
column_name === key
? `curr."${column_name}" AS "${column_name}"`
: `CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN curr."${column_name}"
ELSE NULL::${data_type}
END AS "${column_name}"`,
)
.join(",\n")},
ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
.filter(({ column_name }) => column_name !== key)
CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL);
CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL);
`);

// Create Diff views and prepared statements
for (const curr of [1, 2]) {
const prev = curr === 1 ? 2 : 1;
await tx.exec(`
PREPARE live_query_${id}_diff${curr} AS
WITH
prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
data_diff AS (
-- INSERT operations: Include all columns
SELECT
'INSERT' AS __op__,
${columns
.map(
({ column_name }) =>
`CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN '${column_name}'
ELSE NULL
END`,
`curr."${column_name}" AS "${column_name}"`,
)
.join(
", ",
)}]) WHERE unnest IS NOT NULL) AS __changed_columns__
FROM curr
INNER JOIN prev ON curr.${key} = prev.${key}
WHERE NOT (curr IS NOT DISTINCT FROM prev)
)
SELECT * FROM data_diff;
`);
}
});
.join(",\n")},
ARRAY[]::text[] AS __changed_columns__
FROM curr
LEFT JOIN prev ON curr.${key} = prev.${key}
WHERE prev.${key} IS NULL
UNION ALL
-- DELETE operations: Include only the primary key
SELECT
'DELETE' AS __op__,
${columns
.map(({ column_name, data_type, udt_name }) => {
if (column_name === key) {
return `prev."${column_name}" AS "${column_name}"`;
} else {
return `NULL::${data_type == "USER-DEFINED" ? udt_name : data_type} AS "${column_name}"`;
}
})
.join(",\n")},
ARRAY[]::text[] AS __changed_columns__
FROM prev
LEFT JOIN curr ON prev.${key} = curr.${key}
WHERE curr.${key} IS NULL
UNION ALL
-- UPDATE operations: Include only changed columns
SELECT
'UPDATE' AS __op__,
${columns
.map(({ column_name, data_type, udt_name }) =>
column_name === key
? `curr."${column_name}" AS "${column_name}"`
: `CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN curr."${column_name}"
ELSE NULL::${data_type == "USER-DEFINED" ? udt_name : data_type}
END AS "${column_name}"`,
)
.join(",\n")},
ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
.filter(({ column_name }) => column_name !== key)
.map(
({ column_name }) =>
`CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN '${column_name}'
ELSE NULL
END`,
)
.join(
", ",
)}]) WHERE unnest IS NOT NULL) AS __changed_columns__
FROM curr
INNER JOIN prev ON curr.${key} = prev.${key}
WHERE NOT (curr IS NOT DISTINCT FROM prev)
)
SELECT * FROM data_diff;
`);
}
});
};

const refresh = async () => {
await pg.transaction(async (tx) => {
// Populate the state table
await tx.exec(`
DELETE FROM live_query_${id}_state${stateSwitch};
INSERT INTO live_query_${id}_state${stateSwitch}
SELECT * FROM live_query_${id}_view;
`);
await init();

// Get the changes
changes = await tx.query<any>(
`EXECUTE live_query_${id}_diff${stateSwitch};`,
);
});
const refresh = async () => {
let reset = false;
for (let i = 0; i < 5; i++) {
try {
await pg.transaction(async (tx) => {
// Populate the state table
await tx.exec(`
DELETE FROM live_query_${id}_state${stateSwitch};
INSERT INTO live_query_${id}_state${stateSwitch}
SELECT * FROM live_query_${id}_view;
`);

// Get the changes
changes = await tx.query<any>(
`EXECUTE live_query_${id}_diff${stateSwitch};`,
);
});
break;
} catch (e) {
const msg = (e as Error).message;
if (
msg ==
`relation "live_query_${id}_state${stateSwitch}" does not exist`
) {
// If the state table does not exist, reset and try again
// This can happen if using the multi-tab worker
reset = true;
await init();
continue;
} else {
throw e;
}
}
}

// Switch state
stateSwitch = stateSwitch === 1 ? 2 : 1;

callback(changes!.rows);
callback([
...(reset
? [
{
__op__: "RESET" as const,
},
]
: []),
...changes!.rows,
]);
};

// Setup the listeners
Expand Down Expand Up @@ -315,6 +350,10 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
...obj
} = change as typeof change & { [key: string]: any };
switch (op) {
case "RESET":
rowsMap.clear();
afterMap.clear();
break;
case "INSERT":
rowsMap.set(obj[key], obj);
afterMap.set(obj.__after__, obj[key]);
Expand Down
10 changes: 9 additions & 1 deletion packages/pglite/src/live/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,12 @@ export type ChangeUpdate<T> = {} & {
__after__: number;
} & T;

export type Change<T> = ChangeInsert<T> | ChangeDelete<T> | ChangeUpdate<T>;
export type ChangeReset<T> = {
__op__: "RESET";
} & T;

export type Change<T> =
| ChangeInsert<T>
| ChangeDelete<T>
| ChangeUpdate<T>
| ChangeReset<T>;
58 changes: 33 additions & 25 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,29 +317,36 @@ export class PGliteWorker implements PGliteInterface {
): Promise<T | undefined> {
await this.waitReady;
const txId = await this.#rpc("transactionStart");
const ret = await callback({
query: async (query, params, options) => {
return await this.#rpc(
"transactionQuery",
txId,
query,
params,
options,
);
},
exec: async (query, options) => {
return (await this.#rpc(
"transactionExec",
txId,
query,
options,
)) as any;
},
rollback: async () => {
await this.#rpc("transactionRollback", txId);
},
closed: false,
} as Transaction);
let ret: T | undefined;
try {
ret = await callback({
query: async (query, params, options) => {
return await this.#rpc(
"transactionQuery",
txId,
query,
params,
options,
);
},
exec: async (query, options) => {
return (await this.#rpc(
"transactionExec",
txId,
query,
options,
)) as any;
},
rollback: async () => {
await this.#rpc("transactionRollback", txId);
},
closed: false,
} as Transaction);
} catch (error) {
console.log("Transaction error!!!!!!");
await this.#rpc("transactionRollback", txId);
throw error;
}
await this.#rpc("transactionCommit", txId);
return ret;
}
Expand Down Expand Up @@ -656,8 +663,9 @@ function makeWorkerApi(db: PGliteInterface) {
if (!transactions.has(id)) {
throw new Error("No transaction");
}
const tx = (await transactions.get(id)!).tx;
await tx.rollback();
const tx = await transactions.get(id)!;
await tx.tx.rollback();
tx.reject(new Error("Transaction rolled back"));
transactions.delete(id);
},
async execProtocol(message: Uint8Array) {
Expand Down

0 comments on commit c3ff9a4

Please sign in to comment.