From 525b89acbbc0ff04abeef60179516339dd61737e Mon Sep 17 00:00:00 2001 From: Sasha Mysak Date: Tue, 16 Jan 2024 16:22:01 +0100 Subject: [PATCH 1/3] wip --- src/data/schemata/consoleKind.ts | 12 +++++++ src/data/schemata/mainThreadMessage.ts | 5 +++ src/data/schemata/workerThreadMessage.ts | 22 ++++++++++++ src/extension.ts | 14 ++++++++ src/worker.ts | 46 ++++++++++++++++++++++++ 5 files changed, 99 insertions(+) create mode 100644 src/data/schemata/consoleKind.ts create mode 100644 src/data/schemata/mainThreadMessage.ts create mode 100644 src/data/schemata/workerThreadMessage.ts create mode 100644 src/worker.ts diff --git a/src/data/schemata/consoleKind.ts b/src/data/schemata/consoleKind.ts new file mode 100644 index 00000000..2842698c --- /dev/null +++ b/src/data/schemata/consoleKind.ts @@ -0,0 +1,12 @@ +import * as S from '@effect/schema/Schema'; + +export const consoleKindSchema = S.union( + S.literal('debug'), + S.literal('error'), + S.literal('log'), + S.literal('info'), + S.literal('trace'), + S.literal('warn'), +); +export type ConsoleKind = S.Schema.To; +export const parseConsoleKind = S.parseSync(consoleKindSchema); diff --git a/src/data/schemata/mainThreadMessage.ts b/src/data/schemata/mainThreadMessage.ts new file mode 100644 index 00000000..c30645d9 --- /dev/null +++ b/src/data/schemata/mainThreadMessage.ts @@ -0,0 +1,5 @@ +import * as S from '@effect/schema/Schema'; + +const mainThreadMessageSchema = S.union(S.struct({ kind: S.literal('exit') })); +export type MainThreadMessage = S.Schema.To; +export const decodeMainThreadMessage = S.parseSync(mainThreadMessageSchema); diff --git a/src/data/schemata/workerThreadMessage.ts b/src/data/schemata/workerThreadMessage.ts new file mode 100644 index 00000000..8e521e41 --- /dev/null +++ b/src/data/schemata/workerThreadMessage.ts @@ -0,0 +1,22 @@ +import * as S from '@effect/schema/Schema'; +import { consoleKindSchema } from './consoleKind'; + +const workerThreadMessageSchema = S.union( + S.struct({ + kind: S.literal('commands'), + commands: S.unknown, + }), + S.struct({ + kind: S.literal('error'), + message: S.string, + path: S.union(S.string, S.undefined), + }), + S.struct({ + kind: S.literal('console'), + consoleKind: consoleKindSchema, + message: S.string, + }), +); + +export type WorkerThreadMessage = S.Schema.To; +export const decodeWorkerThreadMessage = S.parseSync(workerThreadMessageSchema); diff --git a/src/extension.ts b/src/extension.ts index 8964e8c6..e3d8b60e 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -47,6 +47,8 @@ import { GlobalStateTokenStorage, UserService } from './components/userService'; import { HomeDirectoryService } from './data/readHomeDirectoryCases'; import { isLeft } from 'fp-ts/lib/Either'; import { createClearStateCommand } from './commands/clearStateCommand'; +import { isMainThread } from 'node:worker_threads'; +import { executeWorkerThread } from './worker'; export const enum SEARCH_PARAMS_KEYS { ENGINE = 'engine', @@ -63,6 +65,18 @@ export const enum SEARCH_PARAMS_KEYS { const messageBus = new MessageBus(); export async function activate(context: vscode.ExtensionContext) { + if (isMainThread) { + execute(context).catch((error) => { + if (error instanceof Error) { + console.error(JSON.stringify({ message: error.message })); + } + }); + } else { + executeWorkerThread(); + } +} + +async function execute(context: vscode.ExtensionContext) { const rootUri = vscode.workspace.workspaceFolders?.[0]?.uri ?? null; messageBus.setDisposables(context.subscriptions); diff --git a/src/worker.ts b/src/worker.ts new file mode 100644 index 00000000..8511acef --- /dev/null +++ b/src/worker.ts @@ -0,0 +1,46 @@ +import { parentPort } from 'node:worker_threads'; + +import { ConsoleKind } from './data/schemata/consoleKind'; +import { WorkerThreadMessage } from './data/schemata/workerThreadMessage'; +import { decodeMainThreadMessage } from './data/schemata/mainThreadMessage'; + +class PathAwareError extends Error { + constructor(public readonly path: string, message?: string | undefined) { + super(message); + } +} + +const sendLog = (consoleKind: ConsoleKind, message: string): void => { + parentPort?.postMessage({ + kind: 'console', + consoleKind, + message, + } satisfies WorkerThreadMessage); +}; + +const messageHandler = async (m: unknown) => { + try { + const message = decodeMainThreadMessage(m); + + if (message.kind === 'exit') { + parentPort?.off('message', messageHandler); + return; + } + + sendLog('log', `Received message: ${JSON.stringify(message)}`); + + // TODO: + // parentPort?.postMessage({ + // } satisfies WorkerThreadMessage); + } catch (error) { + parentPort?.postMessage({ + kind: 'error', + message: error instanceof Error ? error.message : String(error), + path: error instanceof PathAwareError ? error.path : undefined, + } satisfies WorkerThreadMessage); + } +}; + +export const executeWorkerThread = () => { + parentPort?.on('message', messageHandler); +}; From 0af95e28a1db5cc91d1ed1d4c51c22f3894ca93b Mon Sep 17 00:00:00 2001 From: Sasha Mysak Date: Tue, 16 Jan 2024 16:34:37 +0100 Subject: [PATCH 2/3] init? --- src/data/schemata/workerThreadMessage.ts | 4 ---- src/extension.ts | 27 +++++++++++++++++++++++- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/data/schemata/workerThreadMessage.ts b/src/data/schemata/workerThreadMessage.ts index 8e521e41..841e0393 100644 --- a/src/data/schemata/workerThreadMessage.ts +++ b/src/data/schemata/workerThreadMessage.ts @@ -2,10 +2,6 @@ import * as S from '@effect/schema/Schema'; import { consoleKindSchema } from './consoleKind'; const workerThreadMessageSchema = S.union( - S.struct({ - kind: S.literal('commands'), - commands: S.unknown, - }), S.struct({ kind: S.literal('error'), message: S.string, diff --git a/src/extension.ts b/src/extension.ts index e3d8b60e..da88b8a9 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -47,8 +47,9 @@ import { GlobalStateTokenStorage, UserService } from './components/userService'; import { HomeDirectoryService } from './data/readHomeDirectoryCases'; import { isLeft } from 'fp-ts/lib/Either'; import { createClearStateCommand } from './commands/clearStateCommand'; -import { isMainThread } from 'node:worker_threads'; +import { isMainThread, Worker } from 'node:worker_threads'; import { executeWorkerThread } from './worker'; +import { decodeWorkerThreadMessage } from './data/schemata/workerThreadMessage'; export const enum SEARCH_PARAMS_KEYS { ENGINE = 'engine', @@ -62,10 +63,34 @@ export const enum SEARCH_PARAMS_KEYS { ACCESS_TOKEN = 'accessToken', } +const WORKER_THREADS_COUNT = 10; + const messageBus = new MessageBus(); export async function activate(context: vscode.ExtensionContext) { if (isMainThread) { + for (let i = 0; i < WORKER_THREADS_COUNT; ++i) { + const worker = new Worker(__filename); + + worker.on('message', (m: unknown) => { + const workerThreadMessage = decodeWorkerThreadMessage(m); + + if (workerThreadMessage.kind === 'console') { + console[workerThreadMessage.consoleKind]( + workerThreadMessage.message, + ); + return; + } + + if (workerThreadMessage.kind === 'error') { + console.error( + workerThreadMessage.message, + workerThreadMessage.path, + ); + } + }); + } + execute(context).catch((error) => { if (error instanceof Error) { console.error(JSON.stringify({ message: error.message })); From 0faaee7f66775f1a1c2c6495accd63922367fcc9 Mon Sep 17 00:00:00 2001 From: Sasha Mysak Date: Fri, 19 Jan 2024 11:06:49 +0100 Subject: [PATCH 3/3] wip --- src/data/schemata/consoleKind.ts | 1 + src/data/schemata/mainThreadMessage.ts | 1 + src/worker.ts | 4 ---- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/data/schemata/consoleKind.ts b/src/data/schemata/consoleKind.ts index 2842698c..6850de2e 100644 --- a/src/data/schemata/consoleKind.ts +++ b/src/data/schemata/consoleKind.ts @@ -8,5 +8,6 @@ export const consoleKindSchema = S.union( S.literal('trace'), S.literal('warn'), ); + export type ConsoleKind = S.Schema.To; export const parseConsoleKind = S.parseSync(consoleKindSchema); diff --git a/src/data/schemata/mainThreadMessage.ts b/src/data/schemata/mainThreadMessage.ts index c30645d9..6ad62463 100644 --- a/src/data/schemata/mainThreadMessage.ts +++ b/src/data/schemata/mainThreadMessage.ts @@ -1,5 +1,6 @@ import * as S from '@effect/schema/Schema'; const mainThreadMessageSchema = S.union(S.struct({ kind: S.literal('exit') })); + export type MainThreadMessage = S.Schema.To; export const decodeMainThreadMessage = S.parseSync(mainThreadMessageSchema); diff --git a/src/worker.ts b/src/worker.ts index 8511acef..9a91f784 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -28,10 +28,6 @@ const messageHandler = async (m: unknown) => { } sendLog('log', `Received message: ${JSON.stringify(message)}`); - - // TODO: - // parentPort?.postMessage({ - // } satisfies WorkerThreadMessage); } catch (error) { parentPort?.postMessage({ kind: 'error',