diff --git a/src/util/customTransformer-faas.js b/src/util/customTransformer-faas.js index d1fa48d2d1..f4b19be0ac 100644 --- a/src/util/customTransformer-faas.js +++ b/src/util/customTransformer-faas.js @@ -85,10 +85,10 @@ async function setOpenFaasUserTransform( ) { const tags = { transformerVersionId: userTransformation.versionId, - language: userTransformation.language, identifier: 'openfaas', testMode, }; + const trMetadata = events[0].metadata ? getTransformationMetadata(userTransformation) : {}; const functionName = pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode); const setupTime = new Date(); @@ -106,6 +106,7 @@ async function setOpenFaasUserTransform( testMode, ), testMode, + trMetadata, ); stats.timing('creation_time', setupTime, tags); @@ -129,16 +130,22 @@ async function runOpenFaasUserTransform( const metaTags = events[0].metadata ? getMetadata(events[0].metadata) : {}; const tags = { transformerVersionId: userTransformation.versionId, - language: userTransformation.language, identifier: 'openfaas', testMode, ...metaTags, }; + const trMetadata = events[0].metadata ? getTransformationMetadata(userTransformation) : {}; // check and deploy faas function if not exists const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode); if (testMode) { - await setOpenFaasUserTransform(userTransformation, libraryVersionIds, functionName, testMode); + await setOpenFaasUserTransform( + userTransformation, + libraryVersionIds, + functionName, + testMode, + trMetadata, + ); } const invokeTime = new Date(); @@ -156,6 +163,7 @@ async function runOpenFaasUserTransform( testMode, ), testMode, + trMetadata, ); stats.timing('run_time', invokeTime, tags); return result; diff --git a/src/util/openfaas/index.js b/src/util/openfaas/index.js index 60ad316e1b..81d70d94fa 100644 --- a/src/util/openfaas/index.js +++ b/src/util/openfaas/index.js @@ -23,6 +23,8 @@ const CONFIG_BACKEND_URL = process.env.CONFIG_BACKEND_URL || 'https://api.rudder const GEOLOCATION_URL = process.env.GEOLOCATION_URL || ''; const FAAS_AST_VID = 'ast'; const FAAS_AST_FN_NAME = 'fn-ast'; +const LABEL_WORKSPACE_IDS = process.env.LABEL_WORKSPACES_IDS || ''; +const labelWorkspaceIds = LABEL_WORKSPACE_IDS.split(','); // Initialise node cache const functionListCache = new NodeCache(); @@ -111,7 +113,14 @@ const invalidateFnCache = () => { functionListCache.set(FUNC_LIST_KEY, []); }; -const deployFaasFunction = async (functionName, code, versionId, libraryVersionIDs, testMode) => { +const deployFaasFunction = async ( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, +) => { try { logger.debug('[Faas] Deploying a faas function'); let envProcess = 'python index.py'; @@ -132,6 +141,19 @@ const deployFaasFunction = async (functionName, code, versionId, libraryVersionI if (GEOLOCATION_URL) { envVars.geolocation_url = GEOLOCATION_URL; } + // labels + const labels = { + 'openfaas-fn': 'true', + 'parent-component': 'openfaas', + 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, + 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, + transformationId: trMetadata.transformationId, + workspaceId: trMetadata.workspaceId, + }; + if (trMetadata.workspaceId && labelWorkspaceIds.includes(trMetadata.workspaceId)) { + labels.customPolicy = 'true'; + } + // TODO: investigate and add more required labels and annotations const payload = { service: functionName, @@ -139,12 +161,7 @@ const deployFaasFunction = async (functionName, code, versionId, libraryVersionI image: FAAS_BASE_IMG, envProcess, envVars, - labels: { - 'openfaas-fn': 'true', - 'parent-component': 'openfaas', - 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, - 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, - }, + labels, annotations: { 'prometheus.io.scrape': 'true', }, @@ -175,14 +192,28 @@ const deployFaasFunction = async (functionName, code, versionId, libraryVersionI } }; -async function setupFaasFunction(functionName, code, versionId, libraryVersionIDs, testMode) { +async function setupFaasFunction( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, +) { try { if (!testMode && isFunctionDeployed(functionName)) { logger.debug(`[Faas] Function ${functionName} already deployed`); return; } // deploy faas function - await deployFaasFunction(functionName, code, versionId, libraryVersionIDs, testMode); + await deployFaasFunction( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); // This api call is only used to check if function is spinned correctly await awaitFunctionReadiness(functionName); @@ -201,6 +232,7 @@ const executeFaasFunction = async ( versionId, libraryVersionIDs, testMode, + trMetadata, ) => { try { logger.debug('[Faas] Invoking faas function'); @@ -217,7 +249,14 @@ const executeFaasFunction = async ( error.message.includes(`error finding function ${functionName}`) ) { removeFunctionFromCache(functionName); - await setupFaasFunction(functionName, null, versionId, libraryVersionIDs, testMode); + await setupFaasFunction( + functionName, + null, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); throw new RetryRequestError(`${functionName} not found`); }