Skip to content

Commit

Permalink
chore: added reconcile fn for openfaas fn pods (#3420)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar authored May 28, 2024
1 parent 402aa2c commit 7a2ab63
Show file tree
Hide file tree
Showing 4 changed files with 377 additions and 97 deletions.
26 changes: 22 additions & 4 deletions src/util/customTransformer-faas.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,26 @@ const {
} = require('./openfaas');
const { getLibraryCodeV1 } = require('./customTransforrmationsStore-v1');

const HASH_SECRET = process.env.OPENFAAS_FN_HASH_SECRET || '';
const libVersionIdsCache = new NodeCache();

function generateFunctionName(userTransformation, libraryVersionIds, testMode) {
function generateFunctionName(userTransformation, libraryVersionIds, testMode, hashSecret = '') {
if (userTransformation.versionId === FAAS_AST_VID) return FAAS_AST_FN_NAME;

if (testMode) {
const funcName = `fn-test-${uuidv4()}`;
return funcName.substring(0, 63).toLowerCase();
}

const ids = [userTransformation.workspaceId, userTransformation.versionId].concat(
let ids = [userTransformation.workspaceId, userTransformation.versionId].concat(
(libraryVersionIds || []).sort(),
);

if (hashSecret !== '') {
ids = ids.concat([hashSecret]);
}

// FIXME: Why the id's are sorted ?!
const hash = crypto.createHash('md5').update(`${ids}`).digest('hex');
return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase();
}
Expand Down Expand Up @@ -90,7 +96,13 @@ async function setOpenFaasUserTransform(
testMode,
};
const functionName =
pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode);
pregeneratedFnName ||
generateFunctionName(
userTransformation,
libraryVersionIds,
testMode,
process.env.OPENFAAS_FN_HASH_SECRET,
);
const setupTime = new Date();

await setupFaasFunction(
Expand Down Expand Up @@ -130,7 +142,13 @@ async function runOpenFaasUserTransform(

const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {};
// check and deploy faas function if not exists
const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode);
const functionName = generateFunctionName(
userTransformation,
libraryVersionIds,
testMode,
process.env.OPENFAAS_FN_HASH_SECRET,
);

if (testMode) {
await setOpenFaasUserTransform(
userTransformation,
Expand Down
28 changes: 25 additions & 3 deletions src/util/openfaas/faasApi.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const axios = require('axios');
const { RespStatusError, RetryRequestError } = require('../utils');

const logger = require('../../logger');

const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080';
const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || '';
const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || '';
Expand All @@ -12,7 +14,7 @@ const basicAuth = {

const parseAxiosError = (error) => {
if (error.response) {
const status = error.response.status || 400;
const status = error.response.status || 500;
const errorData = error.response?.data;
const message =
(errorData && (errorData.message || errorData.error || errorData)) || error.message;
Expand Down Expand Up @@ -61,6 +63,8 @@ const invokeFunction = async (functionName, payload) =>
});

const checkFunctionHealth = async (functionName) => {
logger.debug(`Checking function health: ${functionName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`;
axios
Expand All @@ -76,8 +80,10 @@ const checkFunctionHealth = async (functionName) => {
});
};

const deployFunction = async (payload) =>
new Promise((resolve, reject) => {
const deployFunction = async (payload) => {
logger.debug(`Deploying function: ${payload?.name}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.post(url, payload, { auth: basicAuth })
Expand All @@ -86,6 +92,21 @@ const deployFunction = async (payload) =>
reject(parseAxiosError(err));
});
});
};

const updateFunction = async (fnName, payload) => {
logger.debug(`Updating function: ${fnName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.put(url, payload, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => {
reject(parseAxiosError(err));
});
});
};

module.exports = {
deleteFunction,
Expand All @@ -94,4 +115,5 @@ module.exports = {
getFunctionList,
invokeFunction,
checkFunctionHealth,
updateFunction,
};
205 changes: 141 additions & 64 deletions src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
deployFunction,
invokeFunction,
checkFunctionHealth,
updateFunction,
} = require('./faasApi');
const logger = require('../../logger');
const { RetryRequestError, RespStatusError } = require('../utils');
Expand Down Expand Up @@ -33,6 +34,7 @@ const FAAS_AST_FN_NAME = 'fn-ast';
const CUSTOM_NETWORK_POLICY_WORKSPACE_IDS = process.env.CUSTOM_NETWORK_POLICY_WORKSPACE_IDS || '';
const customNetworkPolicyWorkspaceIds = CUSTOM_NETWORK_POLICY_WORKSPACE_IDS.split(',');
const CUSTOMER_TIER = process.env.CUSTOMER_TIER || 'shared';
const DISABLE_RECONCILE_FN = process.env.DISABLE_RECONCILE_FN == 'true' || false;

// Initialise node cache
const functionListCache = new NodeCache();
Expand Down Expand Up @@ -67,6 +69,8 @@ const awaitFunctionReadiness = async (
maxWaitInMs = 22000,
waitBetweenIntervalsInMs = 250,
) => {
logger.debug(`Awaiting function readiness: ${functionName}`);

const executionPromise = new Promise(async (resolve) => {
try {
await callWithRetry(
Expand Down Expand Up @@ -121,7 +125,7 @@ const invalidateFnCache = () => {
functionListCache.set(FUNC_LIST_KEY, []);
};

const deployFaasFunction = async (
const updateFaasFunction = async (
functionName,
code,
versionId,
Expand All @@ -130,73 +134,50 @@ const deployFaasFunction = async (
trMetadata = {},
) => {
try {
logger.debug(`[Faas] Deploying a faas function: ${functionName}`);
let envProcess = 'python index.py';

const lvidsString = libraryVersionIDs.join(',');
logger.debug(`Updating faas fn: ${functionName}`);

if (!testMode) {
envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
} else {
envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
}

const envVars = {};
if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') {
envVars.max_inflight = FAAS_MAX_INFLIGHT;
envVars.exec_timeout = FAAS_EXEC_TIMEOUT;
}
if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;
}
// labels
const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
service: 'openfaas-fn',
customer: 'shared',
'customer-tier': CUSTOMER_TIER,
};
if (
trMetadata.workspaceId &&
customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)
) {
labels['custom-network-policy'] = 'true';
const payload = buildOpenfaasFn(
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata,
);
await updateFunction(functionName, payload);
// wait for function to be ready and then set it in cache
await awaitFunctionReadiness(functionName);
setFunctionInCache(functionName);
} catch (error) {
// 404 is statuscode returned from openfaas community edition
// when the function don't exist, so we can safely ignore this error
// and let the function be created in the next step.
if (error.statusCode !== 404) {
throw error;
}
}
};

// TODO: investigate and add more required labels and annotations
const payload = {
service: functionName,
name: functionName,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
limits: {
memory: FAAS_LIMITS_MEMORY,
cpu: FAAS_LIMITS_CPU,
},
requests: {
memory: FAAS_REQUESTS_MEMORY,
cpu: FAAS_REQUESTS_CPU,
},
};
const deployFaasFunction = async (
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata = {},

Check warning on line 167 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L167

Added line #L167 was not covered by tests
) => {
try {
logger.debug(`Deploying faas fn: ${functionName}`);

const payload = buildOpenfaasFn(
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata,
);
await deployFunction(payload);
logger.debug('[Faas] Deployed a faas function');
} catch (error) {
logger.error(`[Faas] Error while deploying ${functionName}: ${error.message}`);
// To handle concurrent create requests,
Expand Down Expand Up @@ -246,6 +227,95 @@ async function setupFaasFunction(
}
}

// reconcileFn runs everytime the service boot's up
// trying to update the functions which are not in cache to the
// latest label and envVars
const reconcileFn = async (name, versionId, libraryVersionIDs, trMetadata) => {
if (DISABLE_RECONCILE_FN) {
return;

Check warning on line 235 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L235

Added line #L235 was not covered by tests
}

logger.debug(`Reconciling faas function: ${name}`);
try {
if (isFunctionDeployed(name)) {
return;
}
await updateFaasFunction(name, null, versionId, libraryVersionIDs, false, trMetadata);
} catch (error) {
logger.error(
`unexpected error occurred when reconciling the function ${name}: ${error.message}`,
);
throw error;
}
};

// buildOpenfaasFn is helper function to build openfaas fn CRUD payload
function buildOpenfaasFn(name, code, versionId, libraryVersionIDs, testMode, trMetadata = {}) {
logger.debug(`Building faas fn: ${name}`);

let envProcess = 'python index.py';
const lvidsString = libraryVersionIDs.join(',');

if (!testMode) {
envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
} else {
envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
}

const envVars = {};

if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') {
envVars.max_inflight = FAAS_MAX_INFLIGHT;
envVars.exec_timeout = FAAS_EXEC_TIMEOUT;
}

if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;

Check warning on line 273 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L273

Added line #L273 was not covered by tests
}

const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
service: 'openfaas-fn',
customer: 'shared',
'customer-tier': CUSTOMER_TIER,
};

if (trMetadata.workspaceId && customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)) {
labels['custom-network-policy'] = 'true';

Check warning on line 295 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L295

Added line #L295 was not covered by tests
}

return {
service: name,
name: name,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
limits: {
memory: FAAS_LIMITS_MEMORY,
cpu: FAAS_LIMITS_CPU,
},
requests: {
memory: FAAS_REQUESTS_MEMORY,
cpu: FAAS_REQUESTS_CPU,
},
};
}

const executeFaasFunction = async (
name,
events,
Expand All @@ -260,14 +330,19 @@ const executeFaasFunction = async (
let errorRaised;

try {
if (testMode) await awaitFunctionReadiness(name);
if (testMode) {
await awaitFunctionReadiness(name);
} else {
await reconcileFn(name, versionId, libraryVersionIDs, trMetadata);
}
return await invokeFunction(name, events);
} catch (error) {
logger.error(`Error while invoking ${name}: ${error.message}`);
errorRaised = error;

if (error.statusCode === 404 && error.message.includes(`error finding function ${name}`)) {
removeFunctionFromCache(name);

await setupFaasFunction(name, null, versionId, libraryVersionIDs, testMode, trMetadata);
throw new RetryRequestError(`${name} not found`);
}
Expand Down Expand Up @@ -314,6 +389,8 @@ module.exports = {
executeFaasFunction,
setupFaasFunction,
invalidateFnCache,
buildOpenfaasFn,
FAAS_AST_VID,
FAAS_AST_FN_NAME,
setFunctionInCache,
};
Loading

0 comments on commit 7a2ab63

Please sign in to comment.