Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dm): added stats for the user transformation batch request #2788

Merged
merged 10 commits into from
Nov 8, 2023
10 changes: 0 additions & 10 deletions src/legacy/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -571,14 +571,10 @@
destEvents.length > 0 && destEvents[0].metadata
? getMetadata(destEvents[0].metadata)
: {};
const userFuncStartTime = new Date();

Check failure on line 574 in src/legacy/router.js

View workflow job for this annotation

GitHub Actions / Code Coverage

'userFuncStartTime' is assigned a value but never used
if (transformationVersionId) {
let destTransformedEvents;
try {
stats.counter('user_transform_function_input_events', destEvents.length, {
processSessions,
...metaTags,
});
destTransformedEvents = await userTransformHandler()(
destEvents,
transformationVersionId,
Expand Down Expand Up @@ -630,12 +626,6 @@
processSessions,
...metaTags,
});
} finally {
stats.timing('user_transform_function_latency', userFuncStartTime, {
transformationVersionId,
processSessions,
...metaTags,
});
}
} else {
const errorMessage = 'Transformation VersionID not found';
Expand Down
3 changes: 0 additions & 3 deletions src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ export default class UserTransformService {
}
const userFuncStartTime = new Date();
try {
stats.counter('user_transform_function_input_events', eventsToProcess.length, {
...metaTags,
});
const destTransformedEvents: UserTransformationResponse[] = await userTransformHandler()(
eventsToProcess,
transformationVersionId,
Expand Down
17 changes: 3 additions & 14 deletions src/util/customTransformer-faas.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ function generateFunctionName(userTransformation, libraryVersionIds, testMode) {
const ids = [userTransformation.workspaceId, userTransformation.versionId].concat(
(libraryVersionIds || []).sort(),
);
const hash = crypto.createHash('md5').update(`${ids}`).digest('hex');

const hash = crypto.createHash('md5').update(`${ids}`).digest('hex');
return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase();
}

Expand Down Expand Up @@ -127,15 +127,8 @@ async function runOpenFaasUserTransform(
if (events.length === 0) {
throw new Error('Invalid payload. No events');
}
const metaTags = events[0].metadata ? getMetadata(events[0].metadata) : {};
const tags = {
transformerVersionId: userTransformation.versionId,
identifier: 'openfaas',
testMode,
...metaTags,
};
const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {};

const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {};
// check and deploy faas function if not exists
const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode);
if (testMode) {
Expand All @@ -148,9 +141,7 @@ async function runOpenFaasUserTransform(
);
}

const invokeTime = new Date();
stats.counter('events_to_process', events.length, tags);
const result = await executeFaasFunction(
return await executeFaasFunction(
functionName,
events,
userTransformation.versionId,
Expand All @@ -165,8 +156,6 @@ async function runOpenFaasUserTransform(
testMode,
trMetadata,
);
stats.timing('run_time', invokeTime, tags);
abhimanyubabbar marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

module.exports = {
Expand Down
89 changes: 37 additions & 52 deletions src/util/customTransformer-v1.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const ivm = require('isolated-vm');

abhimanyubabbar marked this conversation as resolved.
Show resolved Hide resolved
const { getFactory } = require('./ivmFactory');
const { getMetadata } = require('../v0/util');
const { getMetadata, getTransformationMetadata } = require('../v0/util');
const logger = require('../logger');
const stats = require('./stats');

Expand Down Expand Up @@ -54,63 +54,48 @@ async function userTransformHandlerV1(
libraryVersionIds,
testMode = false,
) {
/*
Removing pool usage to address memory leaks
Env variable ON_DEMAND_ISOLATE_VM is not being used anymore
*/
if (userTransformation.versionId) {
const metaTags = events.length && events[0].metadata ? getMetadata(events[0].metadata) : {};
const tags = {
transformerVersionId: userTransformation.versionId,
identifier: 'v1',
...metaTags,
};

logger.debug(`Isolate VM being created... `);
const isolatevmFactory = await getFactory(
userTransformation.code,
libraryVersionIds,
userTransformation.versionId,
userTransformation.secrets || {},
testMode,
);
const isolatevm = await isolatevmFactory.create();
logger.debug(`Isolate VM created... `);
if (!userTransformation.versionId) {
return { transformedEvents : events };
}

// Transform the event...
stats.counter('events_to_process', events.length, tags);
const isolateStartWallTime = calculateMsFromIvmTime(isolatevm.isolateStartWallTime);
const isolateStartCPUTime = calculateMsFromIvmTime(isolatevm.isolateStartCPUTime);
const isolatevmFactory = await getFactory(
userTransformation.code,
libraryVersionIds,
userTransformation.versionId,
userTransformation.secrets || {},
testMode,
);

const invokeTime = new Date();
let transformedEvents;
// Destroy isolatevm in case of execution errors
try {
transformedEvents = await transform(isolatevm, events);
} catch (err) {
logger.error(`Error encountered while executing transformation: ${err.message}`);
isolatevmFactory.destroy(isolatevm);
throw err;
}
const { logs } = isolatevm;
stats.timing('run_time', invokeTime, tags);
const isolateEndWallTime = calculateMsFromIvmTime(isolatevm.isolate.wallTime);
const isolateEndCPUTime = calculateMsFromIvmTime(isolatevm.isolate.cpuTime);
logger.debug(`Creating IsolateVM`);
const isolatevm = await isolatevmFactory.create();

//TODO: fix "Value is not a valid number: NaN" error and uncomment
//stats.timing('isolate_wall_time', isolateEndWallTime - isolateStartWallTime, tags);
//stats.timing('isolate_cpu_time', isolateEndCPUTime - isolateStartCPUTime, tags);
const invokeTime = new Date();
let transformedEvents;
let logs;
let transformationError;

// Destroy the isolated vm resources created
logger.debug(`Isolate VM being destroyed... `);
try {
transformedEvents = await transform(isolatevm, events);
logs = isolatevm.logs;
} catch (err) {
logger.error(`Error encountered while executing transformation: ${err.message}`);
transformationError = err;
throw err;
} finally {
logger.debug(`Destroying IsolateVM`);
isolatevmFactory.destroy(isolatevm);
logger.debug(`Isolate VM destroyed... `);

return { transformedEvents, logs };
// Events contain message and destination. We take the message part of event and run transformation on it.
// And put back the destination after transforrmation
// send the observability stats
const tags = {
identifier: 'v1',
errored: transformationError ? true : false,
abhimanyubabbar marked this conversation as resolved.
Show resolved Hide resolved
...events.length && events[0].metadata ? getMetadata(events[0].metadata) : {},
...events.length && events[0].metadata ? getTransformationMetadata(events[0].metadata) : {}
}
stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', invokeTime, tags);
}
return { transformedEvents: events };

return { transformedEvents, logs };
}

async function setUserTransformHandlerV1() {
Expand Down
21 changes: 13 additions & 8 deletions src/util/customTransformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const { UserTransformHandlerFactory } = require('./customTransformerFactory');
const { parserForImport } = require('./parser');
const stats = require('./stats');
const { fetchWithDnsWrapper } = require('./utils');
const { getMetadata, getTransformationMetadata } = require('../v0/util');

const ISOLATE_VM_MEMORY = parseInt(process.env.ISOLATE_VM_MEMORY || '128', 10);
const GEOLOCATION_TIMEOUT_IN_MS = parseInt(process.env.GEOLOCATION_TIMEOUT_IN_MS || '1000', 10);
Expand All @@ -19,10 +20,6 @@ async function runUserTransform(
versionId,
testMode = false,
) {
const tags = {
transformerVersionId: versionId,
identifier: 'v0',
};
// TODO: Decide on the right value for memory limit
const isolate = new ivm.Isolate({ memoryLimit: ISOLATE_VM_MEMORY });
const context = await isolate.createContext();
Expand Down Expand Up @@ -214,9 +211,6 @@ async function runUserTransform(
const customScript = await isolate.compileScript(`${code}`);
await customScript.run(context);
const fnRef = await jail.get('transform', { reference: true });
// stat
stats.counter('events_to_process', events.length, tags);
// TODO : check if we can resolve this
// eslint-disable-next-line no-async-promise-executor
const executionPromise = new Promise(async (resolve, reject) => {
const sharedMessagesList = new ivm.ExternalCopy(events).copyInto({
Expand All @@ -233,6 +227,7 @@ async function runUserTransform(
}
});
let result;
let transformationError;
const invokeTime = new Date();
try {
const timeoutPromise = new Promise((resolve) => {
Expand All @@ -245,8 +240,8 @@ async function runUserTransform(
if (result === 'Timedout') {
throw new Error('Timed out');
}
stats.timing('run_time', invokeTime, tags);
} catch (error) {
transformationError = error;
throw error;
} finally {
// release function, script, context and isolate
Expand All @@ -255,6 +250,16 @@ async function runUserTransform(
bootstrapScriptResult.release();
context.release();
isolate.dispose();

const tags = {
identifier: 'v0',
errored: transformationError ? true : false,
...events.length && events[0].metadata ? getMetadata(events[0].metadata) : {},
...events.length && events[0].metadata ? getTransformationMetadata(events[0].metadata) : {}
}

stats.counter('user_transform_function_input_events', events.length, tags);
stats.timing('user_transform_function_latency', invokeTime, tags);
}

return {
Expand Down
28 changes: 22 additions & 6 deletions src/util/customTransformerFactory.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
const { setOpenFaasUserTransform, runOpenFaasUserTransform } = require('./customTransformer-faas');
const {
setOpenFaasUserTransform,
runOpenFaasUserTransform,
} = require('./customTransformer-faas');

const { userTransformHandlerV1, setUserTransformHandlerV1 } = require('./customTransformer-v1');
const {
userTransformHandlerV1,
setUserTransformHandlerV1,
} = require('./customTransformer-v1');

const UserTransformHandlerFactory = (userTransformation) => {
const transformHandler = {
return {
setUserTransform: async (libraryVersionIds) => {
switch (userTransformation.language) {
case 'pythonfaas':
Expand All @@ -17,13 +23,23 @@ const UserTransformHandlerFactory = (userTransformation) => {
switch (userTransformation.language) {
case 'pythonfaas':
case 'python':
return runOpenFaasUserTransform(events, userTransformation, libraryVersionIds, testMode);
return runOpenFaasUserTransform(
events,
userTransformation,
libraryVersionIds,
testMode
);

default:
return userTransformHandlerV1(events, userTransformation, libraryVersionIds, testMode);
return userTransformHandlerV1(
events,
userTransformation,
libraryVersionIds,
testMode
);
}
},
};
return transformHandler;
};

exports.UserTransformHandlerFactory = UserTransformHandlerFactory;
Loading
Loading