From 81a870e686f26bacbd9845d6cfae24ffd770d0f6 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 31 Jul 2024 15:03:31 +0100 Subject: [PATCH] address feedback --- packages/pglite/src/worker/index.ts | 129 +++++++++++++------------- packages/pglite/tests/targets/base.js | 1 - 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts index 81be8a3e..15f205b2 100644 --- a/packages/pglite/src/worker/index.ts +++ b/packages/pglite/src/worker/index.ts @@ -32,6 +32,8 @@ export class PGliteWorker implements PGliteInterface { #workerProcess: Worker; #workerID?: string; + #workerHerePromise?: Promise; + #workerReadyPromise?: Promise; #broadcastChannel?: BroadcastChannel; #tabChannel?: BroadcastChannel; @@ -49,6 +51,32 @@ export class PGliteWorker implements PGliteInterface { this.#workerProcess = worker; this.#tabId = uuid(); this.#extensions = options?.extensions ?? {}; + + this.#workerHerePromise = new Promise((resolve) => { + this.#workerProcess.addEventListener( + "message", + (event) => { + if (event.data.type === "here") { + resolve(); + } else { + throw new Error("Invalid message"); + } + }, + { once: true }, + ); + }); + + this.#workerReadyPromise = new Promise((resolve) => { + const callback = (event: MessageEvent) => { + if (event.data.type === "ready") { + this.#workerID = event.data.id; + this.#workerProcess.removeEventListener("message", callback); + resolve(); + } + }; + this.#workerProcess.addEventListener("message", callback); + }); + this.#initPromise = this.#init(options); } @@ -93,7 +121,7 @@ export class PGliteWorker implements PGliteInterface { ); } if (extRet.init) { - extRet.init(); + await extRet.init(); } if (extRet.close) { this.#extensionsClose.push(extRet.close); @@ -102,19 +130,7 @@ export class PGliteWorker implements PGliteInterface { } // Wait for the worker let us know it's here - await new Promise((resolve) => { - this.#workerProcess.addEventListener( - "message", - (event) => { - if (event.data.type === "here") { - resolve(); - } else { - throw new Error("Invalid message"); - } - }, - { once: true }, - ); - }); + await this.#workerHerePromise; // Send the worker the options const { extensions, ...workerOptions } = options; @@ -124,20 +140,12 @@ export class PGliteWorker implements PGliteInterface { }); // Wait for the worker let us know it's ready - await new Promise((resolve) => { - this.#workerProcess.addEventListener( - "message", - (event) => { - if (event.data.type === "ready") { - this.#workerID = event.data.id; - resolve(); - } else { - throw new Error("Invalid message"); - } - }, - { once: true }, - ); - }); + await this.#workerReadyPromise; + + // Acquire the tab close lock, this is released then the tab, or this + // PGliteWorker instance, is closed + const tabCloseLockId = `pglite-tab-close:${this.#tabId}`; + this.#releaseTabCloseLock = await acquireLock(tabCloseLockId); // Start the broadcast channel used to communicate with tabs and leader election const broadcastChannelId = `pglite-broadcast:${this.#workerID}`; @@ -147,18 +155,6 @@ export class PGliteWorker implements PGliteInterface { const tabChannelId = `pglite-tab:${this.#tabId}`; this.#tabChannel = new BroadcastChannel(tabChannelId); - // Acquire the tab close lock, this is released then the tab, or this - // PGliteWorker instance, is closed - const tabCloseLockId = `pglite-tab-close:${this.#tabId}`; - await new Promise((resolve) => { - navigator.locks.request(tabCloseLockId, () => { - return new Promise((release) => { - resolve(); - this.#releaseTabCloseLock = release; - }); - }); - }); - this.#broadcastChannel.addEventListener("message", async (event) => { if (event.data.type === "leader-here") { this.#connected = false; @@ -229,7 +225,7 @@ export class PGliteWorker implements PGliteInterface { const leaderChangeListener = () => { // If the leader changes, throw an error to reject the promise cleanup(); - throw new LeaderChangedError(); + reject(new LeaderChangedError()); }; const cleanup = () => { this.#tabChannel!.removeEventListener("message", listener); @@ -532,16 +528,9 @@ export async function worker({ init }: WorkerOptions) { const connectedTabs = new Set(); // Await the main lock which is used to elect the leader - await new Promise((resolve) => { - navigator.locks.request(electionLockId, () => { - return new Promise((_releaseLock) => { - // This is now the leader! - // We don't release the load by resolving the promise - // It will be released when the worker is closed - resolve(); - }); - }); - }); + // We don't release this lock, its automatically released when the worker or + // tab is closed + await acquireLock(electionLockId); // Now we are the leader, start the worker const dbPromise = init(options); @@ -638,10 +627,6 @@ function makeWorkerApi(db: PGliteInterface) { >(); return { - async init() { - await db.waitReady; - return true; - }, async getDebugLevel() { return db.debug; }, @@ -656,18 +641,11 @@ function makeWorkerApi(db: PGliteInterface) { }, async transactionStart() { const txId = uuid(); - let resolveTxPromise: (v: { + const { promise: txPromise, resolve: resolveTxPromise } = makePromise<{ tx: Transaction; resolve: () => void; reject: (error: any) => void; - }) => void; - const txPromise = new Promise<{ - tx: Transaction; - resolve: () => void; - reject: (error: any) => void; - }>((resolve) => { - resolveTxPromise = resolve; - }); + }>(); transactions.set(txId, txPromise); db.transaction((newTx) => { return new Promise((resolveTx, rejectTx) => { @@ -733,6 +711,29 @@ export class LeaderChangedError extends Error { } } +async function acquireLock(lockId: string) { + let release; + await new Promise((resolve) => { + navigator.locks.request(lockId, () => { + return new Promise((releaseCallback) => { + release = releaseCallback; + resolve(); + }); + }); + }); + return release; +} + +function makePromise() { + let resolve: (value: T) => void; + let reject: (error: any) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve: resolve!, reject: reject! }; +} + type WorkerApi = ReturnType; type WorkerRpcMethod = keyof WorkerApi; diff --git a/packages/pglite/tests/targets/base.js b/packages/pglite/tests/targets/base.js index 71110356..ca0c80b5 100644 --- a/packages/pglite/tests/targets/base.js +++ b/packages/pglite/tests/targets/base.js @@ -1,4 +1,3 @@ -import { c } from "../../dist/chunk-RZJOGGMK.js"; import test from "../polytest.js"; import playwright from "playwright";