Skip to content

Commit

Permalink
Merge pull request #1324 from NASA-AMMOS/improve-large-expansion-sets
Browse files Browse the repository at this point in the history
Improve large expansion sets
  • Loading branch information
goetzrrGit authored Feb 28, 2024
2 parents af5e597 + 3b39cd4 commit 6deb94f
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 50 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ services:
SEQUENCING_DB_SERVER: postgres
SEQUENCING_DB_USER: "${AERIE_USERNAME}"
SEQUENCING_LOCAL_STORE: /usr/src/app/sequencing_file_store
SEQUENCING_WORKER_NUM: 8
SEQUENCING_MAX_WORKER_HEAP_MB: 1000
image: aerie_sequencing
ports: ["27184:27184"]
restart: always
Expand Down
15 changes: 14 additions & 1 deletion sequencing-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import getLogger from './utils/logger.js';
import { commandExpansionRouter } from './routes/command-expansion.js';
import { seqjsonRouter } from './routes/seqjson.js';
import { getHasuraSession, canUserPerformAction, ENDPOINTS_WHITELIST } from './utils/hasura.js';
import type { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads';
import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import { PromiseThrottler } from './utils/PromiseThrottler.js';

const logger = getLogger('app');

Expand All @@ -37,7 +40,14 @@ app.use(bodyParser.json({ limit: '100mb' }));
DbExpansion.init();
export const db = DbExpansion.getDb();

export const piscina = new Piscina({ filename: new URL('worker.js', import.meta.url).pathname });
export const piscina = new Piscina({
filename: new URL('worker.js', import.meta.url).pathname,
minThreads: parseInt(getEnv().SEQUENCING_WORKER_NUM),
resourceLimits: { maxOldGenerationSizeMb: parseInt(getEnv().SEQUENCING_MAX_WORKER_HEAP_MB) },
});
export const promiseThrottler = new PromiseThrottler(parseInt(getEnv().SEQUENCING_WORKER_NUM) - 2);
export const typeCheckingCache = new Map<string, Promise<Result<CacheItem, ReturnType<UserCodeError['toJSON']>[]>>>();

const temporalPolyfillTypes = fs.readFileSync(new URL('TemporalPolyfillTypes.ts', import.meta.url).pathname, 'utf-8');

export type Context = {
Expand Down Expand Up @@ -234,4 +244,7 @@ app.use((err: any, _: Request, res: Response, next: NextFunction) => {

app.listen(PORT, () => {
logger.info(`connected to port ${PORT}`);
logger.info(`Worker pool initialized:
Total workers started: ${piscina.threads.length},
Heap Size per Worker: ${getEnv().SEQUENCING_MAX_WORKER_HEAP_MB} MB`);
});
9 changes: 9 additions & 0 deletions sequencing-server/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export type Env = {
POSTGRES_PORT: string;
POSTGRES_USER: string;
STORAGE: string;
SEQUENCING_WORKER_NUM: string;
SEQUENCING_MAX_WORKER_HEAP_MB: string;
};

export const defaultEnv: Env = {
Expand All @@ -24,6 +26,8 @@ export const defaultEnv: Env = {
POSTGRES_PORT: '5432',
POSTGRES_USER: '',
STORAGE: 'sequencing_file_store',
SEQUENCING_WORKER_NUM: '8',
SEQUENCING_MAX_WORKER_HEAP_MB: '1000',
};

export function getEnv(): Env {
Expand All @@ -40,6 +44,9 @@ export function getEnv(): Env {
const POSTGRES_PORT = env['SEQUENCING_DB_PORT'] ?? defaultEnv.POSTGRES_PORT;
const POSTGRES_USER = env['SEQUENCING_DB_USER'] ?? defaultEnv.POSTGRES_USER;
const STORAGE = env['SEQUENCING_LOCAL_STORE'] ?? defaultEnv.STORAGE;
const SEQUENCING_WORKER_NUM = env['SEQUENCING_WORKER_NUM'] ?? defaultEnv.SEQUENCING_WORKER_NUM;
const SEQUENCING_MAX_WORKER_HEAP_MB =
env['SEQUENCING_MAX_WORKER_HEAP_MB'] ?? defaultEnv.SEQUENCING_MAX_WORKER_HEAP_MB;
return {
HASURA_GRAPHQL_ADMIN_SECRET,
LOG_FILE,
Expand All @@ -52,5 +59,7 @@ export function getEnv(): Env {
POSTGRES_PORT,
POSTGRES_USER,
STORAGE,
SEQUENCING_WORKER_NUM,
SEQUENCING_MAX_WORKER_HEAP_MB,
};
}
143 changes: 94 additions & 49 deletions sequencing-server/src/routes/command-expansion.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import type { UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import pgFormat from 'pg-format';
import { Context, db, piscina } from './../app.js';
import { Context, db, piscina, promiseThrottler, typeCheckingCache } from './../app.js';
import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
import express from 'express';
import { serializeWithTemporal } from './../utils/temporalSerializers.js';
Expand All @@ -13,6 +13,7 @@ import { unwrapPromiseSettledResults } from '../lib/batchLoaders/index.js';
import { defaultSeqBuilder } from '../defaultSeqBuilder.js';
import { ActivateStep, CommandStem, LoadStep } from './../lib/codegen/CommandEDSLPreface.js';
import { getUsername } from '../utils/hasura.js';
import * as crypto from 'crypto';

const logger = getLogger('app');

Expand Down Expand Up @@ -54,17 +55,18 @@ commandExpansionRouter.post('/put-expansion', async (req, res, next) => {
activityTypeName,
});
const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

const result = Result.fromJSON(
await (piscina.run(
{
expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>),
);
const result = await promiseThrottler.run(() => {
return (
piscina.run(
{
commandTypes: commandTypes,
activityTypes: activityTypescript,
activityTypeName: activityTypeName,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
});

res.status(200).json({ id, errors: result.isErr() ? result.unwrapErr() : [] });
return next();
Expand All @@ -90,32 +92,65 @@ commandExpansionRouter.post('/put-expansion-set', async (req, res, next) => {
if (expansion instanceof Error) {
throw new InheritedError(`Expansion with id: ${expansionIds[index]} could not be loaded`, expansion);
}

const hash = crypto
.createHash('sha256')
.update(
JSON.stringify({
commandDictionaryId,
missionModelId,
id: expansion.id,
expansionLogic: expansion.expansionLogic,
activityType: expansion.activityType,
}),
)
.digest('hex');

if (typeCheckingCache.has(hash)) {
console.log(`Using cached typechecked data for ${expansion.activityType}`);
return typeCheckingCache.get(hash);
}

const activitySchema = await context.activitySchemaDataLoader.load({
missionModelId,
activityTypeName: expansion.activityType,
});
const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);
const result = Result.fromJSON(
await (piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>),
);
const typeCheckResult = promiseThrottler.run(() => {
return (
piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
activityTypeName: expansion.activityType,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
});

return result;
typeCheckingCache.set(hash, typeCheckResult);
return typeCheckResult;
}),
);

const errors = unwrapPromiseSettledResults(typecheckErrorPromises).reduce((accum, item) => {
if (item instanceof Error) {
accum.push(item);
} else if (item.isErr()) {
accum.push(...item.unwrapErr());
if (item && (item instanceof Error || item.isErr)) {
// Check for item's existence before accessing properties
if (item instanceof Error) {
accum.push(item);
} else if (item.isErr()) {
try {
accum.push(...item.unwrapErr()); // Handle potential errors within unwrapErr
} catch (error) {
accum.push(new Error('Failed to unwrap error: ' + error)); // Log unwrapErr errors
}
}
} else {
accum.push(new Error('Unexpected result in resolved promises')); // Handle unexpected non-error values
}

return accum;
}, [] as (Error | ReturnType<UserCodeError['toJSON']>)[]);

Expand Down Expand Up @@ -168,15 +203,10 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
context.expansionSetDataLoader.load({ expansionSetId }),
context.simulatedActivitiesDataLoader.load({ simulationDatasetId }),
]);
const commandDictionaryId = expansionSet.commandDictionary.id;
const missionModelId = expansionSet.missionModel.id;
const commandTypes = expansionSet.commandDictionary.commandTypesTypeScript;

// Note: We are keeping the Promise in the cache so that we don't have to wait for resolution to insert into
// the cache and consequently end up doing the compilation multiple times because of a cache miss.
const expansionBuildArtifactsCache = new Map<
number,
Promise<Result<CacheItem, ReturnType<UserCodeError['toJSON']>[]>>
>();

const settledExpansionResults = await Promise.allSettled(
simulatedActivities.map(async simulatedActivity => {
// The simulatedActivity's duration and endTime will be null if the effect model reaches across the plan end boundaries.
Expand Down Expand Up @@ -205,21 +235,36 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
}
const activityTypes = generateTypescriptForGraphQLActivitySchema(activitySchema);

if (!expansionBuildArtifactsCache.has(expansion.id)) {
const typecheckResult = (
piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
expansionBuildArtifactsCache.set(expansion.id, typecheckResult);
}
const hash = crypto
.createHash('sha256')
.update(
JSON.stringify({
commandDictionaryId,
missionModelId,
id: expansion.id,
expansionLogic: expansion.expansionLogic,
activityType: expansion.activityType,
}),
)
.digest('hex');
if (!typeCheckingCache.has(hash)) {
const typeCheckResult = promiseThrottler.run(() => {
return (
piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypes,
activityTypeName: expansion.activityType,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
});

const expansionBuildArtifacts = await expansionBuildArtifactsCache.get(expansion.id)!;
typeCheckingCache.set(hash, typeCheckResult);
}
const expansionBuildArtifacts = await typeCheckingCache.get(hash)!;

if (expansionBuildArtifacts.isErr()) {
return {
Expand Down
22 changes: 22 additions & 0 deletions sequencing-server/src/utils/PromiseThrottler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export class PromiseThrottler {
private runningPromises: Promise<unknown>[] = [];
private promiseLimit: number;

public constructor(promiseLimit: number) {
this.promiseLimit = promiseLimit;
}

public async run<T>(promiseFactory: () => Promise<T>): Promise<T> {
while (this.runningPromises.length >= this.promiseLimit) {
await Promise.race(this.runningPromises);
}
const promise = promiseFactory();
this.runningPromises.push(promise);
return promise.finally(() => {
const index = this.runningPromises.indexOf(promise);
if (index !== -1) {
this.runningPromises.splice(index, 1);
}
});
}
}
14 changes: 14 additions & 0 deletions sequencing-server/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ export async function typecheckExpansion(opts: {
expansionLogic: string;
commandTypes: string;
activityTypes: string;
activityTypeName?: string;
}): Promise<SerializedResult<CacheItem, ReturnType<UserCodeError['toJSON']>[]>> {
const startTime = Date.now();
console.log(
`[ Worker ] started transpiling authoring logic ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}`,
);

const result = await codeRunner.preProcess(
opts.expansionLogic,
'ExpansionReturn',
Expand All @@ -37,6 +43,14 @@ export async function typecheckExpansion(opts: {
ts.createSourceFile('TemporalPolyfillTypes.ts', temporalPolyfillTypes, compilerTarget),
],
);

const endTime = Date.now();
console.log(
`[ Worker ] finished transpiling ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}, (${
(endTime - startTime) / 1000
} s)`,
);

if (result.isOk()) {
return Result.Ok(result.unwrap()).toJSON();
} else {
Expand Down

0 comments on commit 6deb94f

Please sign in to comment.