Skip to content

Commit

Permalink
Working multi tab with live.query()
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Jul 29, 2024
1 parent 912a1c2 commit 7ddac83
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 99 deletions.
10 changes: 9 additions & 1 deletion packages/pglite/examples/worker-process.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { PGlite } from "../dist/index.js";
import { worker } from "../dist/worker/index.js";
import { vector } from "../dist/vector/index.js";

worker({
async init() {
return new PGlite();
const pg = new PGlite({
dataDir: "idb://worker-example",
extensions: {
vector,
},
});
// If you want run any specific setup code for the worker process, you can do it here.
return pg;
},
});

Expand Down
114 changes: 77 additions & 37 deletions packages/pglite/examples/worker.html
Original file line number Diff line number Diff line change
@@ -1,42 +1,82 @@
<script type="module">
import { PGliteWorker } from "../dist/worker/index.js";
<html>
<head>
<title>PGlite Worker Example</title>
<script type="module">
import { PGliteWorker } from "../dist/worker/index.js";
import { live } from "../dist/live/index.js";

console.log("Creating worker...");
const pg = new PGliteWorker(
new Worker(new URL("./worker-process.js", import.meta.url), {
type: "module",
}),
{
extensions: {
live,
},
}
);

console.log("Creating worker...");
const pg = new PGliteWorker(
new Worker(new URL("./worker-process.js", import.meta.url), {
type: "module",
})
);
pg.onLeaderChange(() => {
if (pg.isLeader) {
document.title = "[leader] PGlite Worker Example";
const leader = document.getElementById("leader");
leader.textContent = "true";
leader.style.color = "green";
}
});

await pg.exec(`
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
data vector(3)
);
`);

console.log("Creating table...");
await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
name TEXT
);
`);
async function insertData() {
const data = [
Math.random(),
Math.random(),
Math.random(),
];
await pg.query(
`INSERT INTO test (data) VALUES ($1)`,
[JSON.stringify(data)]
);
}

console.log("Inserting data...");
await pg.exec("INSERT INTO test (name) VALUES ('test');");
async function clearData() {
await pg.query(`DELETE FROM test`);
}

const btnInsert = document.querySelector("#insert");
btnInsert.addEventListener("click", insertData);

console.log("Selecting data...");
const res = await pg.exec(`
SELECT * FROM test;
`);
const btnClear = document.querySelector("#clear");
btnClear.addEventListener("click", clearData);

console.log(res);

// Transaction example:
console.log("Transaction example...");
await pg.transaction(async (tx) => {
await tx.exec("INSERT INTO test (name) VALUES ('test2');");
await tx.exec("INSERT INTO test (name) VALUES ('test3');");
});

console.log("Selecting data...");
const res2 = await pg.exec(`
SELECT * FROM test;
`);

console.log(res2);
</script>
pg.live.query(
`SELECT * FROM test`,
[],
(data) => {
const output = document.getElementById("output");
output.textContent = JSON.stringify(data.rows, null, 2);
}
);
</script>
</head>
<body>
<h1>PGlite Worker Example</h1>
<p>Leader: <span id="leader" style="color: red;">false</span></p>
<p>
<button id="insert">
Insert Data
</button>
<button id="clear">
Clear Data
</button>
</p>
<pre id="output"></pre>
</body>
</html>
1 change: 1 addition & 0 deletions packages/pglite/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface ExtensionSetupResult {
export type ExtensionSetup = (
pg: PGliteInterface,
emscriptenOpts: any,
clientOnly?: boolean,
) => Promise<ExtensionSetupResult>;

export interface Extension {
Expand Down
68 changes: 43 additions & 25 deletions packages/pglite/src/live/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import type {
LiveChangesReturn,
Change,
} from "./interface";
import { uuid } from "../utils.js";

const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
// Counter use to generate unique IDs for live queries
// This is used to create temporary views and so are scoped to the current connection
let liveQueryCounter = 0;
const MAX_RETRIES = 5;

const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
// The notify triggers are only ever added and never removed
// Keep track of which triggers have been added to avoid adding them multiple times
const tableNotifyTriggersAdded = new Set<string>();
Expand All @@ -26,35 +25,53 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
params: any[] | undefined | null,
callback: (results: Results<T>) => void,
) {
const id = liveQueryCounter++;
const id = uuid().replace(/-/g, "");

let results: Results<T>;
let tables: { table_name: string; schema_name: string }[];

await pg.transaction(async (tx) => {
// Create a temporary view with the query
await tx.query(
`CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
params ?? [],
);
const init = async () => {
await pg.transaction(async (tx) => {
// Create a temporary view with the query
await tx.query(
`CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
params ?? [],
);

// Get the tables used in the view and add triggers to notify when they change
tables = await getTablesForView(tx, `live_query_${id}_view`);
await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
// Get the tables used in the view and add triggers to notify when they change
tables = await getTablesForView(tx, `live_query_${id}_view`);
await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);

// Create prepared statement to get the results
await tx.exec(`
PREPARE live_query_${id}_get AS
SELECT * FROM live_query_${id}_view;
`);
// Create prepared statement to get the results
await tx.exec(`
PREPARE live_query_${id}_get AS
SELECT * FROM live_query_${id}_view;
`);

// Get the initial results
results = await tx.query<T>(`EXECUTE live_query_${id}_get;`);
});
// Get the initial results
results = await tx.query<T>(`EXECUTE live_query_${id}_get;`);
});
};
await init();

// Function to refresh the query
const refresh = async () => {
results = await pg.query<T>(`EXECUTE live_query_${id}_get;`);
const refresh = async (count = 0) => {
try {
results = await pg.query<T>(`EXECUTE live_query_${id}_get;`);
} catch (e) {
const msg = (e as Error).message;
if (
msg == `prepared statement "live_query_${id}_get" does not exist`
) {
if (count > MAX_RETRIES) {
throw e;
}
await init();
refresh(count + 1);
} else {
throw e;
}
}
callback(results);
};

Expand Down Expand Up @@ -98,7 +115,8 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
key: string,
callback: (changes: Array<Change<T>>) => void,
) {
const id = liveQueryCounter++;
const id = uuid().replace(/-/g, "");

let tables: { table_name: string; schema_name: string }[];
let stateSwitch: 1 | 2 = 1;
let changes: Results<Change<T>>;
Expand Down
Loading

0 comments on commit 7ddac83

Please sign in to comment.