Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Extension threading #858

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/data/schemata/consoleKind.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import * as S from '@effect/schema/Schema';

export const consoleKindSchema = S.union(
S.literal('debug'),
S.literal('error'),
S.literal('log'),
S.literal('info'),
S.literal('trace'),
S.literal('warn'),
);

export type ConsoleKind = S.Schema.To<typeof consoleKindSchema>;
export const parseConsoleKind = S.parseSync(consoleKindSchema);
6 changes: 6 additions & 0 deletions src/data/schemata/mainThreadMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import * as S from '@effect/schema/Schema';

const mainThreadMessageSchema = S.union(S.struct({ kind: S.literal('exit') }));

export type MainThreadMessage = S.Schema.To<typeof mainThreadMessageSchema>;
export const decodeMainThreadMessage = S.parseSync(mainThreadMessageSchema);
18 changes: 18 additions & 0 deletions src/data/schemata/workerThreadMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import * as S from '@effect/schema/Schema';
import { consoleKindSchema } from './consoleKind';

const workerThreadMessageSchema = S.union(
S.struct({
kind: S.literal('error'),
message: S.string,
path: S.union(S.string, S.undefined),
}),
S.struct({
kind: S.literal('console'),
consoleKind: consoleKindSchema,
message: S.string,
}),
);

export type WorkerThreadMessage = S.Schema.To<typeof workerThreadMessageSchema>;
export const decodeWorkerThreadMessage = S.parseSync(workerThreadMessageSchema);
39 changes: 39 additions & 0 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ import { GlobalStateTokenStorage, UserService } from './components/userService';
import { HomeDirectoryService } from './data/readHomeDirectoryCases';
import { isLeft } from 'fp-ts/lib/Either';
import { createClearStateCommand } from './commands/clearStateCommand';
import { isMainThread, Worker } from 'node:worker_threads';
import { executeWorkerThread } from './worker';
import { decodeWorkerThreadMessage } from './data/schemata/workerThreadMessage';

export const enum SEARCH_PARAMS_KEYS {
ENGINE = 'engine',
Expand All @@ -60,9 +63,45 @@ export const enum SEARCH_PARAMS_KEYS {
ACCESS_TOKEN = 'accessToken',
}

const WORKER_THREADS_COUNT = 10;

const messageBus = new MessageBus();

export async function activate(context: vscode.ExtensionContext) {
if (isMainThread) {
for (let i = 0; i < WORKER_THREADS_COUNT; ++i) {
const worker = new Worker(__filename);

worker.on('message', (m: unknown) => {
const workerThreadMessage = decodeWorkerThreadMessage(m);

if (workerThreadMessage.kind === 'console') {
console[workerThreadMessage.consoleKind](
workerThreadMessage.message,
);
return;
}

if (workerThreadMessage.kind === 'error') {
console.error(
workerThreadMessage.message,
workerThreadMessage.path,
);
}
});
}

execute(context).catch((error) => {
if (error instanceof Error) {
console.error(JSON.stringify({ message: error.message }));
}
});
} else {
executeWorkerThread();
}
}

async function execute(context: vscode.ExtensionContext) {
const rootUri = vscode.workspace.workspaceFolders?.[0]?.uri ?? null;

messageBus.setDisposables(context.subscriptions);
Expand Down
42 changes: 42 additions & 0 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { parentPort } from 'node:worker_threads';

import { ConsoleKind } from './data/schemata/consoleKind';
import { WorkerThreadMessage } from './data/schemata/workerThreadMessage';
import { decodeMainThreadMessage } from './data/schemata/mainThreadMessage';

class PathAwareError extends Error {
constructor(public readonly path: string, message?: string | undefined) {
super(message);
}
}

const sendLog = (consoleKind: ConsoleKind, message: string): void => {
parentPort?.postMessage({
kind: 'console',
consoleKind,
message,
} satisfies WorkerThreadMessage);
};

const messageHandler = async (m: unknown) => {
try {
const message = decodeMainThreadMessage(m);

if (message.kind === 'exit') {
parentPort?.off('message', messageHandler);
return;
}

sendLog('log', `Received message: ${JSON.stringify(message)}`);
} catch (error) {
parentPort?.postMessage({
kind: 'error',
message: error instanceof Error ? error.message : String(error),
path: error instanceof PathAwareError ? error.path : undefined,
} satisfies WorkerThreadMessage);
}
};

export const executeWorkerThread = () => {
parentPort?.on('message', messageHandler);
};
Loading