diff --git a/docker-compose.yml b/docker-compose.yml
index b0124d0762..aa46392887 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -93,6 +93,8 @@ services:
       SEQUENCING_DB_SERVER: postgres
       SEQUENCING_DB_USER: "${AERIE_USERNAME}"
       SEQUENCING_LOCAL_STORE: /usr/src/app/sequencing_file_store
+      SEQUENCING_WORKER_NUM: 8
+      SEQUENCING_MAX_WORKER_HEAP_MB: 1000
     image: aerie_sequencing
     ports: ["27184:27184"]
     restart: always
diff --git a/sequencing-server/src/app.ts b/sequencing-server/src/app.ts
index 3c1f422a58..dd1370f406 100644
--- a/sequencing-server/src/app.ts
+++ b/sequencing-server/src/app.ts
@@ -24,6 +24,9 @@ import getLogger from './utils/logger.js';
 import { commandExpansionRouter } from './routes/command-expansion.js';
 import { seqjsonRouter } from './routes/seqjson.js';
 import { getHasuraSession, canUserPerformAction, ENDPOINTS_WHITELIST } from './utils/hasura.js';
+import type { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads';
+import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
+import { PromiseThrottler } from './utils/PromiseThrottler.js';

 const logger = getLogger('app');

@@ -37,7 +40,14 @@ app.use(bodyParser.json({ limit: '100mb' }));
 DbExpansion.init();
 export const db = DbExpansion.getDb();

-export const piscina = new Piscina({ filename: new URL('worker.js', import.meta.url).pathname });
+export const piscina = new Piscina({
+  filename: new URL('worker.js', import.meta.url).pathname,
+  minThreads: parseInt(getEnv().SEQUENCING_WORKER_NUM),
+  resourceLimits: { maxOldGenerationSizeMb: parseInt(getEnv().SEQUENCING_MAX_WORKER_HEAP_MB) },
+});
+export const promiseThrottler = new PromiseThrottler(parseInt(getEnv().SEQUENCING_WORKER_NUM) - 2);
+export const typeCheckingCache = new Map<string, Promise<Result<CacheItem[], ReturnType<UserCodeError['toJSON']>[]>>>();
+
 const temporalPolyfillTypes = fs.readFileSync(new URL('TemporalPolyfillTypes.ts', import.meta.url).pathname, 'utf-8');

 export type Context = {
@@ -234,4 +244,7 @@ app.use((err: any, _: Request, res: Response, next: NextFunction) => {

 app.listen(PORT, () => {
   logger.info(`connected to port ${PORT}`);
+  logger.info(`Worker pool initialized:
+    Total workers started: ${piscina.threads.length},
+    Heap Size per Worker: ${getEnv().SEQUENCING_MAX_WORKER_HEAP_MB} MB`);
 });
diff --git a/sequencing-server/src/env.ts b/sequencing-server/src/env.ts
index 2701912377..ee8564799f 100644
--- a/sequencing-server/src/env.ts
+++ b/sequencing-server/src/env.ts
@@ -10,6 +10,8 @@ export type Env = {
   POSTGRES_PORT: string;
   POSTGRES_USER: string;
   STORAGE: string;
+  SEQUENCING_WORKER_NUM: string;
+  SEQUENCING_MAX_WORKER_HEAP_MB: string;
 };

 export const defaultEnv: Env = {
@@ -24,6 +26,8 @@ export const defaultEnv: Env = {
   POSTGRES_PORT: '5432',
   POSTGRES_USER: '',
   STORAGE: 'sequencing_file_store',
+  SEQUENCING_WORKER_NUM: '8',
+  SEQUENCING_MAX_WORKER_HEAP_MB: '1000',
 };

 export function getEnv(): Env {
@@ -40,6 +44,9 @@ export function getEnv(): Env {
   const POSTGRES_PORT = env['SEQUENCING_DB_PORT'] ?? defaultEnv.POSTGRES_PORT;
   const POSTGRES_USER = env['SEQUENCING_DB_USER'] ?? defaultEnv.POSTGRES_USER;
   const STORAGE = env['SEQUENCING_LOCAL_STORE'] ?? defaultEnv.STORAGE;
+  const SEQUENCING_WORKER_NUM = env['SEQUENCING_WORKER_NUM'] ?? defaultEnv.SEQUENCING_WORKER_NUM;
+  const SEQUENCING_MAX_WORKER_HEAP_MB =
+    env['SEQUENCING_MAX_WORKER_HEAP_MB'] ?? defaultEnv.SEQUENCING_MAX_WORKER_HEAP_MB;
   return {
     HASURA_GRAPHQL_ADMIN_SECRET,
     LOG_FILE,
@@ -52,5 +59,7 @@ export function getEnv(): Env {
     POSTGRES_PORT,
     POSTGRES_USER,
     STORAGE,
+    SEQUENCING_WORKER_NUM,
+    SEQUENCING_MAX_WORKER_HEAP_MB,
   };
 }
diff --git a/sequencing-server/src/routes/command-expansion.ts b/sequencing-server/src/routes/command-expansion.ts
index f5073ac0d8..e53e2eb39b 100644
--- a/sequencing-server/src/routes/command-expansion.ts
+++ b/sequencing-server/src/routes/command-expansion.ts
@@ -1,6 +1,6 @@
-import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
+import type { UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
 import pgFormat from 'pg-format';
-import { Context, db, piscina } from './../app.js';
+import { Context, db, piscina, promiseThrottler, typeCheckingCache } from './../app.js';
 import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
 import express from 'express';
 import { serializeWithTemporal } from './../utils/temporalSerializers.js';
@@ -13,6 +13,7 @@ import { unwrapPromiseSettledResults } from '../lib/batchLoaders/index.js';
 import { defaultSeqBuilder } from '../defaultSeqBuilder.js';
 import { ActivateStep, CommandStem, LoadStep } from './../lib/codegen/CommandEDSLPreface.js';
 import { getUsername } from '../utils/hasura.js';
+import * as crypto from 'crypto';

 const logger = getLogger('app');

@@ -54,17 +55,18 @@ commandExpansionRouter.post('/put-expansion', async (req, res, next) => {
     activityTypeName,
   });
   const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);
-
-  const result = Result.fromJSON(
-    await (piscina.run(
-      {
-        expansionLogic,
-        commandTypes: commandTypes,
-        activityTypes: activityTypescript,
-      },
-      { name: 'typecheckExpansion' },
-    ) as ReturnType<typeof typecheckExpansion>),
-  );
+  const result = await promiseThrottler.run(() => {
+    return (
+      piscina.run(
+        {
+          commandTypes: commandTypes,
+          activityTypes: activityTypescript,
+          activityTypeName: activityTypeName,
+        },
+        { name: 'typecheckExpansion' },
+      ) as ReturnType<typeof typecheckExpansion>
+    ).then(Result.fromJSON);
+  });

   res.status(200).json({ id, errors: result.isErr() ? result.unwrapErr() : [] });
   return next();
@@ -90,32 +92,65 @@ commandExpansionRouter.post('/put-expansion-set', async (req, res, next) => {
       if (expansion instanceof Error) {
         throw new InheritedError(`Expansion with id: ${expansionIds[index]} could not be loaded`, expansion);
       }
+
+      const hash = crypto
+        .createHash('sha256')
+        .update(
+          JSON.stringify({
+            commandDictionaryId,
+            missionModelId,
+            id: expansion.id,
+            expansionLogic: expansion.expansionLogic,
+            activityType: expansion.activityType,
+          }),
+        )
+        .digest('hex');
+
+      if (typeCheckingCache.has(hash)) {
+        console.log(`Using cached typechecked data for ${expansion.activityType}`);
+        return typeCheckingCache.get(hash);
+      }
+
       const activitySchema = await context.activitySchemaDataLoader.load({
         missionModelId,
         activityTypeName: expansion.activityType,
       });
       const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

-      const result = Result.fromJSON(
-        await (piscina.run(
-          {
-            expansionLogic: expansion.expansionLogic,
-            commandTypes: commandTypes,
-            activityTypes: activityTypescript,
-          },
-          { name: 'typecheckExpansion' },
-        ) as ReturnType<typeof typecheckExpansion>),
-      );
+      const typeCheckResult = promiseThrottler.run(() => {
+        return (
+          piscina.run(
+            {
+              expansionLogic: expansion.expansionLogic,
+              commandTypes: commandTypes,
+              activityTypes: activityTypescript,
+              activityTypeName: expansion.activityType,
+            },
+            { name: 'typecheckExpansion' },
+          ) as ReturnType<typeof typecheckExpansion>
+        ).then(Result.fromJSON);
+      });

-      return result;
+      typeCheckingCache.set(hash, typeCheckResult);
+      return typeCheckResult;
     }),
   );

   const errors = unwrapPromiseSettledResults(typecheckErrorPromises).reduce((accum, item) => {
-    if (item instanceof Error) {
-      accum.push(item);
-    } else if (item.isErr()) {
-      accum.push(...item.unwrapErr());
+    if (item && (item instanceof Error || item.isErr)) {
+      // Check for item's existence before accessing properties
+      if (item instanceof Error) {
+        accum.push(item);
+      } else if (item.isErr()) {
+        try {
+          accum.push(...item.unwrapErr()); // Handle potential errors within unwrapErr
+        } catch (error) {
+          accum.push(new Error('Failed to unwrap error: ' + error)); // Log unwrapErr errors
+        }
+      }
+    } else {
+      accum.push(new Error('Unexpected result in resolved promises')); // Handle unexpected non-error values
     }
+
     return accum;
   }, [] as (Error | ReturnType<UserCodeError['toJSON']>)[]);
@@ -168,15 +203,10 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
     context.expansionSetDataLoader.load({ expansionSetId }),
     context.simulatedActivitiesDataLoader.load({ simulationDatasetId }),
   ]);
+  const commandDictionaryId = expansionSet.commandDictionary.id;
+  const missionModelId = expansionSet.missionModel.id;
   const commandTypes = expansionSet.commandDictionary.commandTypesTypeScript;

-  // Note: We are keeping the Promise in the cache so that we don't have to wait for resolution to insert into
-  // the cache and consequently end up doing the compilation multiple times because of a cache miss.
-  const expansionBuildArtifactsCache = new Map<
-    number,
-    Promise<Result<CacheItem[], ReturnType<UserCodeError['toJSON']>[]>>
-  >();
-
   const settledExpansionResults = await Promise.allSettled(
     simulatedActivities.map(async simulatedActivity => {
       // The simulatedActivity's duration and endTime will be null if the effect model reaches across the plan end boundaries.
@@ -205,21 +235,36 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
       }

       const activityTypes = generateTypescriptForGraphQLActivitySchema(activitySchema);
-      if (!expansionBuildArtifactsCache.has(expansion.id)) {
-        const typecheckResult = (
-          piscina.run(
-            {
-              expansionLogic: expansion.expansionLogic,
-              commandTypes: commandTypes,
-              activityTypes,
-            },
-            { name: 'typecheckExpansion' },
-          ) as ReturnType<typeof typecheckExpansion>
-        ).then(Result.fromJSON);
-        expansionBuildArtifactsCache.set(expansion.id, typecheckResult);
-      }
+      const hash = crypto
+        .createHash('sha256')
+        .update(
+          JSON.stringify({
+            commandDictionaryId,
+            missionModelId,
+            id: expansion.id,
+            expansionLogic: expansion.expansionLogic,
+            activityType: expansion.activityType,
+          }),
+        )
+        .digest('hex');
+      if (!typeCheckingCache.has(hash)) {
+        const typeCheckResult = promiseThrottler.run(() => {
+          return (
+            piscina.run(
+              {
+                expansionLogic: expansion.expansionLogic,
+                commandTypes: commandTypes,
+                activityTypes: activityTypes,
+                activityTypeName: expansion.activityType,
+              },
+              { name: 'typecheckExpansion' },
+            ) as ReturnType<typeof typecheckExpansion>
+          ).then(Result.fromJSON);
+        });

-      const expansionBuildArtifacts = await expansionBuildArtifactsCache.get(expansion.id)!;
+        typeCheckingCache.set(hash, typeCheckResult);
+      }
+      const expansionBuildArtifacts = await typeCheckingCache.get(hash)!;

       if (expansionBuildArtifacts.isErr()) {
         return {
diff --git a/sequencing-server/src/utils/PromiseThrottler.ts b/sequencing-server/src/utils/PromiseThrottler.ts
new file mode 100644
index 0000000000..917004c0fa
--- /dev/null
+++ b/sequencing-server/src/utils/PromiseThrottler.ts
@@ -0,0 +1,22 @@
+export class PromiseThrottler {
+  private runningPromises: Promise<any>[] = [];
+  private promiseLimit: number;
+
+  public constructor(promiseLimit: number) {
+    this.promiseLimit = promiseLimit;
+  }
+
+  public async run<T>(promiseFactory: () => Promise<T>): Promise<T> {
+    while (this.runningPromises.length >= this.promiseLimit) {
+      await Promise.race(this.runningPromises);
+    }
+    const promise = promiseFactory();
+    this.runningPromises.push(promise);
+    return promise.finally(() => {
+      const index = this.runningPromises.indexOf(promise);
+      if (index !== -1) {
+        this.runningPromises.splice(index, 1);
+      }
+    });
+  }
+}
diff --git a/sequencing-server/src/worker.ts b/sequencing-server/src/worker.ts
index 44e040b3ea..37c9bbc0a0 100644
--- a/sequencing-server/src/worker.ts
+++ b/sequencing-server/src/worker.ts
@@ -26,7 +26,13 @@ export async function typecheckExpansion(opts: {
   expansionLogic: string;
   commandTypes: string;
   activityTypes: string;
+  activityTypeName?: string;
 }): Promise<Result<CacheItem[], ReturnType<UserCodeError['toJSON']>[]>> {
+  const startTime = Date.now();
+  console.log(
+    `[ Worker ] started transpiling authoring logic ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}`,
+  );
+
   const result = await codeRunner.preProcess(
     opts.expansionLogic,
     'ExpansionReturn',
@@ -37,6 +43,14 @@ export async function typecheckExpansion(opts: {
       ts.createSourceFile('TemporalPolyfillTypes.ts', temporalPolyfillTypes, compilerTarget),
     ],
   );
+
+  const endTime = Date.now();
+  console.log(
+    `[ Worker ] finished transpiling ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}, (${
+      (endTime - startTime) / 1000
+    } s)`,
+  );
+
   if (result.isOk()) {
     return Result.Ok(result.unwrap()).toJSON();
   } else {
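
Note (illustration, not part of the patch): a minimal usage sketch of the `PromiseThrottler` introduced above, assuming an ESM entry point with top-level await. The limit of 2 and the fake job below are made up; in the server the factory wraps `piscina.run(...)` and the limit is `SEQUENCING_WORKER_NUM - 2`. At most `promiseLimit` factory-created promises are in flight at once, and each `run()` call settles with the outcome of the promise its factory produced.

```ts
import { PromiseThrottler } from './utils/PromiseThrottler.js';

// Illustrative only: a limit of 2 and a fake async "typecheck" job standing in for piscina.run().
const throttler = new PromiseThrottler(2);

const fakeJob = (name: string, ms: number) =>
  new Promise<string>(resolve => setTimeout(() => resolve(name), ms));

// All five calls are issued immediately, but no more than two promises run concurrently;
// the Promise.race() loop inside run() defers the next factory until a slot frees up.
const results = await Promise.all(['A', 'B', 'C', 'D', 'E'].map(name => throttler.run(() => fakeJob(name, 100))));

console.log(results); // ['A', 'B', 'C', 'D', 'E']
```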
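Similarly illustrative: the key that `typeCheckingCache` is indexed by is a SHA-256 digest of the fields that determine a typecheck's output, so the same expansion logic against the same command dictionary and mission model hashes to the same key and resolves to the cached promise instead of another worker run. The field names mirror the diff; the values below are invented.

```ts
import * as crypto from 'crypto';

// Made-up inputs; in the routes above these come from the expansion set being built.
const cacheKey = crypto
  .createHash('sha256')
  .update(
    JSON.stringify({
      commandDictionaryId: 1,
      missionModelId: 2,
      id: 3,
      expansionLogic: 'export default function () { /* ... */ }',
      activityType: 'BakeBananaBread',
    }),
  )
  .digest('hex');

// Identical inputs -> identical 64-character hex key -> Map hit instead of another piscina.run().
console.log(cacheKey);
```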