Skip to content

Commit

Permalink
chore: added reconcile fn for openfaas fn pods
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar committed Apr 24, 2024
1 parent bac3cc5 commit 584d81c
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 69 deletions.
26 changes: 24 additions & 2 deletions src/util/openfaas/faasApi.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const axios = require('axios');
const { RespStatusError, RetryRequestError } = require('../utils');

const logger = require('../../logger');

const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080';
const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || '';
const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || '';
Expand Down Expand Up @@ -61,6 +63,8 @@ const invokeFunction = async (functionName, payload) =>
});

const checkFunctionHealth = async (functionName) => {
logger.debug(`Checking function health: ${functionName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`;
axios
Expand All @@ -76,8 +80,10 @@ const checkFunctionHealth = async (functionName) => {
});
};

const deployFunction = async (payload) =>
new Promise((resolve, reject) => {
const deployFunction = async (payload) => {
logger.debug(`Deploying function: ${payload?.name}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.post(url, payload, { auth: basicAuth })
Expand All @@ -86,6 +92,21 @@ const deployFunction = async (payload) =>
reject(parseAxiosError(err));
});
});
};

const updateFunction = async (fnName, payload) => {
logger.warn(`Updating function: ${fnName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.put(url, payload, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => {
reject(parseAxiosError(err));
});
});
};

module.exports = {
deleteFunction,
Expand All @@ -94,4 +115,5 @@ module.exports = {
getFunctionList,
invokeFunction,
checkFunctionHealth,
updateFunction,
};
182 changes: 116 additions & 66 deletions src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
deployFunction,
invokeFunction,
checkFunctionHealth,
updateFunction,
} = require('./faasApi');
const logger = require('../../logger');
const { RetryRequestError, RespStatusError } = require('../utils');
Expand Down Expand Up @@ -67,6 +68,8 @@ const awaitFunctionReadiness = async (
maxWaitInMs = 22000,
waitBetweenIntervalsInMs = 250,
) => {
logger.debug(`Awaiting function readiness: ${functionName}`);

const executionPromise = new Promise(async (resolve) => {
try {
await callWithRetry(
Expand Down Expand Up @@ -130,73 +133,17 @@ const deployFaasFunction = async (
trMetadata = {},
) => {
try {
logger.debug(`[Faas] Deploying a faas function: ${functionName}`);
let envProcess = 'python index.py';

const lvidsString = libraryVersionIDs.join(',');

if (!testMode) {
envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
} else {
envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
}

const envVars = {};
if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') {
envVars.max_inflight = FAAS_MAX_INFLIGHT;
envVars.exec_timeout = FAAS_EXEC_TIMEOUT;
}
if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;
}
// labels
const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
service: 'openfaas-fn',
customer: 'shared',
'customer-tier': CUSTOMER_TIER,
};
if (
trMetadata.workspaceId &&
customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)
) {
labels['custom-network-policy'] = 'true';
}

// TODO: investigate and add more required labels and annotations
const payload = {
service: functionName,
name: functionName,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
limits: {
memory: FAAS_LIMITS_MEMORY,
cpu: FAAS_LIMITS_CPU,
},
requests: {
memory: FAAS_REQUESTS_MEMORY,
cpu: FAAS_REQUESTS_CPU,
},
};
logger.debug(`Deploying faas fn: ${functionName}`);

const payload = buildOpenfaasFn(
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata,
);
await deployFunction(payload);
logger.debug('[Faas] Deployed a faas function');
} catch (error) {
logger.error(`[Faas] Error while deploying ${functionName}: ${error.message}`);
// To handle concurrent create requests,
Expand Down Expand Up @@ -246,6 +193,103 @@ async function setupFaasFunction(
}
}

// reconcileFn runs everytime the service boot's up
// trying to update the functions which are not in cache to the
// latest label and envVars
const reconcileFn = async (name, versionId, libraryVersionIDs, trMetadata) => {
logger.warn(`Reconciling faas function: ${name}`);

try {
if (isFunctionDeployed(name)) {
return;

Check warning on line 204 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L204

Added line #L204 was not covered by tests
}

await updateFunction(
name,
buildOpenfaasFn(name, null, versionId, libraryVersionIDs, false, trMetadata),
);
// if the function is successfully updated, then
// simply set the function in cache.
setFunctionInCache(name);
} catch (error) {
if (error.statusCode !== 404) {
logger.error(
`unexpected error occurred when reconciling the function ${name}: ${error.message}`,
);
// FIXME: We need to limit use of retryable errors which
// convert to 809's and choke the pipeline.
throw new RespStatusError(error.message, 500);
}
}
};

// buildOpenfaasFn is helper function to build openfaas fn CRUD payload
function buildOpenfaasFn(name, code, versionId, libraryVersionIDs, testMode, trMetadata = {}) {
logger.debug(`Building faas fn: ${name}`);

let envProcess = 'python index.py';
const lvidsString = libraryVersionIDs.join(',');

if (!testMode) {
envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
} else {
envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
}

const envVars = {};

if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') {
envVars.max_inflight = FAAS_MAX_INFLIGHT;
envVars.exec_timeout = FAAS_EXEC_TIMEOUT;
}

if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;

Check warning on line 247 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L247

Added line #L247 was not covered by tests
}

const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
service: 'openfaas-fn',
customer: 'shared',
'customer-tier': CUSTOMER_TIER,
};

if (trMetadata.workspaceId && customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)) {
labels['custom-network-policy'] = 'true';

Check warning on line 269 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L269

Added line #L269 was not covered by tests
}

return {
service: name,
name: name,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
limits: {
memory: FAAS_LIMITS_MEMORY,
cpu: FAAS_LIMITS_CPU,
},
requests: {
memory: FAAS_REQUESTS_MEMORY,
cpu: FAAS_REQUESTS_CPU,
},
};
}

const executeFaasFunction = async (
name,
events,
Expand All @@ -260,14 +304,19 @@ const executeFaasFunction = async (
let errorRaised;

try {
if (testMode) await awaitFunctionReadiness(name);
if (testMode) {
await awaitFunctionReadiness(name);
} else {
await reconcileFn(name, versionId, libraryVersionIDs, trMetadata);
}
return await invokeFunction(name, events);
} catch (error) {
logger.error(`Error while invoking ${name}: ${error.message}`);
errorRaised = error;

if (error.statusCode === 404 && error.message.includes(`error finding function ${name}`)) {
removeFunctionFromCache(name);

await setupFaasFunction(name, null, versionId, libraryVersionIDs, testMode, trMetadata);
throw new RetryRequestError(`${name} not found`);
}
Expand Down Expand Up @@ -313,6 +362,7 @@ module.exports = {
executeFaasFunction,
setupFaasFunction,
invalidateFnCache,
buildOpenfaasFn,
FAAS_AST_VID,
FAAS_AST_FN_NAME,
};
22 changes: 21 additions & 1 deletion test/__tests__/user_transformation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ jest.mock("axios", () => ({
...jest.requireActual("axios"),
get: jest.fn(),
post: jest.fn(),
delete: jest.fn()
delete: jest.fn(),
put: jest.fn()
}));

const { Response, Headers } = jest.requireActual("node-fetch");
Expand Down Expand Up @@ -35,6 +36,7 @@ const {
} = require("../../src/util/customTransformer");
const { parserForImport } = require("../../src/util/parser");
const { RetryRequestError, RespStatusError } = require("../../src/util/utils");
const { buildOpenfaasFn } = require("../../src/util/openfaas/index");

const OPENFAAS_GATEWAY_URL = "http://localhost:8080";
const defaultBasicAuth = {
Expand Down Expand Up @@ -1620,11 +1622,18 @@ describe("Python transformations", () => {
json: jest.fn().mockResolvedValue(respBody)
});

axios.put.mockResolvedValue({});
axios.post.mockResolvedValue({ data: { transformedEvents: outputData } });

const output = await userTransformHandler(inputData, versionId, []);
expect(output).toEqual(outputData);


expect(axios.put).toHaveBeenCalledTimes(1);
expect(axios.put).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/system/functions`,
buildOpenfaasFn(funcName, null, versionId, [], false, {}),
{ auth: defaultBasicAuth });
expect(axios.post).toHaveBeenCalledTimes(1);
expect(axios.post).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/function/${funcName}`,
Expand All @@ -1648,6 +1657,11 @@ describe("Python transformations", () => {
json: jest.fn().mockResolvedValue(respBody)
});


axios.put.mockRejectedValueOnce({
response: { status: 404, data: `deployment not found`}
});

axios.post
.mockRejectedValueOnce({
response: { status: 404, data: `error finding function ${funcName}` } // invoke function not found
Expand All @@ -1659,6 +1673,12 @@ describe("Python transformations", () => {
await userTransformHandler(inputData, versionId, []);
}).rejects.toThrow(RetryRequestError);

expect(axios.put).toHaveBeenCalledTimes(1);
expect(axios.put).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/system/functions`,
buildOpenfaasFn(funcName, null, versionId, [], false, {}),
{ auth: defaultBasicAuth },
);
expect(axios.post).toHaveBeenCalledTimes(2);
expect(axios.post).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/function/${funcName}`,
Expand Down

0 comments on commit 584d81c

Please sign in to comment.