Skip to content

Commit

Permalink
Merge pull request #1332 from NASA-AMMOS/sequence_background_transpiler
Browse files Browse the repository at this point in the history
Sequence background transpiler
  • Loading branch information
goetzrrGit authored Mar 1, 2024
2 parents c4d4d76 + 3743ba7 commit b683cfe
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 6 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ services:
SEQUENCING_LOCAL_STORE: /usr/src/app/sequencing_file_store
SEQUENCING_WORKER_NUM: 8
SEQUENCING_MAX_WORKER_HEAP_MB: 1000
TRANSPILER_ENABLED : "true"
image: aerie_sequencing
ports: ["27184:27184"]
restart: always
Expand Down
35 changes: 30 additions & 5 deletions sequencing-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { getHasuraSession, canUserPerformAction, ENDPOINTS_WHITELIST } from './u
import type { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads';
import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import { PromiseThrottler } from './utils/PromiseThrottler.js';
import { backgroundTranspiler } from './backgroundTranspiler.js';

const logger = getLogger('app');

Expand All @@ -39,7 +40,9 @@ app.use(bodyParser.json({ limit: '100mb' }));

DbExpansion.init();
export const db = DbExpansion.getDb();

export let graphqlClient = new GraphQLClient(getEnv().MERLIN_GRAPHQL_URL, {
headers: { 'x-hasura-admin-secret': getEnv().HASURA_GRAPHQL_ADMIN_SECRET },
});
export const piscina = new Piscina({
filename: new URL('worker.js', import.meta.url).pathname,
minThreads: parseInt(getEnv().SEQUENCING_WORKER_NUM),
Expand All @@ -62,10 +65,6 @@ export type Context = {
};

app.use(async (req: Request, res: Response, next: NextFunction) => {
const graphqlClient = new GraphQLClient(getEnv().MERLIN_GRAPHQL_URL, {
headers: { 'x-hasura-admin-secret': getEnv().HASURA_GRAPHQL_ADMIN_SECRET },
});

// Check and make sure the user making the request has the required permissions.
if (
!ENDPOINTS_WHITELIST.has(req.url) &&
Expand Down Expand Up @@ -247,4 +246,30 @@ app.listen(PORT, () => {
logger.info(`Worker pool initialized:
Total workers started: ${piscina.threads.length},
Heap Size per Worker: ${getEnv().SEQUENCING_MAX_WORKER_HEAP_MB} MB`);

if (getEnv().TRANSPILER_ENABLED === 'true') {
//log that the tranpiler is on
logger.info(`Background Transpiler is 'on'`);

let transpilerPromise: Promise<void> | undefined; // Holds the transpilation promise
async function invokeTranspiler() {
try {
await backgroundTranspiler();
} catch (error) {
console.error('Error during transpilation:', error);
} finally {
transpilerPromise = undefined; // Reset promise after completion
}
}

// Immediately call the background transpiler
transpilerPromise = invokeTranspiler();

// Schedule next execution after 2 minutes, handling ongoing transpilation
setInterval(async () => {
if (!transpilerPromise) {
transpilerPromise = invokeTranspiler(); // Start a new transpilation
}
}, 60 * 2 * 1000);
}
});
143 changes: 143 additions & 0 deletions sequencing-server/src/backgroundTranspiler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { piscina, graphqlClient, typeCheckingCache, promiseThrottler } from './app.js';
import { objectCacheKeyFunction } from './lib/batchLoaders/index.js';
import crypto from 'crypto';
import { generateTypescriptForGraphQLActivitySchema } from './lib/codegen/ActivityTypescriptCodegen.js';
import DataLoader from 'dataloader';
import { activitySchemaBatchLoader } from './lib/batchLoaders/activitySchemaBatchLoader.js';
import { commandDictionaryTypescriptBatchLoader } from './lib/batchLoaders/commandDictionaryTypescriptBatchLoader.js';
import type { typecheckExpansion } from './worker';
import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
import { getLatestCommandDictionary, getLatestMissionModel, getExpansionRule } from './utils/hasura.js';

export async function backgroundTranspiler(numberOfThreads: number = 2) {
if (graphqlClient === null) {
return;
}

// Fetch latest mission model
const { mission_model_aggregate } = await getLatestMissionModel(graphqlClient);
if (!mission_model_aggregate) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest mission model. Aborting background transpiling...',
);
return;
}

// Fetch latest command dictionary
const { command_dictionary_aggregate } = await getLatestCommandDictionary(graphqlClient);
if (!command_dictionary_aggregate) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest command dictionary. Aborting background transpiling...',
);
return;
}

const commandDictionaryId = command_dictionary_aggregate.aggregate.max.id;
const missionModelId = mission_model_aggregate.aggregate.max.id;
const { expansion_rule } = await getExpansionRule(graphqlClient, missionModelId, commandDictionaryId);

if (expansion_rule === null || expansion_rule.length === 0) {
console.log(`[ Background Transpiler ] No expansion rules to transpile.`);
return;
}

const commandTypescriptDataLoader = new DataLoader(commandDictionaryTypescriptBatchLoader({ graphqlClient }), {
cacheKeyFn: objectCacheKeyFunction,
name: null,
});
const activitySchemaDataLoader = new DataLoader(activitySchemaBatchLoader({ graphqlClient }), {
cacheKeyFn: objectCacheKeyFunction,
name: null,
});

const commandTypes = await commandTypescriptDataLoader.load({
dictionaryId: missionModelId,
});

if (commandTypes === null) {
console.log(`[ Background Transpiler ] Unable to fetch command ts lib.
Aborting transpiling...`);
return;
}

// only process 'numberOfThreads' worth at a time ex. transpile 2 logics at a time
// This allows for expansion set and sequence expansion to utilize the remaining workers
for (let i = 0; i < expansion_rule.length; i += numberOfThreads) {
await Promise.all(
expansion_rule.slice(i, i + numberOfThreads).map(async expansion => {
await promiseThrottler.run(async () => {
// Assuming expansion_rule elements have the same type
if (expansion instanceof Error) {
console.log(`[ Background Transpiler ] Expansion: ${expansion.name} could not be loaded`, expansion);
return Promise.reject(`Expansion: ${expansion.name} could not be loaded`);
}

const hash = crypto
.createHash('sha256')
.update(
JSON.stringify({
commandDictionaryId,
missionModelId,
id: expansion.id,
expansionLogic: expansion.expansion_logic,
activityType: expansion.activity_type,
}),
)
.digest('hex');

// ignore already transpiled hash info
if (typeCheckingCache.has(hash)) {
return Promise.resolve();
}

const activitySchema = await activitySchemaDataLoader.load({
missionModelId,
activityTypeName: expansion.activity_type,
});

// log error
if (!activitySchema) {
console.log(
`[ Background Transpiler ] Activity schema for ${expansion.activity_type} could not be loaded`,
activitySchema,
);
return Promise.reject('Activity schema for ${expansion.activity_type} could not be loaded');
}

const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

// log error
if (!activityTypescript) {
console.log(
`[ Background Transpiler ] Unable to generate typescript for activity ${expansion.activity_type}`,
activityTypescript,
);
return Promise.reject(`Unable to generate typescript for activity ${expansion.activity_type}`);
}

const typecheckingResult = (
piscina.run(
{
expansionLogic: expansion.expansion_logic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
activityTypeName: expansion.activity_type,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);

//Display any errors
typecheckingResult.then(result => {
if (result.isErr()) {
console.log(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`);
}
});

typeCheckingCache.set(hash, typecheckingResult);
return typecheckingResult;
});
}),
);
}
}
4 changes: 4 additions & 0 deletions sequencing-server/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type Env = {
STORAGE: string;
SEQUENCING_WORKER_NUM: string;
SEQUENCING_MAX_WORKER_HEAP_MB: string;
TRANSPILER_ENABLED: string;
};

export const defaultEnv: Env = {
Expand All @@ -28,6 +29,7 @@ export const defaultEnv: Env = {
STORAGE: 'sequencing_file_store',
SEQUENCING_WORKER_NUM: '8',
SEQUENCING_MAX_WORKER_HEAP_MB: '1000',
TRANSPILER_ENABLED: 'true',
};

export function getEnv(): Env {
Expand All @@ -47,6 +49,7 @@ export function getEnv(): Env {
const SEQUENCING_WORKER_NUM = env['SEQUENCING_WORKER_NUM'] ?? defaultEnv.SEQUENCING_WORKER_NUM;
const SEQUENCING_MAX_WORKER_HEAP_MB =
env['SEQUENCING_MAX_WORKER_HEAP_MB'] ?? defaultEnv.SEQUENCING_MAX_WORKER_HEAP_MB;
const TRANSPILER_ENABLED = env['TRANSPILER_ENABLED'] ?? defaultEnv.TRANSPILER_ENABLED;
return {
HASURA_GRAPHQL_ADMIN_SECRET,
LOG_FILE,
Expand All @@ -61,5 +64,6 @@ export function getEnv(): Env {
STORAGE,
SEQUENCING_WORKER_NUM,
SEQUENCING_MAX_WORKER_HEAP_MB,
TRANSPILER_ENABLED,
};
}
96 changes: 95 additions & 1 deletion sequencing-server/src/utils/hasura.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const ENDPOINTS_WHITELIST = new Set([
'/seqjson/get-seqjson-for-sequence-standalone',
'/seqjson/get-seqjson-for-seqid-and-simulation-dataset',
'/seqjson/bulk-get-edsl-for-seqjson',
'/seqjson/get-edsl-for-seqjson'
'/seqjson/get-edsl-for-seqjson',
]);

/**
Expand Down Expand Up @@ -368,3 +368,97 @@ async function getMissionModelId(

return missionModelId;
}

export async function getLatestMissionModel(graphqlClient: GraphQLClient): Promise<{
mission_model_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}> {
return graphqlClient.request<{
mission_model_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}>(
gql`
query GetLatestMissionModel {
mission_model_aggregate(order_by: { uploaded_file: { modified_date: asc } }) {
aggregate {
max {
id
}
}
}
}
`,
);
}

export async function getLatestCommandDictionary(graphqlClient: GraphQLClient): Promise<{
command_dictionary_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}> {
return graphqlClient.request<{
command_dictionary_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}>(
gql`
query GetLatestCommandDictionary {
command_dictionary_aggregate(order_by: { created_at: asc }) {
aggregate {
max {
id
}
}
}
}
`,
);
}

export async function getExpansionRule(
graphqlClient: GraphQLClient,
missionModelId: number,
commandDictionaryId: number,
): Promise<{
expansion_rule: {
id: number;
activity_type: string;
expansion_logic: string;
}[];
}> {
return graphqlClient.request<{
expansion_rule: {
id: number;
activity_type: string;
expansion_logic: string;
}[];
}>(
gql`
query GetExpansonLogic {
expansion_rule(where: {authoring_command_dict_id: {_eq: ${missionModelId}}, authoring_mission_model_id: {_eq: ${commandDictionaryId} }}) {
id
activity_type
expansion_logic
}
}
`,
);
}

0 comments on commit b683cfe

Please sign in to comment.