Skip to content

Commit

Permalink
Optimize Performance: Cache TS transpiling, enhance worker management
Browse files Browse the repository at this point in the history
Introduce a TypeScript transpiling cache to significantly reduce build times for both expansion sets and sequence expansion. This will be an upfront cost.

Implement chunking to prevent overloading the worker queue with excessive jobs. This effectively manages worker resources and prevents potential the consumption of resources and heap size.
  • Loading branch information
goetzrrGit committed Feb 28, 2024
1 parent f281d43 commit b991ca4
Showing 1 changed file with 94 additions and 49 deletions.
143 changes: 94 additions & 49 deletions sequencing-server/src/routes/command-expansion.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import type { UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import pgFormat from 'pg-format';
import { Context, db, piscina } from './../app.js';
import { Context, db, piscina, promiseThrottler, typeCheckingCache } from './../app.js';
import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
import express from 'express';
import { serializeWithTemporal } from './../utils/temporalSerializers.js';
Expand All @@ -13,6 +13,7 @@ import { unwrapPromiseSettledResults } from '../lib/batchLoaders/index.js';
import { defaultSeqBuilder } from '../defaultSeqBuilder.js';
import { ActivateStep, CommandStem, LoadStep } from './../lib/codegen/CommandEDSLPreface.js';
import { getUsername } from '../utils/hasura.js';
import * as crypto from 'crypto';

const logger = getLogger('app');

Expand Down Expand Up @@ -54,17 +55,18 @@ commandExpansionRouter.post('/put-expansion', async (req, res, next) => {
activityTypeName,
});
const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

const result = Result.fromJSON(
await (piscina.run(
{
expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>),
);
const result = await promiseThrottler.run(() => {
return (
piscina.run(
{
commandTypes: commandTypes,
activityTypes: activityTypescript,
activityTypeName: activityTypeName,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
});

res.status(200).json({ id, errors: result.isErr() ? result.unwrapErr() : [] });
return next();
Expand All @@ -90,32 +92,65 @@ commandExpansionRouter.post('/put-expansion-set', async (req, res, next) => {
if (expansion instanceof Error) {
throw new InheritedError(`Expansion with id: ${expansionIds[index]} could not be loaded`, expansion);
}

const hash = crypto
.createHash('sha256')
.update(
JSON.stringify({
commandDictionaryId,
missionModelId,
id: expansion.id,
expansionLogic: expansion.expansionLogic,
activityType: expansion.activityType,
}),
)
.digest('hex');

if (typeCheckingCache.has(hash)) {
console.log(`Using cached typechecked data for ${expansion.activityType}`);
return typeCheckingCache.get(hash);
}

const activitySchema = await context.activitySchemaDataLoader.load({
missionModelId,
activityTypeName: expansion.activityType,
});
const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);
const result = Result.fromJSON(
await (piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>),
);
const typeCheckResult = promiseThrottler.run(() => {
return (
piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
activityTypeName: expansion.activityType,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
});

return result;
typeCheckingCache.set(hash, typeCheckResult);
return typeCheckResult;
}),
);

const errors = unwrapPromiseSettledResults(typecheckErrorPromises).reduce((accum, item) => {
if (item instanceof Error) {
accum.push(item);
} else if (item.isErr()) {
accum.push(...item.unwrapErr());
if (item && (item instanceof Error || item.isErr)) {
// Check for item's existence before accessing properties
if (item instanceof Error) {
accum.push(item);
} else if (item.isErr()) {
try {
accum.push(...item.unwrapErr()); // Handle potential errors within unwrapErr
} catch (error) {
accum.push(new Error('Failed to unwrap error: ' + error)); // Log unwrapErr errors
}
}
} else {
accum.push(new Error('Unexpected result in resolved promises')); // Handle unexpected non-error values
}

return accum;
}, [] as (Error | ReturnType<UserCodeError['toJSON']>)[]);

Expand Down Expand Up @@ -168,15 +203,10 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
context.expansionSetDataLoader.load({ expansionSetId }),
context.simulatedActivitiesDataLoader.load({ simulationDatasetId }),
]);
const commandDictionaryId = expansionSet.commandDictionary.id;
const missionModelId = expansionSet.missionModel.id;
const commandTypes = expansionSet.commandDictionary.commandTypesTypeScript;

// Note: We are keeping the Promise in the cache so that we don't have to wait for resolution to insert into
// the cache and consequently end up doing the compilation multiple times because of a cache miss.
const expansionBuildArtifactsCache = new Map<
number,
Promise<Result<CacheItem, ReturnType<UserCodeError['toJSON']>[]>>
>();

const settledExpansionResults = await Promise.allSettled(
simulatedActivities.map(async simulatedActivity => {
// The simulatedActivity's duration and endTime will be null if the effect model reaches across the plan end boundaries.
Expand Down Expand Up @@ -205,21 +235,36 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
}
const activityTypes = generateTypescriptForGraphQLActivitySchema(activitySchema);

if (!expansionBuildArtifactsCache.has(expansion.id)) {
const typecheckResult = (
piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
expansionBuildArtifactsCache.set(expansion.id, typecheckResult);
}
const hash = crypto
.createHash('sha256')
.update(
JSON.stringify({
commandDictionaryId,
missionModelId,
id: expansion.id,
expansionLogic: expansion.expansionLogic,
activityType: expansion.activityType,
}),
)
.digest('hex');
if (!typeCheckingCache.has(hash)) {
const typeCheckResult = promiseThrottler.run(() => {
return (
piscina.run(
{
expansionLogic: expansion.expansionLogic,
commandTypes: commandTypes,
activityTypes: activityTypes,
activityTypeName: expansion.activityType,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);
});

const expansionBuildArtifacts = await expansionBuildArtifactsCache.get(expansion.id)!;
typeCheckingCache.set(hash, typeCheckResult);
}
const expansionBuildArtifacts = await typeCheckingCache.get(hash)!;

if (expansionBuildArtifacts.isErr()) {
return {
Expand Down

0 comments on commit b991ca4

Please sign in to comment.