diff --git a/packages/pglite/README.md b/packages/pglite/README.md index 9074749f..6a2f3bb1 100644 --- a/packages/pglite/README.md +++ b/packages/pglite/README.md @@ -318,12 +318,33 @@ The `.query()` method can take a TypeScript type describing the expected shap ### Web Workers: -It's likely that you will want to run PGlite in a Web Worker so that it doesn't block the main thread. To aid in this we provide a `PGliteWorker` with the same API as the core `PGlite` but it runs Postgres in a dedicated Web Worker. To use, import from the `/worker` export: +It's likely that you will want to run PGlite in a Web Worker so that it doesn't block the main thread. To aid in this we provide a `PGliteWorker` with the same API as the core `PGlite` but it runs Postgres in a dedicated Web Worker. + +First you need to create a js file for your worker instance, initiate PGlite with the worker extension, and start it: + +```js +// my-pglite-worker.js +import { PGlite } from "@electric-sql/pglite"; +import { worker } from "@electric-sql/pglite/worker"; + +worker({ + async init() { + return new PGlite(); + }, +}); +``` + +Then connect the `PGliteWorker` to your new worker process: ```js import { PGliteWorker } from "@electric-sql/pglite/worker"; -const pg = new PGliteWorker('idb://my-database'); +const pg = new PGliteWorker( + new Worker(new URL("./my-pglite-worker.js", import.meta.url), { + type: "module", + }) +); + await pg.exec(` CREATE TABLE IF NOT EXISTS test ( id SERIAL PRIMARY KEY, diff --git a/packages/pglite/examples/worker-process.js b/packages/pglite/examples/worker-process.js new file mode 100644 index 00000000..ac51d9ab --- /dev/null +++ b/packages/pglite/examples/worker-process.js @@ -0,0 +1,17 @@ +import { PGlite } from "../dist/index.js"; +import { worker } from "../dist/worker/index.js"; +import { vector } from "../dist/vector/index.js"; + +worker({ + async init() { + const pg = new PGlite({ + extensions: { + vector, + }, + }); + // If you want run any specific setup code for the worker process, you can do it here. + return pg; + }, +}); + +console.log("Worker process started"); diff --git a/packages/pglite/examples/worker.html b/packages/pglite/examples/worker.html index 3769bfa1..f258c3b6 100644 --- a/packages/pglite/examples/worker.html +++ b/packages/pglite/examples/worker.html @@ -1,47 +1,84 @@ - + // pg.live.query( + pg.live.incrementalQuery( + `SELECT * FROM test`, + [], + 'id', + (data) => { + const output = document.getElementById("output"); + output.textContent = JSON.stringify(data.rows, null, 2); + } + ); + + + +

PGlite Worker Example

+

Leader: false

+

+ + +

+

+  
+
diff --git a/packages/pglite/package.json b/packages/pglite/package.json
index 663e6408..990270c7 100644
--- a/packages/pglite/package.json
+++ b/packages/pglite/package.json
@@ -59,7 +59,6 @@
     "ava": "^6.1.2",
     "buffer": "^6.0.3",
     "bun": "^1.1.18",
-    "comlink": "^4.4.1",
     "concurrently": "^8.2.2",
     "http-server": "^14.1.1",
     "pg-protocol": "^1.6.1",
diff --git a/packages/pglite/src/extensionUtils.ts b/packages/pglite/src/extensionUtils.ts
index 02dcc071..3e7a3c49 100644
--- a/packages/pglite/src/extensionUtils.ts
+++ b/packages/pglite/src/extensionUtils.ts
@@ -35,7 +35,7 @@ export async function loadExtensionBundle(
     const response = await fetch(bundlePath.toString());
     if (!response.ok || !response.body) {
       return null;
-    } else if (response.headers.get('Content-Encoding') === 'gzip') {
+    } else if (response.headers.get("Content-Encoding") === "gzip") {
       // Although the bundle is manually compressed, some servers will recognize
       // that and add a content-encoding header. Fetch will then automatically
       // decompress the response.
diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts
index 36aa2fff..e6ad441d 100644
--- a/packages/pglite/src/interface.ts
+++ b/packages/pglite/src/interface.ts
@@ -32,6 +32,7 @@ export interface ExtensionSetupResult {
 export type ExtensionSetup = (
   pg: PGliteInterface,
   emscriptenOpts: any,
+  clientOnly?: boolean,
 ) => Promise;
 
 export interface Extension {
diff --git a/packages/pglite/src/live/index.ts b/packages/pglite/src/live/index.ts
index 7a68281f..4b8ce368 100644
--- a/packages/pglite/src/live/index.ts
+++ b/packages/pglite/src/live/index.ts
@@ -10,12 +10,11 @@ import type {
   LiveChangesReturn,
   Change,
 } from "./interface";
+import { uuid } from "../utils.js";
 
-const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
-  // Counter use to generate unique IDs for live queries
-  // This is used to create temporary views and so are scoped to the current connection
-  let liveQueryCounter = 0;
+const MAX_RETRIES = 5;
 
+const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
   // The notify triggers are only ever added and never removed
   // Keep track of which triggers have been added to avoid adding them multiple times
   const tableNotifyTriggersAdded = new Set();
@@ -24,37 +23,57 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
     async query(
       query: string,
       params: any[] | undefined | null,
-      callback: (results: Results) => void
+      callback: (results: Results) => void,
     ) {
-      const id = liveQueryCounter++;
+      const id = uuid().replace(/-/g, "");
 
       let results: Results;
       let tables: { table_name: string; schema_name: string }[];
 
-      await pg.transaction(async (tx) => {
-        // Create a temporary view with the query
-        await tx.query(
-          `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
-          params ?? []
-        );
+      const init = async () => {
+        await pg.transaction(async (tx) => {
+          // Create a temporary view with the query
+          await tx.query(
+            `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
+            params ?? [],
+          );
 
-        // Get the tables used in the view and add triggers to notify when they change
-        tables = await getTablesForView(tx, `live_query_${id}_view`);
-        await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
+          // Get the tables used in the view and add triggers to notify when they change
+          tables = await getTablesForView(tx, `live_query_${id}_view`);
+          await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
 
-        // Create prepared statement to get the results
-        await tx.exec(`
-          PREPARE live_query_${id}_get AS
-          SELECT * FROM live_query_${id}_view;
-        `);
+          // Create prepared statement to get the results
+          await tx.exec(`
+            PREPARE live_query_${id}_get AS
+            SELECT * FROM live_query_${id}_view;
+          `);
 
-        // Get the initial results
-        results = await tx.query(`EXECUTE live_query_${id}_get;`);
-      });
+          // Get the initial results
+          results = await tx.query(`EXECUTE live_query_${id}_get;`);
+        });
+      };
+      await init();
 
       // Function to refresh the query
-      const refresh = async () => {
-        results = await pg.query(`EXECUTE live_query_${id}_get;`);
+      const refresh = async (count = 0) => {
+        try {
+          results = await pg.query(`EXECUTE live_query_${id}_get;`);
+        } catch (e) {
+          const msg = (e as Error).message;
+          if (
+            msg == `prepared statement "live_query_${id}_get" does not exist`
+          ) {
+            // If the prepared statement does not exist, reset and try again
+            // This can happen if using the multi-tab worker
+            if (count > MAX_RETRIES) {
+              throw e;
+            }
+            await init();
+            refresh(count + 1);
+          } else {
+            throw e;
+          }
+        }
         callback(results);
       };
 
@@ -65,7 +84,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
           `table_change__${table.schema_name}__${table.table_name}`,
           async () => {
             refresh();
-          }
+          },
         );
         unsubList.push(unsub);
       }
@@ -96,137 +115,171 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
       query: string,
       params: any[] | undefined | null,
       key: string,
-      callback: (changes: Array>) => void
+      callback: (changes: Array>) => void,
     ) {
-      const id = liveQueryCounter++;
+      const id = uuid().replace(/-/g, "");
+
       let tables: { table_name: string; schema_name: string }[];
       let stateSwitch: 1 | 2 = 1;
       let changes: Results>;
 
-      await pg.transaction(async (tx) => {
-        // Create a temporary view with the query
-        await tx.query(
-          `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
-          params ?? []
-        );
-
-        // Get the tables used in the view and add triggers to notify when they change
-        tables = await getTablesForView(tx, `live_query_${id}_view`);
-        await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
-
-        // Get the columns of the view
-        const columns = [
-          ...(
-            await tx.query(`
-              SELECT column_name, data_type 
-              FROM information_schema.columns 
-              WHERE table_name = 'live_query_${id}_view'
-            `)
-          ).rows,
-          { column_name: "__after__", data_type: "integer" },
-        ];
-
-        // Init state tables as empty temp table
-        await tx.exec(`
-          CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL);
-          CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL);
-        `);
+      const init = async () => {
+        await pg.transaction(async (tx) => {
+          // Create a temporary view with the query
+          await tx.query(
+            `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
+            params ?? [],
+          );
 
-        // Create Diff views and prepared statements
-        for (const curr of [1, 2]) {
-          const prev = curr === 1 ? 2 : 1;
+          // Get the tables used in the view and add triggers to notify when they change
+          tables = await getTablesForView(tx, `live_query_${id}_view`);
+          await addNotifyTriggersToTables(tx, tables, tableNotifyTriggersAdded);
+
+          // Get the columns of the view
+          const columns = [
+            ...(
+              await tx.query(`
+                SELECT column_name, data_type, udt_name
+                FROM information_schema.columns 
+                WHERE table_name = 'live_query_${id}_view'
+              `)
+            ).rows,
+            { column_name: "__after__", data_type: "integer" },
+          ];
+
+          // Init state tables as empty temp table
           await tx.exec(`
-            PREPARE live_query_${id}_diff${curr} AS
-            WITH
-              prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
-              curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
-              data_diff AS (
-                -- INSERT operations: Include all columns
-                SELECT 
-                  'INSERT' AS __op__,
-                  ${columns
-                    .map(
-                      ({ column_name }) =>
-                        `curr."${column_name}" AS "${column_name}"`
-                    )
-                    .join(",\n")},
-                  ARRAY[]::text[] AS __changed_columns__
-                FROM curr
-                LEFT JOIN prev ON curr.${key} = prev.${key}
-                WHERE prev.${key} IS NULL
-              UNION ALL
-                -- DELETE operations: Include only the primary key
-                SELECT 
-                  'DELETE' AS __op__,
-                  ${columns
-                    .map(({ column_name, data_type }) => {
-                      if (column_name === key) {
-                        return `prev."${column_name}" AS "${column_name}"`;
-                      } else {
-                        return `NULL::${data_type} AS "${column_name}"`;
-                      }
-                    })
-                    .join(",\n")},
-                    ARRAY[]::text[] AS __changed_columns__
-                FROM prev
-                LEFT JOIN curr ON prev.${key} = curr.${key}
-                WHERE curr.${key} IS NULL
-              UNION ALL
-                -- UPDATE operations: Include only changed columns
-                SELECT 
-                  'UPDATE' AS __op__,
-                  ${columns
-                    .map(({ column_name, data_type }) =>
-                      column_name === key
-                        ? `curr."${column_name}" AS "${column_name}"`
-                        : `CASE 
-                            WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" 
-                            THEN curr."${column_name}"
-                            ELSE NULL::${data_type} 
-                            END AS "${column_name}"`
-                    )
-                    .join(",\n")},
-                    ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
-                      .filter(({ column_name }) => column_name !== key)
+            CREATE TEMP TABLE live_query_${id}_state1 (LIKE live_query_${id}_view INCLUDING ALL);
+            CREATE TEMP TABLE live_query_${id}_state2 (LIKE live_query_${id}_view INCLUDING ALL);
+          `);
+
+          // Create Diff views and prepared statements
+          for (const curr of [1, 2]) {
+            const prev = curr === 1 ? 2 : 1;
+            await tx.exec(`
+              PREPARE live_query_${id}_diff${curr} AS
+              WITH
+                prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
+                curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
+                data_diff AS (
+                  -- INSERT operations: Include all columns
+                  SELECT 
+                    'INSERT' AS __op__,
+                    ${columns
                       .map(
                         ({ column_name }) =>
-                          `CASE
-                            WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" 
-                            THEN '${column_name}' 
-                            ELSE NULL 
-                            END`
+                          `curr."${column_name}" AS "${column_name}"`,
                       )
-                      .join(
-                        ", "
-                      )}]) WHERE unnest IS NOT NULL) AS __changed_columns__
-                FROM curr
-                INNER JOIN prev ON curr.${key} = prev.${key}
-                WHERE NOT (curr IS NOT DISTINCT FROM prev)
-              )
-            SELECT * FROM data_diff;
-          `);
-        }
-      });
+                      .join(",\n")},
+                    ARRAY[]::text[] AS __changed_columns__
+                  FROM curr
+                  LEFT JOIN prev ON curr.${key} = prev.${key}
+                  WHERE prev.${key} IS NULL
+                UNION ALL
+                  -- DELETE operations: Include only the primary key
+                  SELECT 
+                    'DELETE' AS __op__,
+                    ${columns
+                      .map(({ column_name, data_type, udt_name }) => {
+                        if (column_name === key) {
+                          return `prev."${column_name}" AS "${column_name}"`;
+                        } else {
+                          return `NULL::${data_type == "USER-DEFINED" ? udt_name : data_type} AS "${column_name}"`;
+                        }
+                      })
+                      .join(",\n")},
+                      ARRAY[]::text[] AS __changed_columns__
+                  FROM prev
+                  LEFT JOIN curr ON prev.${key} = curr.${key}
+                  WHERE curr.${key} IS NULL
+                UNION ALL
+                  -- UPDATE operations: Include only changed columns
+                  SELECT 
+                    'UPDATE' AS __op__,
+                    ${columns
+                      .map(({ column_name, data_type, udt_name }) =>
+                        column_name === key
+                          ? `curr."${column_name}" AS "${column_name}"`
+                          : `CASE 
+                              WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" 
+                              THEN curr."${column_name}"
+                              ELSE NULL::${data_type == "USER-DEFINED" ? udt_name : data_type} 
+                              END AS "${column_name}"`,
+                      )
+                      .join(",\n")},
+                      ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
+                        .filter(({ column_name }) => column_name !== key)
+                        .map(
+                          ({ column_name }) =>
+                            `CASE
+                              WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" 
+                              THEN '${column_name}' 
+                              ELSE NULL 
+                              END`,
+                        )
+                        .join(
+                          ", ",
+                        )}]) WHERE unnest IS NOT NULL) AS __changed_columns__
+                  FROM curr
+                  INNER JOIN prev ON curr.${key} = prev.${key}
+                  WHERE NOT (curr IS NOT DISTINCT FROM prev)
+                )
+              SELECT * FROM data_diff;
+            `);
+          }
+        });
+      };
 
-      const refresh = async () => {
-        await pg.transaction(async (tx) => {
-          // Populate the state table
-          await tx.exec(`
-            DELETE FROM live_query_${id}_state${stateSwitch};
-            INSERT INTO live_query_${id}_state${stateSwitch} 
-              SELECT * FROM live_query_${id}_view;
-          `);
+      await init();
 
-          // Get the changes
-          changes = await tx.query(
-            `EXECUTE live_query_${id}_diff${stateSwitch};`
-          );
-        });
+      const refresh = async () => {
+        let reset = false;
+        for (let i = 0; i < 5; i++) {
+          try {
+            await pg.transaction(async (tx) => {
+              // Populate the state table
+              await tx.exec(`
+                DELETE FROM live_query_${id}_state${stateSwitch};
+                INSERT INTO live_query_${id}_state${stateSwitch} 
+                  SELECT * FROM live_query_${id}_view;
+              `);
+
+              // Get the changes
+              changes = await tx.query(
+                `EXECUTE live_query_${id}_diff${stateSwitch};`,
+              );
+            });
+            break;
+          } catch (e) {
+            const msg = (e as Error).message;
+            if (
+              msg ==
+              `relation "live_query_${id}_state${stateSwitch}" does not exist`
+            ) {
+              // If the state table does not exist, reset and try again
+              // This can happen if using the multi-tab worker
+              reset = true;
+              await init();
+              continue;
+            } else {
+              throw e;
+            }
+          }
+        }
 
         // Switch state
         stateSwitch = stateSwitch === 1 ? 2 : 1;
 
-        callback(changes!.rows);
+        callback([
+          ...(reset
+            ? [
+                {
+                  __op__: "RESET" as const,
+                },
+              ]
+            : []),
+          ...changes!.rows,
+        ]);
       };
 
       // Setup the listeners
@@ -236,7 +289,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
           `table_change__${table.schema_name}__${table.table_name}`,
           async () => {
             refresh();
-          }
+          },
         );
         unsubList.push(unsub);
       }
@@ -261,7 +314,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
       // Fields
       const fields = changes!.fields.filter(
         (field) =>
-          !["__after__", "__op__", "__changed_columns__"].includes(field.name)
+          !["__after__", "__op__", "__changed_columns__"].includes(field.name),
       );
 
       // Return the initial results
@@ -277,7 +330,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
       query: string,
       params: any[] | undefined | null,
       key: string,
-      callback: (results: Results>) => void
+      callback: (results: Results>) => void,
     ) {
       const rowsMap: Map = new Map();
       const afterMap: Map = new Map();
@@ -297,6 +350,10 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
               ...obj
             } = change as typeof change & { [key: string]: any };
             switch (op) {
+              case "RESET":
+                rowsMap.clear();
+                afterMap.clear();
+                break;
               case "INSERT":
                 rowsMap.set(obj[key], obj);
                 afterMap.set(obj.__after__, obj[key]);
@@ -340,7 +397,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
               fields,
             });
           }
-        }
+        },
       );
 
       firstRun = false;
@@ -378,7 +435,7 @@ export const live = {
  */
 async function getTablesForView(
   tx: Transaction | PGliteInterface,
-  viewName: string
+  viewName: string,
 ): Promise<{ table_name: string; schema_name: string }[]> {
   return (
     await tx.query<{
@@ -399,7 +456,7 @@ async function getTablesForView(
         )
         AND d.deptype = 'n';
       `,
-      [viewName]
+      [viewName],
     )
   ).rows.filter((row) => row.table_name !== viewName);
 }
@@ -412,14 +469,14 @@ async function getTablesForView(
 async function addNotifyTriggersToTables(
   tx: Transaction | PGliteInterface,
   tables: { table_name: string; schema_name: string }[],
-  tableNotifyTriggersAdded: Set
+  tableNotifyTriggersAdded: Set,
 ) {
   const triggers = tables
     .filter(
       (table) =>
         !tableNotifyTriggersAdded.has(
-          `${table.schema_name}_${table.table_name}`
-        )
+          `${table.schema_name}_${table.table_name}`,
+        ),
     )
     .map((table) => {
       return `
@@ -439,6 +496,6 @@ async function addNotifyTriggersToTables(
     await tx.exec(triggers);
   }
   tables.map((table) =>
-    tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`)
+    tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`),
   );
 }
diff --git a/packages/pglite/src/live/interface.ts b/packages/pglite/src/live/interface.ts
index 9bbb7814..6c3a3661 100644
--- a/packages/pglite/src/live/interface.ts
+++ b/packages/pglite/src/live/interface.ts
@@ -77,4 +77,12 @@ export type ChangeUpdate = {} & {
   __after__: number;
 } & T;
 
-export type Change = ChangeInsert | ChangeDelete | ChangeUpdate;
+export type ChangeReset = {
+  __op__: "RESET";
+} & T;
+
+export type Change =
+  | ChangeInsert
+  | ChangeDelete
+  | ChangeUpdate
+  | ChangeReset;
diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts
index 7df875e4..371057ff 100644
--- a/packages/pglite/src/pglite.ts
+++ b/packages/pglite/src/pglite.ts
@@ -110,13 +110,28 @@ export class PGlite implements PGliteInterface {
     // Save the extensions for later use
     this.#extensions = options.extensions ?? {};
 
-    // Save the extensions for later use
-    this.#extensions = options.extensions ?? {};
-
     // Initialize the database, and store the promise so we can wait for it to be ready
     this.waitReady = this.#init(options ?? {});
   }
 
+  /**
+   * Create a new PGlite instance with extensions on the Typescript interface
+   * (The main constructor does enable extensions, however due to the limitations
+   * of Typescript, the extensions are not available on the instance interface)
+   * @param dataDir The directory to store the database files
+   *                Prefix with idb:// to use indexeddb filesystem in the browser
+   *                Use memory:// to use in-memory filesystem
+   * @param options Optional options
+   * @returns A promise that resolves to the PGlite instance when it's ready.
+   */
+  static async create(
+    options?: O,
+  ): Promise> {
+    const pg = new PGlite(options);
+    await pg.waitReady;
+    return pg as any;
+  }
+
   /**
    * Initialize the database
    * @returns A promise that resolves when the database is ready
@@ -746,22 +761,6 @@ export class PGlite implements PGliteInterface {
     this.#globalNotifyListeners.delete(callback);
   }
 
-  /**
-   * Create a new PGlite instance with extensions on the Typescript interface
-   * (The main constructor does enable extensions, however due to the limitations
-   * of Typescript, the extensions are not available on the instance interface)
-   * @param dataDir The directory to store the database files
-   *                Prefix with idb:// to use indexeddb filesystem in the browser
-   *                Use memory:// to use in-memory filesystem
-   * @param options Optional options
-   * @returns A new PGlite instance with extensions
-   */
-  static withExtensions(
-    options?: O,
-  ): PGlite & PGliteInterfaceExtensions {
-    return new PGlite(options) as any;
-  }
-
   /**
    * Dump the PGDATA dir from the filesystem to a gziped tarball.
    * @returns The tarball as a File object where available, and fallback to a Blob
diff --git a/packages/pglite/src/utils.ts b/packages/pglite/src/utils.ts
index 89c29356..b023fd57 100644
--- a/packages/pglite/src/utils.ts
+++ b/packages/pglite/src/utils.ts
@@ -29,3 +29,42 @@ export async function makeLocateFile() {
     return url?.toString() ?? "";
   };
 }
+
+export const uuid = (): string => {
+  // best case, `crypto.randomUUID` is available
+  if (globalThis.crypto?.randomUUID) {
+    return globalThis.crypto.randomUUID();
+  }
+
+  const bytes = new Uint8Array(16);
+
+  if (globalThis.crypto?.getRandomValues) {
+    // `crypto.getRandomValues` is available even in non-secure contexts
+    globalThis.crypto.getRandomValues(bytes);
+  } else {
+    // fallback to Math.random, if the Crypto API is completely missing
+    for (let i = 0; i < bytes.length; i++) {
+      bytes[i] = Math.floor(Math.random() * 256);
+    }
+  }
+
+  bytes[6] = (bytes[6] & 0x0f) | 0x40; // Set the 4 most significant bits to 0100
+  bytes[8] = (bytes[8] & 0x3f) | 0x80; // Set the 2 most significant bits to 10
+
+  const hexValues: string[] = [];
+  bytes.forEach((byte) => {
+    hexValues.push(byte.toString(16).padStart(2, "0"));
+  });
+
+  return (
+    hexValues.slice(0, 4).join("") +
+    "-" +
+    hexValues.slice(4, 6).join("") +
+    "-" +
+    hexValues.slice(6, 8).join("") +
+    "-" +
+    hexValues.slice(8, 10).join("") +
+    "-" +
+    hexValues.slice(10).join("")
+  );
+};
diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts
index 4358a0d4..15f205b2 100644
--- a/packages/pglite/src/worker/index.ts
+++ b/packages/pglite/src/worker/index.ts
@@ -1,106 +1,415 @@
-import * as Comlink from "comlink";
 import type {
+  DebugLevel,
+  Extensions,
   PGliteInterface,
+  PGliteInterfaceExtensions,
   PGliteOptions,
-  FilesystemType,
-  DebugLevel,
-  Results,
   QueryOptions,
+  Results,
+  Transaction,
 } from "../interface.js";
+import { uuid } from "../utils.js";
 import type { BackendMessage } from "pg-protocol/dist/messages.js";
-import { parseDataDir } from "../fs/index.js";
-import type { Worker as WorkerInterface } from "./process.js";
+
+export type PGliteWorkerOptions = PGliteOptions & {
+  meta?: any;
+  id?: string;
+};
 
 export class PGliteWorker implements PGliteInterface {
-  readonly dataDir?: string;
-  // @ts-ignore
-  readonly fsType: FilesystemType;
-  readonly waitReady: Promise;
-  readonly debug: DebugLevel = 0;
+  #initPromise: Promise;
+  #debug: DebugLevel = 0;
 
   #ready = false;
   #closed = false;
+  #isLeader = false;
+
+  #eventTarget = new EventTarget();
 
-  #worker: WorkerInterface;
-  #options: PGliteOptions;
+  #tabId: string;
+
+  #connected = false;
+
+  #workerProcess: Worker;
+  #workerID?: string;
+  #workerHerePromise?: Promise;
+  #workerReadyPromise?: Promise;
+
+  #broadcastChannel?: BroadcastChannel;
+  #tabChannel?: BroadcastChannel;
+  #releaseTabCloseLock?: () => void;
 
   #notifyListeners = new Map void>>();
   #globalNotifyListeners = new Set<
     (channel: string, payload: string) => void
   >();
 
-  constructor(dataDir: string, options?: PGliteOptions) {
-    const { dataDir: dir, fsType } = parseDataDir(dataDir);
-    this.dataDir = dir;
-    // @ts-ignore
-    this.fsType = fsType;
-    this.#options = options ?? {};
-    this.debug = options?.debug ?? 0;
-
-    this.#worker = Comlink.wrap(
-      // the below syntax is required by webpack in order to
-      // identify the worker properly during static analysis
-      // see: https://webpack.js.org/guides/web-workers/
-      new Worker(new URL("./process.js", import.meta.url), { type: "module" }),
-    );
+  #extensions: Extensions;
+  #extensionsClose: Array<() => Promise> = [];
+
+  constructor(worker: Worker, options?: PGliteWorkerOptions) {
+    this.#workerProcess = worker;
+    this.#tabId = uuid();
+    this.#extensions = options?.extensions ?? {};
+
+    this.#workerHerePromise = new Promise((resolve) => {
+      this.#workerProcess.addEventListener(
+        "message",
+        (event) => {
+          if (event.data.type === "here") {
+            resolve();
+          } else {
+            throw new Error("Invalid message");
+          }
+        },
+        { once: true },
+      );
+    });
+
+    this.#workerReadyPromise = new Promise((resolve) => {
+      const callback = (event: MessageEvent) => {
+        if (event.data.type === "ready") {
+          this.#workerID = event.data.id;
+          this.#workerProcess.removeEventListener("message", callback);
+          resolve();
+        }
+      };
+      this.#workerProcess.addEventListener("message", callback);
+    });
+
+    this.#initPromise = this.#init(options);
+  }
+
+  /**
+   * Create a new PGlite instance with extensions on the Typescript interface
+   * This also awaits the instance to be ready before resolving
+   * (The main constructor does enable extensions, however due to the limitations
+   * of Typescript, the extensions are not available on the instance interface)
+   * @param worker The worker to use
+   * @param options Optional options
+   * @returns A promise that resolves to the PGlite instance when it's ready.
+   */
+  static async create(
+    worker: Worker,
+    options?: O,
+  ): Promise> {
+    const pg = new PGliteWorker(worker, options);
+    await pg.#initPromise;
+    return pg as PGliteWorker & PGliteInterfaceExtensions;
+  }
+
+  async #init(options: PGliteWorkerOptions = {}) {
+    // Setup the extensions
+    for (const [extName, ext] of Object.entries(this.#extensions)) {
+      if (ext instanceof URL) {
+        throw new Error(
+          "URL extensions are not supported on the client side of a worker",
+        );
+      } else {
+        const extRet = await ext.setup(this, {}, true);
+        if (extRet.emscriptenOpts) {
+          console.warn(
+            `PGlite extension ${extName} returned emscriptenOpts, these are not supported on the client side of a worker`,
+          );
+        }
+        if (extRet.namespaceObj) {
+          (this as any)[extName] = extRet.namespaceObj;
+        }
+        if (extRet.bundlePath) {
+          console.warn(
+            `PGlite extension ${extName} returned bundlePath, this is not supported on the client side of a worker`,
+          );
+        }
+        if (extRet.init) {
+          await extRet.init();
+        }
+        if (extRet.close) {
+          this.#extensionsClose.push(extRet.close);
+        }
+      }
+    }
+
+    // Wait for the worker let us know it's here
+    await this.#workerHerePromise;
+
+    // Send the worker the options
+    const { extensions, ...workerOptions } = options;
+    this.#workerProcess.postMessage({
+      type: "init",
+      options: workerOptions,
+    });
+
+    // Wait for the worker let us know it's ready
+    await this.#workerReadyPromise;
+
+    // Acquire the tab close lock, this is released then the tab, or this
+    // PGliteWorker instance, is closed
+    const tabCloseLockId = `pglite-tab-close:${this.#tabId}`;
+    this.#releaseTabCloseLock = await acquireLock(tabCloseLockId);
 
-    // pass unparsed dataDir value
-    this.waitReady = this.#init(dataDir);
+    // Start the broadcast channel used to communicate with tabs and leader election
+    const broadcastChannelId = `pglite-broadcast:${this.#workerID}`;
+    this.#broadcastChannel = new BroadcastChannel(broadcastChannelId);
+
+    // Start the tab channel used to communicate with the leader directly
+    const tabChannelId = `pglite-tab:${this.#tabId}`;
+    this.#tabChannel = new BroadcastChannel(tabChannelId);
+
+    this.#broadcastChannel.addEventListener("message", async (event) => {
+      if (event.data.type === "leader-here") {
+        this.#connected = false;
+        this.#eventTarget.dispatchEvent(new Event("leader-change"));
+        this.#leaderNotifyLoop();
+      } else if (event.data.type === "notify") {
+        this.#receiveNotification(event.data.channel, event.data.payload);
+      }
+    });
+
+    this.#tabChannel.addEventListener("message", async (event) => {
+      if (event.data.type === "connected") {
+        this.#connected = true;
+        this.#eventTarget.dispatchEvent(new Event("connected"));
+        this.#debug = await this.#rpc("getDebugLevel");
+        this.#ready = true;
+      }
+    });
+
+    this.#workerProcess.addEventListener("message", async (event) => {
+      if (event.data.type === "leader-now") {
+        this.#isLeader = true;
+        this.#eventTarget.dispatchEvent(new Event("leader-change"));
+      }
+    });
+
+    this.#leaderNotifyLoop();
+  }
+
+  async #leaderNotifyLoop() {
+    if (!this.#connected) {
+      this.#broadcastChannel!.postMessage({
+        type: "tab-here",
+        id: this.#tabId,
+      });
+      setTimeout(() => this.#leaderNotifyLoop(), 16);
+    }
   }
 
-  async #init(dataDir: string) {
-    await this.#worker.init(
-      dataDir,
-      this.#options,
-      Comlink.proxy(this.receiveNotification.bind(this)),
+  async #rpc(
+    method: Method,
+    ...args: Parameters
+  ): Promise> {
+    const callId = uuid();
+    const message: WorkerRpcCall = {
+      type: "rpc-call",
+      callId,
+      method,
+      args,
+    };
+    this.#tabChannel!.postMessage(message);
+    return await new Promise>(
+      (resolve, reject) => {
+        const listener = (event: MessageEvent) => {
+          if (event.data.callId !== callId) return;
+          cleanup();
+          const message: WorkerRpcResponse = event.data;
+          if (message.type === "rpc-return") {
+            resolve(message.result);
+          } else if (message.type === "rpc-error") {
+            const error = new Error(message.error.message);
+            Object.assign(error, message.error);
+            reject(error);
+          } else {
+            reject(new Error("Invalid message"));
+          }
+        };
+        const leaderChangeListener = () => {
+          // If the leader changes, throw an error to reject the promise
+          cleanup();
+          reject(new LeaderChangedError());
+        };
+        const cleanup = () => {
+          this.#tabChannel!.removeEventListener("message", listener);
+          this.#eventTarget.removeEventListener(
+            "leader-change",
+            leaderChangeListener,
+          );
+        };
+        this.#eventTarget.addEventListener(
+          "leader-change",
+          leaderChangeListener,
+        );
+        this.#tabChannel!.addEventListener("message", listener);
+      },
     );
-    this.#ready = true;
   }
 
+  get waitReady() {
+    return new Promise(async (resolve) => {
+      await this.#initPromise;
+      if (!this.#connected) {
+        resolve(
+          new Promise((resolve) => {
+            this.#eventTarget.addEventListener("connected", () => {
+              resolve();
+            });
+          }),
+        );
+      } else {
+        resolve();
+      }
+    });
+  }
+
+  get debug() {
+    return this.#debug;
+  }
+
+  /**
+   * The ready state of the database
+   */
   get ready() {
     return this.#ready;
   }
 
+  /**
+   * The closed state of the database
+   */
   get closed() {
     return this.#closed;
   }
 
+  /**
+   * The leader state of this tab
+   */
+  get isLeader() {
+    return this.#isLeader;
+  }
+
+  /**
+   * Close the database
+   * @returns Promise that resolves when the connection to shared PGlite is closed
+   */
   async close() {
-    await this.#worker.close();
+    if (this.#closed) {
+      return;
+    }
     this.#closed = true;
+    this.#broadcastChannel?.close();
+    this.#tabChannel?.close();
+    this.#releaseTabCloseLock?.();
+    this.#workerProcess.terminate();
   }
 
+  /**
+   * Execute a single SQL statement
+   * This uses the "Extended Query" postgres wire protocol message.
+   * @param query The query to execute
+   * @param params Optional parameters for the query
+   * @returns The result of the query
+   */
   async query(
     query: string,
     params?: any[],
     options?: QueryOptions,
   ): Promise> {
-    return this.#worker.query(query, params, options) as Promise>;
+    await this.waitReady;
+    return (await this.#rpc("query", query, params, options)) as Results;
   }
 
+  /**
+   * Execute a SQL query, this can have multiple statements.
+   * This uses the "Simple Query" postgres wire protocol message.
+   * @param query The query to execute
+   * @returns The result of the query
+   */
   async exec(query: string, options?: QueryOptions): Promise> {
-    return this.#worker.exec(query, options);
+    await this.waitReady;
+    return (await this.#rpc("exec", query, options)) as Array;
   }
 
-  async transaction(callback: (tx: any) => Promise) {
-    const callbackProxy = Comlink.proxy(callback);
-    return this.#worker.transaction(callbackProxy);
+  /**
+   * Execute a transaction
+   * @param callback A callback function that takes a transaction object
+   * @returns The result of the transaction
+   */
+  async transaction(
+    callback: (tx: Transaction) => Promise,
+  ): Promise {
+    await this.waitReady;
+    const txId = await this.#rpc("transactionStart");
+    let ret: T | undefined;
+    try {
+      ret = await callback({
+        query: async (query, params, options) => {
+          return await this.#rpc(
+            "transactionQuery",
+            txId,
+            query,
+            params,
+            options,
+          );
+        },
+        exec: async (query, options) => {
+          return (await this.#rpc(
+            "transactionExec",
+            txId,
+            query,
+            options,
+          )) as any;
+        },
+        rollback: async () => {
+          await this.#rpc("transactionRollback", txId);
+        },
+        closed: false,
+      } as Transaction);
+    } catch (error) {
+      await this.#rpc("transactionRollback", txId);
+      throw error;
+    }
+    await this.#rpc("transactionCommit", txId);
+    return ret;
   }
 
+  /**
+   * Execute a postgres wire protocol message directly without wrapping the response.
+   * Only use if `execProtocol()` doesn't suite your needs.
+   *
+   * **Warning:** This bypasses PGlite's protocol wrappers that manage error/notice messages,
+   * transactions, and notification listeners. Only use if you need to bypass these wrappers and
+   * don't intend to use the above features.
+   *
+   * @param message The postgres wire protocol message to execute
+   * @returns The direct message data response produced by Postgres
+   */
   async execProtocolRaw(message: Uint8Array): Promise {
-    return this.#worker.execProtocolRaw(message);
+    await this.waitReady;
+    return (await this.#rpc("execProtocolRaw", message)) as Uint8Array;
   }
 
+  /**
+   * Execute a postgres wire protocol message
+   * @param message The postgres wire protocol message to execute
+   * @returns The result of the query
+   */
   async execProtocol(
     message: Uint8Array,
   ): Promise> {
-    return this.#worker.execProtocol(message);
+    await this.waitReady;
+    return (await this.#rpc("execProtocol", message)) as Array<
+      [BackendMessage, Uint8Array]
+    >;
   }
 
+  /**
+   * Listen for a notification
+   * @param channel The channel to listen on
+   * @param callback The callback to call when a notification is received
+   */
   async listen(
     channel: string,
     callback: (payload: string) => void,
   ): Promise<() => Promise> {
+    await this.waitReady;
     if (!this.#notifyListeners.has(channel)) {
       this.#notifyListeners.set(channel, new Set());
     }
@@ -111,10 +420,16 @@ export class PGliteWorker implements PGliteInterface {
     };
   }
 
+  /**
+   * Stop listening for a notification
+   * @param channel The channel to stop listening on
+   * @param callback The callback to remove
+   */
   async unlisten(
     channel: string,
     callback?: (payload: string) => void,
   ): Promise {
+    await this.waitReady;
     if (callback) {
       this.#notifyListeners.get(channel)?.delete(callback);
     } else {
@@ -126,6 +441,10 @@ export class PGliteWorker implements PGliteInterface {
     }
   }
 
+  /**
+   * Listen to notifications
+   * @param callback The callback to call when a notification is received
+   */
   onNotification(callback: (channel: string, payload: string) => void) {
     this.#globalNotifyListeners.add(callback);
     return () => {
@@ -133,11 +452,15 @@ export class PGliteWorker implements PGliteInterface {
     };
   }
 
+  /**
+   * Stop listening to notifications
+   * @param callback The callback to remove
+   */
   offNotification(callback: (channel: string, payload: string) => void) {
     this.#globalNotifyListeners.delete(callback);
   }
 
-  receiveNotification(channel: string, payload: string) {
+  #receiveNotification(channel: string, payload: string) {
     const listeners = this.#notifyListeners.get(channel);
     if (listeners) {
       for (const listener of listeners) {
@@ -149,7 +472,291 @@ export class PGliteWorker implements PGliteInterface {
     }
   }
 
-  async dumpDataDir() {
-    return this.#worker.dumpDataDir();
+  async dumpDataDir(): Promise {
+    return (await this.#rpc("dumpDataDir")) as File | Blob;
+  }
+
+  onLeaderChange(callback: () => void) {
+    this.#eventTarget.addEventListener("leader-change", callback);
+    return () => {
+      this.#eventTarget.removeEventListener("leader-change", callback);
+    };
+  }
+
+  offLeaderChange(callback: () => void) {
+    this.#eventTarget.removeEventListener("leader-change", callback);
+  }
+}
+
+export interface WorkerOptions {
+  init: (
+    options: Exclude,
+  ) => Promise;
+}
+
+export async function worker({ init }: WorkerOptions) {
+  // Send a message to the main thread to let it know we are here
+  postMessage({ type: "here" });
+
+  // Await the main thread to send us the options
+  const options = await new Promise>(
+    (resolve) => {
+      addEventListener(
+        "message",
+        (event) => {
+          if (event.data.type === "init") {
+            resolve(event.data.options);
+          }
+        },
+        { once: true },
+      );
+    },
+  );
+
+  // ID for this multi-tab worker - this is used to identify the group of workers
+  // that are trying to elect a leader for a shared PGlite instance.
+  // It defaults to the URL of the worker, and the dataDir if provided
+  // but can be overridden by the options.
+  const id = options.id ?? `${import.meta.url}:${options.dataDir ?? ""}`;
+
+  // Let the main thread know we are ready
+  postMessage({ type: "ready", id });
+
+  const electionLockId = `pglite-election-lock:${id}`;
+  const broadcastChannelId = `pglite-broadcast:${id}`;
+  const broadcastChannel = new BroadcastChannel(broadcastChannelId);
+  const connectedTabs = new Set();
+
+  // Await the main lock which is used to elect the leader
+  // We don't release this lock, its automatically released when the worker or
+  // tab is closed
+  await acquireLock(electionLockId);
+
+  // Now we are the leader, start the worker
+  const dbPromise = init(options);
+
+  // Start listening for messages from tabs
+  broadcastChannel.onmessage = async (event) => {
+    const msg = event.data;
+    switch (msg.type) {
+      case "tab-here":
+        // A new tab has joined,
+        connectTab(msg.id, await dbPromise, connectedTabs);
+        break;
+    }
+  };
+
+  // Notify the other tabs that we are the leader
+  broadcastChannel.postMessage({ type: "leader-here", id });
+
+  // Let the main thread know we are the leader
+  postMessage({ type: "leader-now" });
+
+  const db = await dbPromise;
+
+  // Listen for notifications and broadcast them to all tabs
+  db.onNotification((channel, payload) => {
+    broadcastChannel.postMessage({ type: "notify", channel, payload });
+  });
+}
+
+function connectTab(
+  tabId: string,
+  pg: PGliteInterface,
+  connectedTabs: Set,
+) {
+  if (connectedTabs.has(tabId)) {
+    return;
+  }
+  connectedTabs.add(tabId);
+  const tabChannelId = `pglite-tab:${tabId}`;
+  const tabCloseLockId = `pglite-tab-close:${tabId}`;
+  const tabChannel = new BroadcastChannel(tabChannelId);
+
+  // Use a tab close lock to unsubscribe the tab
+  navigator.locks.request(tabCloseLockId, () => {
+    return new Promise((resolve) => {
+      // The tab has been closed, unsubscribe the tab broadcast channel
+      tabChannel.close();
+      connectedTabs.delete(tabId);
+      resolve();
+    });
+  });
+
+  const api = makeWorkerApi(pg);
+
+  tabChannel.addEventListener("message", async (event) => {
+    const msg = event.data;
+    switch (msg.type) {
+      case "rpc-call":
+        const { callId, method, args } = msg as WorkerRpcCall;
+        try {
+          // @ts-ignore
+          const result = (await api[method](...args)) as WorkerRpcResult<
+            typeof method
+          >["result"];
+          tabChannel.postMessage({
+            type: "rpc-return",
+            callId,
+            result,
+          } satisfies WorkerRpcResult);
+        } catch (error) {
+          console.error(error);
+          tabChannel.postMessage({
+            type: "rpc-error",
+            callId,
+            error: { message: (error as Error).message },
+          } satisfies WorkerRpcError);
+        }
+        break;
+    }
+  });
+
+  // Send a message to the tab to let it know it's connected
+  tabChannel.postMessage({ type: "connected" });
+}
+
+function makeWorkerApi(db: PGliteInterface) {
+  const transactions = new Map<
+    string,
+    Promise<{
+      tx: Transaction;
+      resolve: () => void;
+      reject: (error: any) => void;
+    }>
+  >();
+
+  return {
+    async getDebugLevel() {
+      return db.debug;
+    },
+    async close() {
+      await db.close();
+    },
+    async query(query: string, params?: any[], options?: QueryOptions) {
+      return await db.query(query, params, options);
+    },
+    async exec(query: string, options?: QueryOptions) {
+      return await db.exec(query, options);
+    },
+    async transactionStart() {
+      const txId = uuid();
+      const { promise: txPromise, resolve: resolveTxPromise } = makePromise<{
+        tx: Transaction;
+        resolve: () => void;
+        reject: (error: any) => void;
+      }>();
+      transactions.set(txId, txPromise);
+      db.transaction((newTx) => {
+        return new Promise((resolveTx, rejectTx) => {
+          resolveTxPromise({
+            tx: newTx,
+            resolve: resolveTx,
+            reject: rejectTx,
+          });
+        });
+      });
+      return txId;
+    },
+    async transactionCommit(id: string) {
+      if (!transactions.has(id)) {
+        throw new Error("No transaction");
+      }
+      (await transactions.get(id)!).resolve();
+      transactions.delete(id);
+    },
+    async transactionQuery(
+      id: string,
+      query: string,
+      params?: any[],
+      options?: QueryOptions,
+    ) {
+      if (!transactions.has(id)) {
+        throw new Error("No transaction");
+      }
+      const tx = (await transactions.get(id)!).tx;
+      return await tx.query(query, params, options);
+    },
+    async transactionExec(id: string, query: string, options?: QueryOptions) {
+      if (!transactions.has(id)) {
+        throw new Error("No transaction");
+      }
+      const tx = (await transactions.get(id)!).tx;
+      return tx.exec(query, options);
+    },
+    async transactionRollback(id: string) {
+      if (!transactions.has(id)) {
+        throw new Error("No transaction");
+      }
+      const tx = await transactions.get(id)!;
+      await tx.tx.rollback();
+      tx.reject(new Error("Transaction rolled back"));
+      transactions.delete(id);
+    },
+    async execProtocol(message: Uint8Array) {
+      return await db.execProtocol(message);
+    },
+    async execProtocolRaw(message: Uint8Array) {
+      return await db.execProtocolRaw(message);
+    },
+    async dumpDataDir() {
+      return await db.dumpDataDir();
+    },
+  };
+}
+
+export class LeaderChangedError extends Error {
+  constructor() {
+    super("Leader changed, pending operation in indeterminate state");
   }
 }
+
+async function acquireLock(lockId: string) {
+  let release;
+  await new Promise((resolve) => {
+    navigator.locks.request(lockId, () => {
+      return new Promise((releaseCallback) => {
+        release = releaseCallback;
+        resolve();
+      });
+    });
+  });
+  return release;
+}
+
+function makePromise() {
+  let resolve: (value: T) => void;
+  let reject: (error: any) => void;
+  const promise = new Promise((res, rej) => {
+    resolve = res;
+    reject = rej;
+  });
+  return { promise, resolve: resolve!, reject: reject! };
+}
+
+type WorkerApi = ReturnType;
+
+type WorkerRpcMethod = keyof WorkerApi;
+
+type WorkerRpcCall = {
+  type: "rpc-call";
+  callId: string;
+  method: Method;
+  args: Parameters;
+};
+
+type WorkerRpcResult = {
+  type: "rpc-return";
+  callId: string;
+  result: ReturnType;
+};
+
+type WorkerRpcError = {
+  type: "rpc-error";
+  callId: string;
+  error: any;
+};
+
+type WorkerRpcResponse =
+  | WorkerRpcResult
+  | WorkerRpcError;
diff --git a/packages/pglite/src/worker/process.ts b/packages/pglite/src/worker/process.ts
deleted file mode 100644
index eac2df57..00000000
--- a/packages/pglite/src/worker/process.ts
+++ /dev/null
@@ -1,48 +0,0 @@
-import * as Comlink from "comlink";
-import { PGlite } from "../index.js";
-import type { PGliteOptions, QueryOptions } from "../interface.js";
-
-let db: PGlite;
-
-const worker = {
-  async init(
-    dataDir?: string,
-    options?: PGliteOptions,
-    onNotification?: (channel: string, payload: string) => void,
-  ) {
-    db = new PGlite(dataDir, options);
-    await db.waitReady;
-    if (onNotification) {
-      db.onNotification(onNotification);
-    }
-    return true;
-  },
-  async close() {
-    await db.close();
-  },
-  async query(query: string, params?: any[], options?: QueryOptions) {
-    return await db.query(query, params, options);
-  },
-  async exec(query: string, options?: QueryOptions) {
-    return await db.exec(query, options);
-  },
-  async transaction(callback: (tx: any) => Promise) {
-    return await db.transaction((tx) => {
-      return callback(Comlink.proxy(tx));
-    });
-  },
-  async execProtocolRaw(message: Uint8Array) {
-    return await db.execProtocolRaw(message);
-  },
-  async execProtocol(message: Uint8Array) {
-    return await db.execProtocol(message);
-  },
-  async dumpDataDir() {
-    const file = await db.dumpDataDir();
-    return Comlink.transfer(file, [await file.arrayBuffer()]);
-  },
-};
-
-Comlink.expose(worker);
-
-export type Worker = typeof worker;
diff --git a/packages/pglite/tests/targets/base.js b/packages/pglite/tests/targets/base.js
index 63b05b22..ca0c80b5 100644
--- a/packages/pglite/tests/targets/base.js
+++ b/packages/pglite/tests/targets/base.js
@@ -6,6 +6,7 @@ const wsPort = process.env.WS_PORT || 3334;
 export function tests(env, dbFilename, target) {
   let browser;
   let evaluate;
+  let context;
   let page;
   let db;
 
@@ -25,11 +26,21 @@ export function tests(env, dbFilename, target) {
     if (env === "node") {
       evaluate = async (fn) => fn();
     } else {
-      const context = await browser.newContext();
+      context = await browser.newContext();
       page = await context.newPage();
       await page.goto(`http://localhost:${wsPort}/tests/blank.html`);
       page.evaluate(`window.dbFilename = "${dbFilename}";`);
-      evaluate = async (fn) => await page.evaluate(fn);
+      page.on("console", (msg) => {
+        console.log(msg);
+      });
+      evaluate = async (fn) => {
+        try {
+          return await page.evaluate(fn);
+        } catch (e) {
+          console.error(e);
+          throw e;
+        }
+      };
     }
   });
 
@@ -149,4 +160,242 @@ export function tests(env, dbFilename, target) {
       ],
     });
   });
+
+  if (env === "node") {
+    // Skip the rest of the tests for node as they are browser specific
+    return;
+  }
+
+  test.serial(`targets ${target} worker live query`, async (t) => {
+    const page2 = await context.newPage();
+    await page2.goto(`http://localhost:${wsPort}/tests/blank.html`);
+    page2.evaluate(`window.dbFilename = "${dbFilename}";`);
+    page.on("console", (msg) => {
+      console.log(msg);
+    });
+
+    const res2Prom = page2.evaluate(async () => {
+      const { live } = await import("../../dist/live/index.js");
+      const { PGliteWorker } = await import("../../dist/worker/index.js");
+
+      let db;
+      db = new PGliteWorker(
+        new Worker("/tests/targets/worker.js", {
+          type: "module",
+        }),
+        {
+          dataDir: window.dbFilename,
+          extensions: { live },
+        }
+      );
+
+      await db.waitReady;
+
+      let updatedResults;
+      const eventTarget = new EventTarget();
+      const { initialResults, unsubscribe } = await db.live.query(
+        "SELECT * FROM test ORDER BY name;",
+        [],
+        (result) => {
+          updatedResults = result;
+          eventTarget.dispatchEvent(new Event("updated"));
+        }
+      );
+      await new Promise((resolve) => {
+        eventTarget.addEventListener("updated", resolve);
+      });
+      return { initialResults, updatedResults };
+    });
+
+    const res1 = await evaluate(async () => {
+      const { live } = await import("../../dist/live/index.js");
+      const { PGliteWorker } = await import("../../dist/worker/index.js");
+
+      let db;
+      db = new PGliteWorker(
+        new Worker("/tests/targets/worker.js", {
+          type: "module",
+        }),
+        {
+          dataDir: window.dbFilename,
+          extensions: { live },
+        }
+      );
+
+      await db.waitReady;
+
+      let updatedResults;
+      const eventTarget = new EventTarget();
+      const { initialResults, unsubscribe } = await db.live.query(
+        "SELECT * FROM test ORDER BY name;",
+        [],
+        (result) => {
+          updatedResults = result;
+          eventTarget.dispatchEvent(new Event("updated"));
+        }
+      );
+      await new Promise((resolve) => setTimeout(resolve, 500));
+      await db.query("INSERT INTO test (id, name) VALUES (3, 'test3');");
+      await new Promise((resolve) => {
+        eventTarget.addEventListener("updated", resolve);
+      });
+      return { initialResults, updatedResults };
+    });
+
+    const res2 = await res2Prom;
+
+    t.deepEqual(res1.initialResults.rows, [
+      {
+        id: 1,
+        name: "test",
+      },
+      {
+        id: 2,
+        name: "test2",
+      },
+    ]);
+
+    for (const res of [res1, res2]) {
+      t.deepEqual(res.updatedResults.rows, [
+        {
+          id: 1,
+          name: "test",
+        },
+        {
+          id: 2,
+          name: "test2",
+        },
+        {
+          id: 3,
+          name: "test3",
+        },
+      ]);
+    }
+  });
+
+  test.serial(`targets ${target} worker live incremental query`, async (t) => {
+    const page2 = await context.newPage();
+    await page2.goto(`http://localhost:${wsPort}/tests/blank.html`);
+    page2.evaluate(`window.dbFilename = "${dbFilename}";`);
+    page.on("console", (msg) => {
+      console.log(msg);
+    });
+
+    const res2Prom = page2.evaluate(async () => {
+      const { live } = await import("../../dist/live/index.js");
+      const { PGliteWorker } = await import("../../dist/worker/index.js");
+
+      let db;
+      db = new PGliteWorker(
+        new Worker("/tests/targets/worker.js", {
+          type: "module",
+        }),
+        {
+          dataDir: window.dbFilename,
+          extensions: { live },
+        }
+      );
+
+      await db.waitReady;
+
+      let updatedResults;
+      const eventTarget = new EventTarget();
+      const { initialResults, unsubscribe } = await db.live.incrementalQuery(
+        "SELECT * FROM test ORDER BY name;",
+        [],
+        "id",
+        (result) => {
+          updatedResults = result;
+          eventTarget.dispatchEvent(new Event("updated"));
+        }
+      );
+      await new Promise((resolve) => {
+        eventTarget.addEventListener("updated", resolve);
+      });
+      return { initialResults, updatedResults };
+    });
+
+    const res1 = await evaluate(async () => {
+      const { live } = await import("../../dist/live/index.js");
+      const { PGliteWorker } = await import("../../dist/worker/index.js");
+
+      let db;
+      db = new PGliteWorker(
+        new Worker("/tests/targets/worker.js", {
+          type: "module",
+        }),
+        {
+          dataDir: window.dbFilename,
+          extensions: { live },
+        }
+      );
+
+      await db.waitReady;
+
+      let updatedResults;
+      const eventTarget = new EventTarget();
+      const { initialResults, unsubscribe } = await db.live.incrementalQuery(
+        "SELECT * FROM test ORDER BY name;",
+        [],
+        "id",
+        (result) => {
+          updatedResults = result;
+          eventTarget.dispatchEvent(new Event("updated"));
+        }
+      );
+      await new Promise((resolve) => setTimeout(resolve, 500));
+      const ret = await db.query(
+        "INSERT INTO test (id, name) VALUES (4, 'test4');"
+      );
+      await new Promise((resolve) => {
+        eventTarget.addEventListener("updated", resolve);
+      });
+      return { initialResults, updatedResults };
+    });
+
+    const res2 = await res2Prom;
+
+    t.deepEqual(res1.initialResults.rows, [
+      {
+        __after__: null,
+        id: 1,
+        name: "test",
+      },
+      {
+        __after__: 1,
+        id: 2,
+        name: "test2",
+      },
+      {
+        __after__: 2,
+        id: 3,
+        name: "test3",
+      },
+    ]);
+
+    for (const res of [res1, res2]) {
+      t.deepEqual(res.updatedResults.rows, [
+        {
+          __after__: null,
+          id: 1,
+          name: "test",
+        },
+        {
+          __after__: 1,
+          id: 2,
+          name: "test2",
+        },
+        {
+          __after__: 2,
+          id: 3,
+          name: "test3",
+        },
+        {
+          __after__: 3,
+          id: 4,
+          name: "test4",
+        },
+      ]);
+    }
+  });
 }
diff --git a/packages/pglite/tests/targets/worker.js b/packages/pglite/tests/targets/worker.js
new file mode 100644
index 00000000..a3a0b4a5
--- /dev/null
+++ b/packages/pglite/tests/targets/worker.js
@@ -0,0 +1,10 @@
+import { PGlite } from "../../../dist/index.js";
+import { worker } from "../../../dist/worker/index.js";
+
+worker({
+  async init(options) {
+    return new PGlite({
+      dataDir: options.dataDir,
+    });
+  },
+});
diff --git a/packages/pglite/tsup.config.ts b/packages/pglite/tsup.config.ts
index 3b3c1b16..87781168 100644
--- a/packages/pglite/tsup.config.ts
+++ b/packages/pglite/tsup.config.ts
@@ -19,7 +19,6 @@ const entryPoints = [
   "src/index.ts",
   'src/live/index.ts',
   "src/worker/index.ts",
-  "src/worker/process.ts",
   "src/vector/index.ts",
 ];
 
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 44955bb4..812cec5a 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -58,9 +58,6 @@ importers:
       bun:
         specifier: ^1.1.18
         version: 1.1.18
-      comlink:
-        specifier: ^4.4.1
-        version: 4.4.1
       concurrently:
         specifier: ^8.2.2
         version: 8.2.2
@@ -2467,10 +2464,6 @@ packages:
       delayed-stream: 1.0.0
     dev: true
 
-  /comlink@4.4.1:
-    resolution: {integrity: sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==}
-    dev: true
-
   /commander@2.20.3:
     resolution: {integrity: sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ==}
     dev: true