From 076478910de0e0112a007637fe2b5392ed7c1321 Mon Sep 17 00:00:00 2001 From: Abhimanyu Babbar Date: Wed, 24 Apr 2024 16:34:29 +0530 Subject: [PATCH] chore: added reconcile fn for openfaas fn pods --- src/util/customTransformer-faas.js | 26 ++- src/util/openfaas/faasApi.js | 26 ++- src/util/openfaas/index.js | 182 +++++++++++++-------- test/__tests__/user_transformation.test.js | 44 ++++- 4 files changed, 203 insertions(+), 75 deletions(-) diff --git a/src/util/customTransformer-faas.js b/src/util/customTransformer-faas.js index 2c0bbfd8c0..9ac9804097 100644 --- a/src/util/customTransformer-faas.js +++ b/src/util/customTransformer-faas.js @@ -11,9 +11,10 @@ const { } = require('./openfaas'); const { getLibraryCodeV1 } = require('./customTransforrmationsStore-v1'); +const HASH_SECRET = process.env.OPENFAAS_FN_HASH_SECRET || ''; const libVersionIdsCache = new NodeCache(); -function generateFunctionName(userTransformation, libraryVersionIds, testMode) { +function generateFunctionName(userTransformation, libraryVersionIds, testMode, hashSecret = '') { if (userTransformation.versionId === FAAS_AST_VID) return FAAS_AST_FN_NAME; if (testMode) { @@ -21,10 +22,15 @@ function generateFunctionName(userTransformation, libraryVersionIds, testMode) { return funcName.substring(0, 63).toLowerCase(); } - const ids = [userTransformation.workspaceId, userTransformation.versionId].concat( + let ids = [userTransformation.workspaceId, userTransformation.versionId].concat( (libraryVersionIds || []).sort(), ); + if (hashSecret !== '') { + ids = ids.concat([hashSecret]); + } + + // FIXME: Why the id's are sorted ?! const hash = crypto.createHash('md5').update(`${ids}`).digest('hex'); return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase(); } @@ -90,7 +96,13 @@ async function setOpenFaasUserTransform( testMode, }; const functionName = - pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode); + pregeneratedFnName || + generateFunctionName( + userTransformation, + libraryVersionIds, + testMode, + process.env.OPENFAAS_FN_HASH_SECRET, + ); const setupTime = new Date(); await setupFaasFunction( @@ -130,7 +142,13 @@ async function runOpenFaasUserTransform( const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {}; // check and deploy faas function if not exists - const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode); + const functionName = generateFunctionName( + userTransformation, + libraryVersionIds, + testMode, + process.env.OPENFAAS_FN_HASH_SECRET, + ); + if (testMode) { await setOpenFaasUserTransform( userTransformation, diff --git a/src/util/openfaas/faasApi.js b/src/util/openfaas/faasApi.js index f8f830f6e4..7edbca414b 100644 --- a/src/util/openfaas/faasApi.js +++ b/src/util/openfaas/faasApi.js @@ -1,6 +1,8 @@ const axios = require('axios'); const { RespStatusError, RetryRequestError } = require('../utils'); +const logger = require('../../logger'); + const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080'; const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || ''; const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || ''; @@ -61,6 +63,8 @@ const invokeFunction = async (functionName, payload) => }); const checkFunctionHealth = async (functionName) => { + logger.debug(`Checking function health: ${functionName}`); + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`; axios @@ -76,8 +80,10 @@ const checkFunctionHealth = async (functionName) => { }); }; -const deployFunction = async (payload) => - new Promise((resolve, reject) => { +const deployFunction = async (payload) => { + logger.debug(`Deploying function: ${payload?.name}`); + + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; axios .post(url, payload, { auth: basicAuth }) @@ -86,6 +92,21 @@ const deployFunction = async (payload) => reject(parseAxiosError(err)); }); }); +}; + +const updateFunction = async (fnName, payload) => { + logger.warn(`Updating function: ${fnName}`); + + return new Promise((resolve, reject) => { + const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; + axios + .put(url, payload, { auth: basicAuth }) + .then((resp) => resolve(resp.data)) + .catch((err) => { + reject(parseAxiosError(err)); + }); + }); +}; module.exports = { deleteFunction, @@ -94,4 +115,5 @@ module.exports = { getFunctionList, invokeFunction, checkFunctionHealth, + updateFunction, }; diff --git a/src/util/openfaas/index.js b/src/util/openfaas/index.js index 7a1fce3cfa..6a175fd3cc 100644 --- a/src/util/openfaas/index.js +++ b/src/util/openfaas/index.js @@ -4,6 +4,7 @@ const { deployFunction, invokeFunction, checkFunctionHealth, + updateFunction, } = require('./faasApi'); const logger = require('../../logger'); const { RetryRequestError, RespStatusError } = require('../utils'); @@ -67,6 +68,8 @@ const awaitFunctionReadiness = async ( maxWaitInMs = 22000, waitBetweenIntervalsInMs = 250, ) => { + logger.debug(`Awaiting function readiness: ${functionName}`); + const executionPromise = new Promise(async (resolve) => { try { await callWithRetry( @@ -130,73 +133,17 @@ const deployFaasFunction = async ( trMetadata = {}, ) => { try { - logger.debug(`[Faas] Deploying a faas function: ${functionName}`); - let envProcess = 'python index.py'; - - const lvidsString = libraryVersionIDs.join(','); - - if (!testMode) { - envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; - } else { - envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; - } - - const envVars = {}; - if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') { - envVars.max_inflight = FAAS_MAX_INFLIGHT; - envVars.exec_timeout = FAAS_EXEC_TIMEOUT; - } - if (GEOLOCATION_URL) { - envVars.geolocation_url = GEOLOCATION_URL; - } - // labels - const labels = { - 'openfaas-fn': 'true', - 'parent-component': 'openfaas', - 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, - 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, - 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, - 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, - 'com.openfaas.scale.target': FAAS_SCALE_TARGET, - 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, - 'com.openfaas.scale.type': FAAS_SCALE_TYPE, - transformationId: trMetadata.transformationId, - workspaceId: trMetadata.workspaceId, - team: 'data-management', - service: 'openfaas-fn', - customer: 'shared', - 'customer-tier': CUSTOMER_TIER, - }; - if ( - trMetadata.workspaceId && - customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId) - ) { - labels['custom-network-policy'] = 'true'; - } - - // TODO: investigate and add more required labels and annotations - const payload = { - service: functionName, - name: functionName, - image: FAAS_BASE_IMG, - envProcess, - envVars, - labels, - annotations: { - 'prometheus.io.scrape': 'true', - }, - limits: { - memory: FAAS_LIMITS_MEMORY, - cpu: FAAS_LIMITS_CPU, - }, - requests: { - memory: FAAS_REQUESTS_MEMORY, - cpu: FAAS_REQUESTS_CPU, - }, - }; + logger.debug(`Deploying faas fn: ${functionName}`); + const payload = buildOpenfaasFn( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); await deployFunction(payload); - logger.debug('[Faas] Deployed a faas function'); } catch (error) { logger.error(`[Faas] Error while deploying ${functionName}: ${error.message}`); // To handle concurrent create requests, @@ -246,6 +193,103 @@ async function setupFaasFunction( } } +// reconcileFn runs everytime the service boot's up +// trying to update the functions which are not in cache to the +// latest label and envVars +const reconcileFn = async (name, versionId, libraryVersionIDs, trMetadata) => { + logger.warn(`Reconciling faas function: ${name}`); + + try { + if (isFunctionDeployed(name)) { + return; + } + + await updateFunction( + name, + buildOpenfaasFn(name, null, versionId, libraryVersionIDs, false, trMetadata), + ); + // if the function is successfully updated, then + // simply set the function in cache. + setFunctionInCache(name); + } catch (error) { + if (error.statusCode !== 404) { + logger.error( + `unexpected error occurred when reconciling the function ${name}: ${error.message}`, + ); + // FIXME: We need to limit use of retryable errors which + // convert to 809's and choke the pipeline. + throw new RespStatusError(error.message, 500); + } + } +}; + +// buildOpenfaasFn is helper function to build openfaas fn CRUD payload +function buildOpenfaasFn(name, code, versionId, libraryVersionIDs, testMode, trMetadata = {}) { + logger.debug(`Building faas fn: ${name}`); + + let envProcess = 'python index.py'; + const lvidsString = libraryVersionIDs.join(','); + + if (!testMode) { + envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; + } else { + envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; + } + + const envVars = {}; + + if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') { + envVars.max_inflight = FAAS_MAX_INFLIGHT; + envVars.exec_timeout = FAAS_EXEC_TIMEOUT; + } + + if (GEOLOCATION_URL) { + envVars.geolocation_url = GEOLOCATION_URL; + } + + const labels = { + 'openfaas-fn': 'true', + 'parent-component': 'openfaas', + 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, + 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, + 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, + 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, + 'com.openfaas.scale.target': FAAS_SCALE_TARGET, + 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, + 'com.openfaas.scale.type': FAAS_SCALE_TYPE, + transformationId: trMetadata.transformationId, + workspaceId: trMetadata.workspaceId, + team: 'data-management', + service: 'openfaas-fn', + customer: 'shared', + 'customer-tier': CUSTOMER_TIER, + }; + + if (trMetadata.workspaceId && customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)) { + labels['custom-network-policy'] = 'true'; + } + + return { + service: name, + name: name, + image: FAAS_BASE_IMG, + envProcess, + envVars, + labels, + annotations: { + 'prometheus.io.scrape': 'true', + }, + limits: { + memory: FAAS_LIMITS_MEMORY, + cpu: FAAS_LIMITS_CPU, + }, + requests: { + memory: FAAS_REQUESTS_MEMORY, + cpu: FAAS_REQUESTS_CPU, + }, + }; +} + const executeFaasFunction = async ( name, events, @@ -260,7 +304,11 @@ const executeFaasFunction = async ( let errorRaised; try { - if (testMode) await awaitFunctionReadiness(name); + if (testMode) { + await awaitFunctionReadiness(name); + } else { + await reconcileFn(name, versionId, libraryVersionIDs, trMetadata); + } return await invokeFunction(name, events); } catch (error) { logger.error(`Error while invoking ${name}: ${error.message}`); @@ -268,6 +316,7 @@ const executeFaasFunction = async ( if (error.statusCode === 404 && error.message.includes(`error finding function ${name}`)) { removeFunctionFromCache(name); + await setupFaasFunction(name, null, versionId, libraryVersionIDs, testMode, trMetadata); throw new RetryRequestError(`${name} not found`); } @@ -313,6 +362,7 @@ module.exports = { executeFaasFunction, setupFaasFunction, invalidateFnCache, + buildOpenfaasFn, FAAS_AST_VID, FAAS_AST_FN_NAME, }; diff --git a/test/__tests__/user_transformation.test.js b/test/__tests__/user_transformation.test.js index 924bf4f791..c4f4a8792a 100644 --- a/test/__tests__/user_transformation.test.js +++ b/test/__tests__/user_transformation.test.js @@ -7,9 +7,11 @@ jest.mock("axios", () => ({ ...jest.requireActual("axios"), get: jest.fn(), post: jest.fn(), - delete: jest.fn() + delete: jest.fn(), + put: jest.fn() })); +const { generateFunctionName } = require('../../src/util/customTransformer-faas.js'); const { Response, Headers } = jest.requireActual("node-fetch"); const lodashCore = require("lodash/core"); const _ = require("lodash"); @@ -35,6 +37,7 @@ const { } = require("../../src/util/customTransformer"); const { parserForImport } = require("../../src/util/parser"); const { RetryRequestError, RespStatusError } = require("../../src/util/utils"); +const { buildOpenfaasFn } = require("../../src/util/openfaas/index"); const OPENFAAS_GATEWAY_URL = "http://localhost:8080"; const defaultBasicAuth = { @@ -88,8 +91,12 @@ const pyLibCode = (name, versionId) => { } } -const pyfaasFuncName = (workspaceId, versionId, libraryVersionIds=[]) => { - const ids = [workspaceId, versionId].concat(libraryVersionIds.sort()); +const pyfaasFuncName = (workspaceId, versionId, libraryVersionIds=[], hashSecret="") => { + let ids = [workspaceId, versionId].concat(libraryVersionIds.sort()); + if (hashSecret !== "") { + ids = ids.concat([hashSecret]); + } + const hash = crypto.createHash('md5').update(`${ids}`).digest('hex'); return `fn-${workspaceId}-${hash}` @@ -105,6 +112,19 @@ const getfetchResponse = (resp, url) => let importNameLibraryVersionIdsMap; +describe("User transformation utils", () => { + + it("generates the openfaas-fn name correctly", () => { + const fnName = generateFunctionName( + {workspaceId: 'workspaceId', transformationId: 'transformationId'}, + [], + false, + 'hash-secret'); + expect(fnName).toEqual('fn-workspaceid-34a32ade07ebbc7bc5ea795b8200de9f'); + }); + +}); + describe("User transformation", () => { beforeEach(() => { jest.resetAllMocks(); @@ -1620,11 +1640,18 @@ describe("Python transformations", () => { json: jest.fn().mockResolvedValue(respBody) }); + axios.put.mockResolvedValue({}); axios.post.mockResolvedValue({ data: { transformedEvents: outputData } }); const output = await userTransformHandler(inputData, versionId, []); expect(output).toEqual(outputData); + + expect(axios.put).toHaveBeenCalledTimes(1); + expect(axios.put).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + buildOpenfaasFn(funcName, null, versionId, [], false, {}), + { auth: defaultBasicAuth }); expect(axios.post).toHaveBeenCalledTimes(1); expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, @@ -1648,6 +1675,11 @@ describe("Python transformations", () => { json: jest.fn().mockResolvedValue(respBody) }); + + axios.put.mockRejectedValueOnce({ + response: { status: 404, data: `deployment not found`} + }); + axios.post .mockRejectedValueOnce({ response: { status: 404, data: `error finding function ${funcName}` } // invoke function not found @@ -1659,6 +1691,12 @@ describe("Python transformations", () => { await userTransformHandler(inputData, versionId, []); }).rejects.toThrow(RetryRequestError); + expect(axios.put).toHaveBeenCalledTimes(1); + expect(axios.put).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + buildOpenfaasFn(funcName, null, versionId, [], false, {}), + { auth: defaultBasicAuth }, + ); expect(axios.post).toHaveBeenCalledTimes(2); expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`,