diff --git a/docker-compose.yml b/docker-compose.yml index aa46392887..9df4a80fea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -95,6 +95,7 @@ services: SEQUENCING_LOCAL_STORE: /usr/src/app/sequencing_file_store SEQUENCING_WORKER_NUM: 8 SEQUENCING_MAX_WORKER_HEAP_MB: 1000 + TRANSPILER_ENABLED : "true" image: aerie_sequencing ports: ["27184:27184"] restart: always diff --git a/sequencing-server/src/app.ts b/sequencing-server/src/app.ts index dd1370f406..975259333f 100644 --- a/sequencing-server/src/app.ts +++ b/sequencing-server/src/app.ts @@ -27,6 +27,7 @@ import { getHasuraSession, canUserPerformAction, ENDPOINTS_WHITELIST } from './u import type { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads'; import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner'; import { PromiseThrottler } from './utils/PromiseThrottler.js'; +import { backgroundTranspiler } from './backgroundTranspiler.js'; const logger = getLogger('app'); @@ -39,7 +40,9 @@ app.use(bodyParser.json({ limit: '100mb' })); DbExpansion.init(); export const db = DbExpansion.getDb(); - +export let graphqlClient = new GraphQLClient(getEnv().MERLIN_GRAPHQL_URL, { + headers: { 'x-hasura-admin-secret': getEnv().HASURA_GRAPHQL_ADMIN_SECRET }, +}); export const piscina = new Piscina({ filename: new URL('worker.js', import.meta.url).pathname, minThreads: parseInt(getEnv().SEQUENCING_WORKER_NUM), @@ -62,10 +65,6 @@ export type Context = { }; app.use(async (req: Request, res: Response, next: NextFunction) => { - const graphqlClient = new GraphQLClient(getEnv().MERLIN_GRAPHQL_URL, { - headers: { 'x-hasura-admin-secret': getEnv().HASURA_GRAPHQL_ADMIN_SECRET }, - }); - // Check and make sure the user making the request has the required permissions. if ( !ENDPOINTS_WHITELIST.has(req.url) && @@ -247,4 +246,30 @@ app.listen(PORT, () => { logger.info(`Worker pool initialized: Total workers started: ${piscina.threads.length}, Heap Size per Worker: ${getEnv().SEQUENCING_MAX_WORKER_HEAP_MB} MB`); + + if (getEnv().TRANSPILER_ENABLED === 'true') { + //log that the tranpiler is on + logger.info(`Background Transpiler is 'on'`); + + let transpilerPromise: Promise | undefined; // Holds the transpilation promise + async function invokeTranspiler() { + try { + await backgroundTranspiler(); + } catch (error) { + console.error('Error during transpilation:', error); + } finally { + transpilerPromise = undefined; // Reset promise after completion + } + } + + // Immediately call the background transpiler + transpilerPromise = invokeTranspiler(); + + // Schedule next execution after 2 minutes, handling ongoing transpilation + setInterval(async () => { + if (!transpilerPromise) { + transpilerPromise = invokeTranspiler(); // Start a new transpilation + } + }, 60 * 2 * 1000); + } }); diff --git a/sequencing-server/src/backgroundTranspiler.ts b/sequencing-server/src/backgroundTranspiler.ts new file mode 100644 index 0000000000..f78863f882 --- /dev/null +++ b/sequencing-server/src/backgroundTranspiler.ts @@ -0,0 +1,143 @@ +import { piscina, graphqlClient, typeCheckingCache, promiseThrottler } from './app.js'; +import { objectCacheKeyFunction } from './lib/batchLoaders/index.js'; +import crypto from 'crypto'; +import { generateTypescriptForGraphQLActivitySchema } from './lib/codegen/ActivityTypescriptCodegen.js'; +import DataLoader from 'dataloader'; +import { activitySchemaBatchLoader } from './lib/batchLoaders/activitySchemaBatchLoader.js'; +import { commandDictionaryTypescriptBatchLoader } from './lib/batchLoaders/commandDictionaryTypescriptBatchLoader.js'; +import type { typecheckExpansion } from './worker'; +import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js'; +import { getLatestCommandDictionary, getLatestMissionModel, getExpansionRule } from './utils/hasura.js'; + +export async function backgroundTranspiler(numberOfThreads: number = 2) { + if (graphqlClient === null) { + return; + } + + // Fetch latest mission model + const { mission_model_aggregate } = await getLatestMissionModel(graphqlClient); + if (!mission_model_aggregate) { + console.log( + '[ Background Transpiler ] Unable to fetch the latest mission model. Aborting background transpiling...', + ); + return; + } + + // Fetch latest command dictionary + const { command_dictionary_aggregate } = await getLatestCommandDictionary(graphqlClient); + if (!command_dictionary_aggregate) { + console.log( + '[ Background Transpiler ] Unable to fetch the latest command dictionary. Aborting background transpiling...', + ); + return; + } + + const commandDictionaryId = command_dictionary_aggregate.aggregate.max.id; + const missionModelId = mission_model_aggregate.aggregate.max.id; + const { expansion_rule } = await getExpansionRule(graphqlClient, missionModelId, commandDictionaryId); + + if (expansion_rule === null || expansion_rule.length === 0) { + console.log(`[ Background Transpiler ] No expansion rules to transpile.`); + return; + } + + const commandTypescriptDataLoader = new DataLoader(commandDictionaryTypescriptBatchLoader({ graphqlClient }), { + cacheKeyFn: objectCacheKeyFunction, + name: null, + }); + const activitySchemaDataLoader = new DataLoader(activitySchemaBatchLoader({ graphqlClient }), { + cacheKeyFn: objectCacheKeyFunction, + name: null, + }); + + const commandTypes = await commandTypescriptDataLoader.load({ + dictionaryId: missionModelId, + }); + + if (commandTypes === null) { + console.log(`[ Background Transpiler ] Unable to fetch command ts lib. + Aborting transpiling...`); + return; + } + + // only process 'numberOfThreads' worth at a time ex. transpile 2 logics at a time + // This allows for expansion set and sequence expansion to utilize the remaining workers + for (let i = 0; i < expansion_rule.length; i += numberOfThreads) { + await Promise.all( + expansion_rule.slice(i, i + numberOfThreads).map(async expansion => { + await promiseThrottler.run(async () => { + // Assuming expansion_rule elements have the same type + if (expansion instanceof Error) { + console.log(`[ Background Transpiler ] Expansion: ${expansion.name} could not be loaded`, expansion); + return Promise.reject(`Expansion: ${expansion.name} could not be loaded`); + } + + const hash = crypto + .createHash('sha256') + .update( + JSON.stringify({ + commandDictionaryId, + missionModelId, + id: expansion.id, + expansionLogic: expansion.expansion_logic, + activityType: expansion.activity_type, + }), + ) + .digest('hex'); + + // ignore already transpiled hash info + if (typeCheckingCache.has(hash)) { + return Promise.resolve(); + } + + const activitySchema = await activitySchemaDataLoader.load({ + missionModelId, + activityTypeName: expansion.activity_type, + }); + + // log error + if (!activitySchema) { + console.log( + `[ Background Transpiler ] Activity schema for ${expansion.activity_type} could not be loaded`, + activitySchema, + ); + return Promise.reject('Activity schema for ${expansion.activity_type} could not be loaded'); + } + + const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema); + + // log error + if (!activityTypescript) { + console.log( + `[ Background Transpiler ] Unable to generate typescript for activity ${expansion.activity_type}`, + activityTypescript, + ); + return Promise.reject(`Unable to generate typescript for activity ${expansion.activity_type}`); + } + + const typecheckingResult = ( + piscina.run( + { + expansionLogic: expansion.expansion_logic, + commandTypes: commandTypes, + activityTypes: activityTypescript, + activityTypeName: expansion.activity_type, + }, + { name: 'typecheckExpansion' }, + ) as ReturnType + ).then(Result.fromJSON); + + //Display any errors + typecheckingResult.then(result => { + if (result.isErr()) { + console.log(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`); + } + }); + + typeCheckingCache.set(hash, typecheckingResult); + return typecheckingResult; + }); + }), + ); + } +} diff --git a/sequencing-server/src/env.ts b/sequencing-server/src/env.ts index ee8564799f..0f2d5e8c06 100644 --- a/sequencing-server/src/env.ts +++ b/sequencing-server/src/env.ts @@ -12,6 +12,7 @@ export type Env = { STORAGE: string; SEQUENCING_WORKER_NUM: string; SEQUENCING_MAX_WORKER_HEAP_MB: string; + TRANSPILER_ENABLED: string; }; export const defaultEnv: Env = { @@ -28,6 +29,7 @@ export const defaultEnv: Env = { STORAGE: 'sequencing_file_store', SEQUENCING_WORKER_NUM: '8', SEQUENCING_MAX_WORKER_HEAP_MB: '1000', + TRANSPILER_ENABLED: 'true', }; export function getEnv(): Env { @@ -47,6 +49,7 @@ export function getEnv(): Env { const SEQUENCING_WORKER_NUM = env['SEQUENCING_WORKER_NUM'] ?? defaultEnv.SEQUENCING_WORKER_NUM; const SEQUENCING_MAX_WORKER_HEAP_MB = env['SEQUENCING_MAX_WORKER_HEAP_MB'] ?? defaultEnv.SEQUENCING_MAX_WORKER_HEAP_MB; + const TRANSPILER_ENABLED = env['TRANSPILER_ENABLED'] ?? defaultEnv.TRANSPILER_ENABLED; return { HASURA_GRAPHQL_ADMIN_SECRET, LOG_FILE, @@ -61,5 +64,6 @@ export function getEnv(): Env { STORAGE, SEQUENCING_WORKER_NUM, SEQUENCING_MAX_WORKER_HEAP_MB, + TRANSPILER_ENABLED, }; } diff --git a/sequencing-server/src/utils/hasura.ts b/sequencing-server/src/utils/hasura.ts index 8b92280007..29beeb6f58 100644 --- a/sequencing-server/src/utils/hasura.ts +++ b/sequencing-server/src/utils/hasura.ts @@ -32,7 +32,7 @@ export const ENDPOINTS_WHITELIST = new Set([ '/seqjson/get-seqjson-for-sequence-standalone', '/seqjson/get-seqjson-for-seqid-and-simulation-dataset', '/seqjson/bulk-get-edsl-for-seqjson', - '/seqjson/get-edsl-for-seqjson' + '/seqjson/get-edsl-for-seqjson', ]); /** @@ -368,3 +368,97 @@ async function getMissionModelId( return missionModelId; } + +export async function getLatestMissionModel(graphqlClient: GraphQLClient): Promise<{ + mission_model_aggregate: { + aggregate: { + max: { + id: number; + }; + }; + }; +}> { + return graphqlClient.request<{ + mission_model_aggregate: { + aggregate: { + max: { + id: number; + }; + }; + }; + }>( + gql` + query GetLatestMissionModel { + mission_model_aggregate(order_by: { uploaded_file: { modified_date: asc } }) { + aggregate { + max { + id + } + } + } + } + `, + ); +} + +export async function getLatestCommandDictionary(graphqlClient: GraphQLClient): Promise<{ + command_dictionary_aggregate: { + aggregate: { + max: { + id: number; + }; + }; + }; +}> { + return graphqlClient.request<{ + command_dictionary_aggregate: { + aggregate: { + max: { + id: number; + }; + }; + }; + }>( + gql` + query GetLatestCommandDictionary { + command_dictionary_aggregate(order_by: { created_at: asc }) { + aggregate { + max { + id + } + } + } + } + `, + ); +} + +export async function getExpansionRule( + graphqlClient: GraphQLClient, + missionModelId: number, + commandDictionaryId: number, +): Promise<{ + expansion_rule: { + id: number; + activity_type: string; + expansion_logic: string; + }[]; +}> { + return graphqlClient.request<{ + expansion_rule: { + id: number; + activity_type: string; + expansion_logic: string; + }[]; + }>( + gql` + query GetExpansonLogic { + expansion_rule(where: {authoring_command_dict_id: {_eq: ${missionModelId}}, authoring_mission_model_id: {_eq: ${commandDictionaryId} }}) { + id + activity_type + expansion_logic + } + } + `, + ); +}