Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequence background transpiler #1332

Merged
merged 4 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ services:
SEQUENCING_LOCAL_STORE: /usr/src/app/sequencing_file_store
SEQUENCING_WORKER_NUM: 8
SEQUENCING_MAX_WORKER_HEAP_MB: 1000
TRANSPILER_ENABLED : "true"
image: aerie_sequencing
ports: ["27184:27184"]
restart: always
Expand Down
35 changes: 30 additions & 5 deletions sequencing-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { getHasuraSession, canUserPerformAction, ENDPOINTS_WHITELIST } from './u
import type { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads';
import type { CacheItem, UserCodeError } from '@nasa-jpl/aerie-ts-user-code-runner';
import { PromiseThrottler } from './utils/PromiseThrottler.js';
import { backgroundTranspiler } from './backgroundTranspiler.js';

const logger = getLogger('app');

Expand All @@ -39,7 +40,9 @@ app.use(bodyParser.json({ limit: '100mb' }));

DbExpansion.init();
export const db = DbExpansion.getDb();

export let graphqlClient = new GraphQLClient(getEnv().MERLIN_GRAPHQL_URL, {
headers: { 'x-hasura-admin-secret': getEnv().HASURA_GRAPHQL_ADMIN_SECRET },
});
export const piscina = new Piscina({
filename: new URL('worker.js', import.meta.url).pathname,
minThreads: parseInt(getEnv().SEQUENCING_WORKER_NUM),
Expand All @@ -62,10 +65,6 @@ export type Context = {
};

app.use(async (req: Request, res: Response, next: NextFunction) => {
const graphqlClient = new GraphQLClient(getEnv().MERLIN_GRAPHQL_URL, {
headers: { 'x-hasura-admin-secret': getEnv().HASURA_GRAPHQL_ADMIN_SECRET },
});

// Check and make sure the user making the request has the required permissions.
if (
!ENDPOINTS_WHITELIST.has(req.url) &&
Expand Down Expand Up @@ -247,4 +246,30 @@ app.listen(PORT, () => {
logger.info(`Worker pool initialized:
Total workers started: ${piscina.threads.length},
Heap Size per Worker: ${getEnv().SEQUENCING_MAX_WORKER_HEAP_MB} MB`);

if (getEnv().TRANSPILER_ENABLED === 'true') {
//log that the tranpiler is on
logger.info(`Background Transpiler is 'on'`);

let transpilerPromise: Promise<void> | undefined; // Holds the transpilation promise
async function invokeTranspiler() {
try {
await backgroundTranspiler();
} catch (error) {
console.error('Error during transpilation:', error);
} finally {
transpilerPromise = undefined; // Reset promise after completion
}
}

// Immediately call the background transpiler
transpilerPromise = invokeTranspiler();

// Schedule next execution after 2 minutes, handling ongoing transpilation
setInterval(async () => {
if (!transpilerPromise) {
transpilerPromise = invokeTranspiler(); // Start a new transpilation
}
}, 60 * 2 * 1000);
}
});
143 changes: 143 additions & 0 deletions sequencing-server/src/backgroundTranspiler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { piscina, graphqlClient, typeCheckingCache, promiseThrottler } from './app.js';
goetzrrGit marked this conversation as resolved.
Show resolved Hide resolved
import { objectCacheKeyFunction } from './lib/batchLoaders/index.js';
import crypto from 'crypto';
import { generateTypescriptForGraphQLActivitySchema } from './lib/codegen/ActivityTypescriptCodegen.js';
import DataLoader from 'dataloader';
import { activitySchemaBatchLoader } from './lib/batchLoaders/activitySchemaBatchLoader.js';
import { commandDictionaryTypescriptBatchLoader } from './lib/batchLoaders/commandDictionaryTypescriptBatchLoader.js';
import type { typecheckExpansion } from './worker';
import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
import { getLatestCommandDictionary, getLatestMissionModel, getExpansionRule } from './utils/hasura.js';

export async function backgroundTranspiler(numberOfThreads: number = 2) {
if (graphqlClient === null) {
return;
}

// Fetch latest mission model
const { mission_model_aggregate } = await getLatestMissionModel(graphqlClient);
if (!mission_model_aggregate) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest mission model. Aborting background transpiling...',
);
return;
}

// Fetch latest command dictionary
const { command_dictionary_aggregate } = await getLatestCommandDictionary(graphqlClient);
if (!command_dictionary_aggregate) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest command dictionary. Aborting background transpiling...',
);
return;
}

const commandDictionaryId = command_dictionary_aggregate.aggregate.max.id;
const missionModelId = mission_model_aggregate.aggregate.max.id;
const { expansion_rule } = await getExpansionRule(graphqlClient, missionModelId, commandDictionaryId);

if (expansion_rule === null || expansion_rule.length === 0) {
console.log(`[ Background Transpiler ] No expansion rules to transpile.`);
return;
}

const commandTypescriptDataLoader = new DataLoader(commandDictionaryTypescriptBatchLoader({ graphqlClient }), {
cacheKeyFn: objectCacheKeyFunction,
name: null,
});
const activitySchemaDataLoader = new DataLoader(activitySchemaBatchLoader({ graphqlClient }), {
cacheKeyFn: objectCacheKeyFunction,
name: null,
});

const commandTypes = await commandTypescriptDataLoader.load({
dictionaryId: missionModelId,
});

if (commandTypes === null) {
console.log(`[ Background Transpiler ] Unable to fetch command ts lib.
Aborting transpiling...`);
return;
}

// only process 'numberOfThreads' worth at a time ex. transpile 2 logics at a time
// This allows for expansion set and sequence expansion to utilize the remaining workers
for (let i = 0; i < expansion_rule.length; i += numberOfThreads) {
goetzrrGit marked this conversation as resolved.
Show resolved Hide resolved
await Promise.all(
expansion_rule.slice(i, i + numberOfThreads).map(async expansion => {
await promiseThrottler.run(async () => {
// Assuming expansion_rule elements have the same type
if (expansion instanceof Error) {
console.log(`[ Background Transpiler ] Expansion: ${expansion.name} could not be loaded`, expansion);
return Promise.reject(`Expansion: ${expansion.name} could not be loaded`);
}

const hash = crypto
.createHash('sha256')
.update(
JSON.stringify({
commandDictionaryId,
missionModelId,
id: expansion.id,
expansionLogic: expansion.expansion_logic,
activityType: expansion.activity_type,
}),
)
.digest('hex');

// ignore already transpiled hash info
if (typeCheckingCache.has(hash)) {
return Promise.resolve();
}

const activitySchema = await activitySchemaDataLoader.load({
missionModelId,
activityTypeName: expansion.activity_type,
});

// log error
if (!activitySchema) {
console.log(
`[ Background Transpiler ] Activity schema for ${expansion.activity_type} could not be loaded`,
activitySchema,
);
return Promise.reject('Activity schema for ${expansion.activity_type} could not be loaded');
}

const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

// log error
if (!activityTypescript) {
console.log(
`[ Background Transpiler ] Unable to generate typescript for activity ${expansion.activity_type}`,
activityTypescript,
);
return Promise.reject(`Unable to generate typescript for activity ${expansion.activity_type}`);
}

const typecheckingResult = (
piscina.run(
{
expansionLogic: expansion.expansion_logic,
commandTypes: commandTypes,
activityTypes: activityTypescript,
activityTypeName: expansion.activity_type,
},
{ name: 'typecheckExpansion' },
) as ReturnType<typeof typecheckExpansion>
).then(Result.fromJSON);

//Display any errors
typecheckingResult.then(result => {
if (result.isErr()) {
console.log(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`);
}
});

typeCheckingCache.set(hash, typecheckingResult);
return typecheckingResult;
});
}),
);
}
}
4 changes: 4 additions & 0 deletions sequencing-server/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type Env = {
STORAGE: string;
SEQUENCING_WORKER_NUM: string;
SEQUENCING_MAX_WORKER_HEAP_MB: string;
TRANSPILER_ENABLED: string;
};

export const defaultEnv: Env = {
Expand All @@ -28,6 +29,7 @@ export const defaultEnv: Env = {
STORAGE: 'sequencing_file_store',
SEQUENCING_WORKER_NUM: '8',
SEQUENCING_MAX_WORKER_HEAP_MB: '1000',
TRANSPILER_ENABLED: 'true',
};

export function getEnv(): Env {
Expand All @@ -47,6 +49,7 @@ export function getEnv(): Env {
const SEQUENCING_WORKER_NUM = env['SEQUENCING_WORKER_NUM'] ?? defaultEnv.SEQUENCING_WORKER_NUM;
const SEQUENCING_MAX_WORKER_HEAP_MB =
env['SEQUENCING_MAX_WORKER_HEAP_MB'] ?? defaultEnv.SEQUENCING_MAX_WORKER_HEAP_MB;
const TRANSPILER_ENABLED = env['TRANSPILER_ENABLED'] ?? defaultEnv.TRANSPILER_ENABLED;
return {
HASURA_GRAPHQL_ADMIN_SECRET,
LOG_FILE,
Expand All @@ -61,5 +64,6 @@ export function getEnv(): Env {
STORAGE,
SEQUENCING_WORKER_NUM,
SEQUENCING_MAX_WORKER_HEAP_MB,
TRANSPILER_ENABLED,
};
}
96 changes: 95 additions & 1 deletion sequencing-server/src/utils/hasura.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const ENDPOINTS_WHITELIST = new Set([
'/seqjson/get-seqjson-for-sequence-standalone',
'/seqjson/get-seqjson-for-seqid-and-simulation-dataset',
'/seqjson/bulk-get-edsl-for-seqjson',
'/seqjson/get-edsl-for-seqjson'
'/seqjson/get-edsl-for-seqjson',
]);

/**
Expand Down Expand Up @@ -368,3 +368,97 @@ async function getMissionModelId(

return missionModelId;
}

export async function getLatestMissionModel(graphqlClient: GraphQLClient): Promise<{
mission_model_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}> {
return graphqlClient.request<{
mission_model_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}>(
gql`
query GetLatestMissionModel {
mission_model_aggregate(order_by: { uploaded_file: { modified_date: asc } }) {
aggregate {
max {
id
}
}
}
}
`,
);
}

export async function getLatestCommandDictionary(graphqlClient: GraphQLClient): Promise<{
command_dictionary_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}> {
return graphqlClient.request<{
command_dictionary_aggregate: {
aggregate: {
max: {
id: number;
};
};
};
}>(
gql`
query GetLatestCommandDictionary {
command_dictionary_aggregate(order_by: { created_at: asc }) {
aggregate {
max {
id
}
}
}
}
`,
);
}

export async function getExpansionRule(
graphqlClient: GraphQLClient,
missionModelId: number,
commandDictionaryId: number,
): Promise<{
expansion_rule: {
id: number;
activity_type: string;
expansion_logic: string;
}[];
}> {
return graphqlClient.request<{
expansion_rule: {
id: number;
activity_type: string;
expansion_logic: string;
}[];
}>(
gql`
query GetExpansonLogic {
expansion_rule(where: {authoring_command_dict_id: {_eq: ${missionModelId}}, authoring_mission_model_id: {_eq: ${commandDictionaryId} }}) {
id
activity_type
expansion_logic
}
}
`,
);
}