diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c04835cd4..90a2152b90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,34 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +## [1.68.0](https://github.com/rudderlabs/rudder-transformer/compare/v1.67.0...v1.68.0) (2024-05-27) + + +### Features + +* add json-data type support in redis ([#3336](https://github.com/rudderlabs/rudder-transformer/issues/3336)) ([0196f20](https://github.com/rudderlabs/rudder-transformer/commit/0196f20cc79e1f470d96a649dd9404c3c9284329)) +* facebook custom audience app secret support ([#3357](https://github.com/rudderlabs/rudder-transformer/issues/3357)) ([fce4ef9](https://github.com/rudderlabs/rudder-transformer/commit/fce4ef973500411c7ad812e7949bb1b73dabc3ba)) +* filtering unknown events in awin ([#3392](https://github.com/rudderlabs/rudder-transformer/issues/3392)) ([d842da8](https://github.com/rudderlabs/rudder-transformer/commit/d842da87a34cb63023eba288e0c5258e29997dcf)) +* **ga4:** component test refactor ([#3220](https://github.com/rudderlabs/rudder-transformer/issues/3220)) ([3ff9a5e](https://github.com/rudderlabs/rudder-transformer/commit/3ff9a5e8e955b929a1b04a89dcf0ccbc49e18648)) +* **integrations/auth0:** include Auth0 event type in Rudderstack message ([#3370](https://github.com/rudderlabs/rudder-transformer/issues/3370)) ([e9409fd](https://github.com/rudderlabs/rudder-transformer/commit/e9409fde6063d7eaa8558396b85b5fdf99f964e1)) +* onboard koddi destination ([88b2d57](https://github.com/rudderlabs/rudder-transformer/commit/88b2d5709da00445ffae54f5a36de855cb5f8479)) +* onboard koddi destination ([#3359](https://github.com/rudderlabs/rudder-transformer/issues/3359)) ([f74c4a0](https://github.com/rudderlabs/rudder-transformer/commit/f74c4a0bc92ae6ccb0c00ac5b21745e496a015bc)) +* onboarding adjust source ([#3395](https://github.com/rudderlabs/rudder-transformer/issues/3395)) ([668d331](https://github.com/rudderlabs/rudder-transformer/commit/668d3311aadacbb92b1873bf43919db7d341afbb)) + + +### Bug Fixes + +* added componenet test ([189cf93](https://github.com/rudderlabs/rudder-transformer/commit/189cf9367a907dc1848257733e13713245458579)) +* added conversions bidders validation and improved implementation ([ddf8d46](https://github.com/rudderlabs/rudder-transformer/commit/ddf8d46fed980204c561f95daa12fc740302e6e3)) +* config ([847e3e0](https://github.com/rudderlabs/rudder-transformer/commit/847e3e04d3ef67b9a7b5e35127251f3fc34ba3bf)) +* fb custom audience html response ([#3402](https://github.com/rudderlabs/rudder-transformer/issues/3402)) ([d1a2bd6](https://github.com/rudderlabs/rudder-transformer/commit/d1a2bd61468c75f944135cf61cbf2464f08404ed)) +* fixed some issue and added unit test ([bc7970c](https://github.com/rudderlabs/rudder-transformer/commit/bc7970c4f0a70e9fe8ad06ffd92f8f4b2a4ec910)) +* fixed unit test issue ([d4a82e2](https://github.com/rudderlabs/rudder-transformer/commit/d4a82e2d2df7ee86c4b149bca7e0c12be3a6a545)) +* minor mapping issue in conversions ([31e6460](https://github.com/rudderlabs/rudder-transformer/commit/31e6460ccc0c18014ebf67eab23b59abe5d81ef6)) +* resolving comments ([7c0d963](https://github.com/rudderlabs/rudder-transformer/commit/7c0d963d3ee87a3ed5712492300dc50768c529de)) +* standardise hashing for all CAPI integrations ([#3379](https://github.com/rudderlabs/rudder-transformer/issues/3379)) ([c249a69](https://github.com/rudderlabs/rudder-transformer/commit/c249a694d735f6d241a35b6e21f493c54890ac84)) +* tiktok_v2 remove default value for content-type for custom events ([#3383](https://github.com/rudderlabs/rudder-transformer/issues/3383)) ([6e7b5a0](https://github.com/rudderlabs/rudder-transformer/commit/6e7b5a0d8bf2c859dfb15b9cad7ed6070bd0892b)) + ## [1.67.0](https://github.com/rudderlabs/rudder-transformer/compare/v1.66.1...v1.67.0) (2024-05-23) diff --git a/package-lock.json b/package-lock.json index bfeb00963a..b8e2c81a29 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "rudder-transformer", - "version": "1.67.0", + "version": "1.68.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "rudder-transformer", - "version": "1.67.0", + "version": "1.68.0", "license": "ISC", "dependencies": { "@amplitude/ua-parser-js": "0.7.24", diff --git a/package.json b/package.json index 7fa3a7330c..ed15683d4f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rudder-transformer", - "version": "1.67.0", + "version": "1.68.0", "description": "", "homepage": "https://github.com/rudderlabs/rudder-transformer#readme", "bugs": { diff --git a/src/util/customTransformer-faas.js b/src/util/customTransformer-faas.js index 2c0bbfd8c0..9ac9804097 100644 --- a/src/util/customTransformer-faas.js +++ b/src/util/customTransformer-faas.js @@ -11,9 +11,10 @@ const { } = require('./openfaas'); const { getLibraryCodeV1 } = require('./customTransforrmationsStore-v1'); +const HASH_SECRET = process.env.OPENFAAS_FN_HASH_SECRET || ''; const libVersionIdsCache = new NodeCache(); -function generateFunctionName(userTransformation, libraryVersionIds, testMode) { +function generateFunctionName(userTransformation, libraryVersionIds, testMode, hashSecret = '') { if (userTransformation.versionId === FAAS_AST_VID) return FAAS_AST_FN_NAME; if (testMode) { @@ -21,10 +22,15 @@ function generateFunctionName(userTransformation, libraryVersionIds, testMode) { return funcName.substring(0, 63).toLowerCase(); } - const ids = [userTransformation.workspaceId, userTransformation.versionId].concat( + let ids = [userTransformation.workspaceId, userTransformation.versionId].concat( (libraryVersionIds || []).sort(), ); + if (hashSecret !== '') { + ids = ids.concat([hashSecret]); + } + + // FIXME: Why the id's are sorted ?! const hash = crypto.createHash('md5').update(`${ids}`).digest('hex'); return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase(); } @@ -90,7 +96,13 @@ async function setOpenFaasUserTransform( testMode, }; const functionName = - pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode); + pregeneratedFnName || + generateFunctionName( + userTransformation, + libraryVersionIds, + testMode, + process.env.OPENFAAS_FN_HASH_SECRET, + ); const setupTime = new Date(); await setupFaasFunction( @@ -130,7 +142,13 @@ async function runOpenFaasUserTransform( const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {}; // check and deploy faas function if not exists - const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode); + const functionName = generateFunctionName( + userTransformation, + libraryVersionIds, + testMode, + process.env.OPENFAAS_FN_HASH_SECRET, + ); + if (testMode) { await setOpenFaasUserTransform( userTransformation, diff --git a/src/util/openfaas/faasApi.js b/src/util/openfaas/faasApi.js index f8f830f6e4..b932b70032 100644 --- a/src/util/openfaas/faasApi.js +++ b/src/util/openfaas/faasApi.js @@ -1,6 +1,8 @@ const axios = require('axios'); const { RespStatusError, RetryRequestError } = require('../utils'); +const logger = require('../../logger'); + const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080'; const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || ''; const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || ''; @@ -12,7 +14,7 @@ const basicAuth = { const parseAxiosError = (error) => { if (error.response) { - const status = error.response.status || 400; + const status = error.response.status || 500; const errorData = error.response?.data; const message = (errorData && (errorData.message || errorData.error || errorData)) || error.message; @@ -61,6 +63,8 @@ const invokeFunction = async (functionName, payload) => }); const checkFunctionHealth = async (functionName) => { + logger.debug(`Checking function health: ${functionName}`); + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`; axios @@ -76,8 +80,10 @@ const checkFunctionHealth = async (functionName) => { }); }; -const deployFunction = async (payload) => - new Promise((resolve, reject) => { +const deployFunction = async (payload) => { + logger.debug(`Deploying function: ${payload?.name}`); + + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; axios .post(url, payload, { auth: basicAuth }) @@ -86,6 +92,21 @@ const deployFunction = async (payload) => reject(parseAxiosError(err)); }); }); +}; + +const updateFunction = async (fnName, payload) => { + logger.debug(`Updating function: ${fnName}`); + + return new Promise((resolve, reject) => { + const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; + axios + .put(url, payload, { auth: basicAuth }) + .then((resp) => resolve(resp.data)) + .catch((err) => { + reject(parseAxiosError(err)); + }); + }); +}; module.exports = { deleteFunction, @@ -94,4 +115,5 @@ module.exports = { getFunctionList, invokeFunction, checkFunctionHealth, + updateFunction, }; diff --git a/src/util/openfaas/index.js b/src/util/openfaas/index.js index 3cf3525e6f..c0369deb81 100644 --- a/src/util/openfaas/index.js +++ b/src/util/openfaas/index.js @@ -4,6 +4,7 @@ const { deployFunction, invokeFunction, checkFunctionHealth, + updateFunction, } = require('./faasApi'); const logger = require('../../logger'); const { RetryRequestError, RespStatusError } = require('../utils'); @@ -33,6 +34,7 @@ const FAAS_AST_FN_NAME = 'fn-ast'; const CUSTOM_NETWORK_POLICY_WORKSPACE_IDS = process.env.CUSTOM_NETWORK_POLICY_WORKSPACE_IDS || ''; const customNetworkPolicyWorkspaceIds = CUSTOM_NETWORK_POLICY_WORKSPACE_IDS.split(','); const CUSTOMER_TIER = process.env.CUSTOMER_TIER || 'shared'; +const DISABLE_RECONCILE_FN = process.env.DISABLE_RECONCILE_FN == 'true' || false; // Initialise node cache const functionListCache = new NodeCache(); @@ -67,6 +69,8 @@ const awaitFunctionReadiness = async ( maxWaitInMs = 22000, waitBetweenIntervalsInMs = 250, ) => { + logger.debug(`Awaiting function readiness: ${functionName}`); + const executionPromise = new Promise(async (resolve) => { try { await callWithRetry( @@ -121,7 +125,7 @@ const invalidateFnCache = () => { functionListCache.set(FUNC_LIST_KEY, []); }; -const deployFaasFunction = async ( +const updateFaasFunction = async ( functionName, code, versionId, @@ -130,73 +134,50 @@ const deployFaasFunction = async ( trMetadata = {}, ) => { try { - logger.debug(`[Faas] Deploying a faas function: ${functionName}`); - let envProcess = 'python index.py'; - - const lvidsString = libraryVersionIDs.join(','); + logger.debug(`Updating faas fn: ${functionName}`); - if (!testMode) { - envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; - } else { - envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; - } - - const envVars = {}; - if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') { - envVars.max_inflight = FAAS_MAX_INFLIGHT; - envVars.exec_timeout = FAAS_EXEC_TIMEOUT; - } - if (GEOLOCATION_URL) { - envVars.geolocation_url = GEOLOCATION_URL; - } - // labels - const labels = { - 'openfaas-fn': 'true', - 'parent-component': 'openfaas', - 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, - 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, - 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, - 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, - 'com.openfaas.scale.target': FAAS_SCALE_TARGET, - 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, - 'com.openfaas.scale.type': FAAS_SCALE_TYPE, - transformationId: trMetadata.transformationId, - workspaceId: trMetadata.workspaceId, - team: 'data-management', - service: 'openfaas-fn', - customer: 'shared', - 'customer-tier': CUSTOMER_TIER, - }; - if ( - trMetadata.workspaceId && - customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId) - ) { - labels['custom-network-policy'] = 'true'; + const payload = buildOpenfaasFn( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); + await updateFunction(functionName, payload); + // wait for function to be ready and then set it in cache + await awaitFunctionReadiness(functionName); + setFunctionInCache(functionName); + } catch (error) { + // 404 is statuscode returned from openfaas community edition + // when the function don't exist, so we can safely ignore this error + // and let the function be created in the next step. + if (error.statusCode !== 404) { + throw error; } + } +}; - // TODO: investigate and add more required labels and annotations - const payload = { - service: functionName, - name: functionName, - image: FAAS_BASE_IMG, - envProcess, - envVars, - labels, - annotations: { - 'prometheus.io.scrape': 'true', - }, - limits: { - memory: FAAS_LIMITS_MEMORY, - cpu: FAAS_LIMITS_CPU, - }, - requests: { - memory: FAAS_REQUESTS_MEMORY, - cpu: FAAS_REQUESTS_CPU, - }, - }; +const deployFaasFunction = async ( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata = {}, +) => { + try { + logger.debug(`Deploying faas fn: ${functionName}`); + const payload = buildOpenfaasFn( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); await deployFunction(payload); - logger.debug('[Faas] Deployed a faas function'); } catch (error) { logger.error(`[Faas] Error while deploying ${functionName}: ${error.message}`); // To handle concurrent create requests, @@ -246,6 +227,95 @@ async function setupFaasFunction( } } +// reconcileFn runs everytime the service boot's up +// trying to update the functions which are not in cache to the +// latest label and envVars +const reconcileFn = async (name, versionId, libraryVersionIDs, trMetadata) => { + if (DISABLE_RECONCILE_FN) { + return; + } + + logger.debug(`Reconciling faas function: ${name}`); + try { + if (isFunctionDeployed(name)) { + return; + } + await updateFaasFunction(name, null, versionId, libraryVersionIDs, false, trMetadata); + } catch (error) { + logger.error( + `unexpected error occurred when reconciling the function ${name}: ${error.message}`, + ); + throw error; + } +}; + +// buildOpenfaasFn is helper function to build openfaas fn CRUD payload +function buildOpenfaasFn(name, code, versionId, libraryVersionIDs, testMode, trMetadata = {}) { + logger.debug(`Building faas fn: ${name}`); + + let envProcess = 'python index.py'; + const lvidsString = libraryVersionIDs.join(','); + + if (!testMode) { + envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; + } else { + envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; + } + + const envVars = {}; + + if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') { + envVars.max_inflight = FAAS_MAX_INFLIGHT; + envVars.exec_timeout = FAAS_EXEC_TIMEOUT; + } + + if (GEOLOCATION_URL) { + envVars.geolocation_url = GEOLOCATION_URL; + } + + const labels = { + 'openfaas-fn': 'true', + 'parent-component': 'openfaas', + 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, + 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, + 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, + 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, + 'com.openfaas.scale.target': FAAS_SCALE_TARGET, + 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, + 'com.openfaas.scale.type': FAAS_SCALE_TYPE, + transformationId: trMetadata.transformationId, + workspaceId: trMetadata.workspaceId, + team: 'data-management', + service: 'openfaas-fn', + customer: 'shared', + 'customer-tier': CUSTOMER_TIER, + }; + + if (trMetadata.workspaceId && customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)) { + labels['custom-network-policy'] = 'true'; + } + + return { + service: name, + name: name, + image: FAAS_BASE_IMG, + envProcess, + envVars, + labels, + annotations: { + 'prometheus.io.scrape': 'true', + }, + limits: { + memory: FAAS_LIMITS_MEMORY, + cpu: FAAS_LIMITS_CPU, + }, + requests: { + memory: FAAS_REQUESTS_MEMORY, + cpu: FAAS_REQUESTS_CPU, + }, + }; +} + const executeFaasFunction = async ( name, events, @@ -260,7 +330,11 @@ const executeFaasFunction = async ( let errorRaised; try { - if (testMode) await awaitFunctionReadiness(name); + if (testMode) { + await awaitFunctionReadiness(name); + } else { + await reconcileFn(name, versionId, libraryVersionIDs, trMetadata); + } return await invokeFunction(name, events); } catch (error) { logger.error(`Error while invoking ${name}: ${error.message}`); @@ -268,6 +342,7 @@ const executeFaasFunction = async ( if (error.statusCode === 404 && error.message.includes(`error finding function ${name}`)) { removeFunctionFromCache(name); + await setupFaasFunction(name, null, versionId, libraryVersionIDs, testMode, trMetadata); throw new RetryRequestError(`${name} not found`); } @@ -314,6 +389,8 @@ module.exports = { executeFaasFunction, setupFaasFunction, invalidateFnCache, + buildOpenfaasFn, FAAS_AST_VID, FAAS_AST_FN_NAME, + setFunctionInCache, }; diff --git a/test/__tests__/user_transformation.test.js b/test/__tests__/user_transformation.test.js index 924bf4f791..ffb53de16b 100644 --- a/test/__tests__/user_transformation.test.js +++ b/test/__tests__/user_transformation.test.js @@ -7,9 +7,11 @@ jest.mock("axios", () => ({ ...jest.requireActual("axios"), get: jest.fn(), post: jest.fn(), - delete: jest.fn() + delete: jest.fn(), + put: jest.fn() })); +const { generateFunctionName } = require('../../src/util/customTransformer-faas.js'); const { Response, Headers } = jest.requireActual("node-fetch"); const lodashCore = require("lodash/core"); const _ = require("lodash"); @@ -35,6 +37,7 @@ const { } = require("../../src/util/customTransformer"); const { parserForImport } = require("../../src/util/parser"); const { RetryRequestError, RespStatusError } = require("../../src/util/utils"); +const { buildOpenfaasFn, setFunctionInCache, invalidateFnCache } = require("../../src/util/openfaas/index"); const OPENFAAS_GATEWAY_URL = "http://localhost:8080"; const defaultBasicAuth = { @@ -88,8 +91,12 @@ const pyLibCode = (name, versionId) => { } } -const pyfaasFuncName = (workspaceId, versionId, libraryVersionIds=[]) => { - const ids = [workspaceId, versionId].concat(libraryVersionIds.sort()); +const pyfaasFuncName = (workspaceId, versionId, libraryVersionIds=[], hashSecret="") => { + let ids = [workspaceId, versionId].concat(libraryVersionIds.sort()); + if (hashSecret !== "") { + ids = ids.concat([hashSecret]); + } + const hash = crypto.createHash('md5').update(`${ids}`).digest('hex'); return `fn-${workspaceId}-${hash}` @@ -105,6 +112,19 @@ const getfetchResponse = (resp, url) => let importNameLibraryVersionIdsMap; +describe("User transformation utils", () => { + + it("generates the openfaas-fn name correctly", () => { + const fnName = generateFunctionName( + {workspaceId: 'workspaceId', transformationId: 'transformationId'}, + [], + false, + 'hash-secret'); + expect(fnName).toEqual('fn-workspaceid-34a32ade07ebbc7bc5ea795b8200de9f'); + }); + +}); + describe("User transformation", () => { beforeEach(() => { jest.resetAllMocks(); @@ -1386,6 +1406,7 @@ describe("Geolocation function", () => { // Running tests for python transformations with openfaas mocks describe("Python transformations", () => { beforeEach(() => { + invalidateFnCache(); jest.resetAllMocks(); }); afterAll(() => {}); @@ -1421,6 +1442,7 @@ describe("Python transformations", () => { const expectedData = { success: true, publishedVersion: funcName }; + setFunctionInCache(funcName); const output = await setupUserTransformHandler([], trRevCode); expect(output).toEqual(expectedData); expect(axios.post).toHaveBeenCalledTimes(0); @@ -1604,7 +1626,7 @@ describe("Python transformations", () => { expect(axios.delete).toHaveBeenCalledTimes(1); }); - it("Simple transformation run - invokes faas function", async () => { + it("Simple transformation run with function in cache - invokes faas function", async () => { const inputData = require(`./data/${integration}_input.json`); const outputData = require(`./data/${integration}_output.json`); @@ -1612,6 +1634,8 @@ describe("Python transformations", () => { const respBody = pyTrRevCode(versionId); const funcName = pyfaasFuncName(respBody.workspaceId, versionId); + setFunctionInCache(funcName); + const transformerUrl = `https://api.rudderlabs.com/transformation/getByVersionId?versionId=${versionId}`; when(fetch) .calledWith(transformerUrl) @@ -1625,7 +1649,7 @@ describe("Python transformations", () => { const output = await userTransformHandler(inputData, versionId, []); expect(output).toEqual(outputData); - expect(axios.post).toHaveBeenCalledTimes(1); + expect(axios.post).toHaveBeenCalledWith( `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, inputData, @@ -1633,12 +1657,16 @@ describe("Python transformations", () => { ); }); - it("Simple transformation run - function not found", async () => { + + it("Simple transformation run with clean cache - reconciles fn with 200OK and then invokes faas function", async () => { + const inputData = require(`./data/${integration}_input.json`); + const outputData = require(`./data/${integration}_output.json`); const versionId = randomID(); const respBody = pyTrRevCode(versionId); - const funcName = pyfaasFuncName(respBody.workspaceId, respBody.versionId); + const funcName = pyfaasFuncName(respBody.workspaceId, versionId); + const transformerUrl = `https://api.rudderlabs.com/transformation/getByVersionId?versionId=${versionId}`; when(fetch) @@ -1648,28 +1676,13 @@ describe("Python transformations", () => { json: jest.fn().mockResolvedValue(respBody) }); - axios.post - .mockRejectedValueOnce({ - response: { status: 404, data: `error finding function ${funcName}` } // invoke function not found - }) - .mockResolvedValueOnce({}); // create function + axios.put.mockResolvedValue({}); axios.get.mockResolvedValue({}); // awaitFunctionReadiness() + axios.post.mockResolvedValue({ data: { transformedEvents: outputData } }); - await expect(async () => { - await userTransformHandler(inputData, versionId, []); - }).rejects.toThrow(RetryRequestError); + const output = await userTransformHandler(inputData, versionId, []); + expect(output).toEqual(outputData); - expect(axios.post).toHaveBeenCalledTimes(2); - expect(axios.post).toHaveBeenCalledWith( - `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, - inputData, - { auth: defaultBasicAuth }, - ); - expect(axios.post).toHaveBeenCalledWith( - `${OPENFAAS_GATEWAY_URL}/system/functions`, - expect.objectContaining({ name: funcName, service: funcName }), - { auth: defaultBasicAuth }, - ); expect(axios.get).toHaveBeenCalledTimes(1); expect(axios.get).toHaveBeenCalledWith( @@ -1677,6 +1690,154 @@ describe("Python transformations", () => { {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}, { auth: defaultBasicAuth }, ); + expect(axios.put).toHaveBeenCalledTimes(1); + expect(axios.put).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + buildOpenfaasFn(funcName, null, versionId, [], false, {}), + { auth: defaultBasicAuth }); + expect(axios.post).toHaveBeenCalledTimes(1); + expect(axios.post).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, + inputData, + { auth: defaultBasicAuth }, + ); + }); + + describe("Simple transformation run with clean cache - function not found", () => { + + it('eventually sets up the function on 404 from update and then invokes it', async () => { + const inputData = require(`./data/${integration}_input.json`); + + const versionId = randomID(); + const respBody = pyTrRevCode(versionId); + const funcName = pyfaasFuncName(respBody.workspaceId, respBody.versionId); + + const transformerUrl = `https://api.rudderlabs.com/transformation/getByVersionId?versionId=${versionId}`; + when(fetch) + .calledWith(transformerUrl) + .mockResolvedValue({ + status: 200, + json: jest.fn().mockResolvedValue(respBody) + }); + + + axios.put.mockRejectedValueOnce({ + response: { status: 404, data: `deployment not found`} + }); + + axios.post + .mockRejectedValueOnce({ + response: { status: 404, data: `error finding function ${funcName}` } // invoke function not found + }) + .mockResolvedValueOnce({}); // create function + axios.get.mockResolvedValue({}); // awaitFunctionReadiness() + + await expect(async () => { + await userTransformHandler(inputData, versionId, []); + }).rejects.toThrow(RetryRequestError); + + expect(axios.put).toHaveBeenCalledTimes(1); + expect(axios.put).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + buildOpenfaasFn(funcName, null, versionId, [], false, {}), + { auth: defaultBasicAuth }, + ); + expect(axios.post).toHaveBeenCalledTimes(2); + expect(axios.post).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, + inputData, + { auth: defaultBasicAuth }, + ); + expect(axios.post).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + expect.objectContaining({ name: funcName, service: funcName }), + { auth: defaultBasicAuth }, + ); + + expect(axios.get).toHaveBeenCalledTimes(1); + expect(axios.get).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, + {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}, + { auth: defaultBasicAuth }, + ); + }); + + it('sets up the function on 202 from update and then invokes it', async() => { + const inputData = require(`./data/${integration}_input.json`); + const outputData = require(`./data/${integration}_output.json`); + + const versionId = randomID(); + const respBody = pyTrRevCode(versionId); + const funcName = pyfaasFuncName(respBody.workspaceId, respBody.versionId); + + const transformerUrl = `https://api.rudderlabs.com/transformation/getByVersionId?versionId=${versionId}`; + when(fetch) + .calledWith(transformerUrl) + .mockResolvedValue({ + status: 200, + json: jest.fn().mockResolvedValue(respBody) + }); + + + axios.put.mockResolvedValueOnce({ + response: { status: 202, data: `deployment created`} + }); + axios.get.mockResolvedValue({}); // awaitFunctionReadiness() + axios.post.mockResolvedValue({ data: { transformedEvents: outputData } }); + + const output = await userTransformHandler(inputData, versionId, []); + expect(output).toEqual(outputData); + + expect(axios.put).toHaveBeenCalledTimes(1); + expect(axios.put).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + buildOpenfaasFn(funcName, null, versionId, [], false, {}), + { auth: defaultBasicAuth }, + ); + expect(axios.post).toHaveBeenCalledTimes(1); + expect(axios.post).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, + inputData, + { auth: defaultBasicAuth }, + ); + expect(axios.get).toHaveBeenCalledTimes(1); + expect(axios.get).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/function/${funcName}`, + {"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}, + { auth: defaultBasicAuth }, + ); + }); + + it('throws from the userTransform handler when reconciles errors with anything other than 404', async() => { + const inputData = require(`./data/${integration}_input.json`); + const outputData = require(`./data/${integration}_output.json`); + + const versionId = randomID(); + const respBody = pyTrRevCode(versionId); + const funcName = pyfaasFuncName(respBody.workspaceId, respBody.versionId); + + const transformerUrl = `https://api.rudderlabs.com/transformation/getByVersionId?versionId=${versionId}`; + when(fetch) + .calledWith(transformerUrl) + .mockResolvedValue({ + status: 200, + json: jest.fn().mockResolvedValue(respBody) + }); + + + axios.put.mockRejectedValueOnce({response: {status: 400, data: 'bad request'}}); + await expect(async () => { + await userTransformHandler(inputData, versionId, []); + }).rejects.toThrow(RespStatusError); + + expect(axios.put).toHaveBeenCalledTimes(1); + expect(axios.put).toHaveBeenCalledWith( + `${OPENFAAS_GATEWAY_URL}/system/functions`, + buildOpenfaasFn(funcName, null, versionId, [], false, {}), + { auth: defaultBasicAuth }, + ); + }); + }); it("Simple transformation run - error requests", async () => { @@ -1694,6 +1855,8 @@ describe("Python transformations", () => { json: jest.fn().mockResolvedValue(respBody) }); + setFunctionInCache(funcName); + axios.post .mockRejectedValueOnce({ response: { status: 429, data: `Rate limit exceeded` } // invoke function with rate limit