Skip to content

Commit

Permalink
chore: added reconcile fn for openfaas fn pods
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar committed May 2, 2024
1 parent 616683f commit 0764789
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 75 deletions.
26 changes: 22 additions & 4 deletions src/util/customTransformer-faas.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,26 @@ const {
} = require('./openfaas');
const { getLibraryCodeV1 } = require('./customTransforrmationsStore-v1');

const HASH_SECRET = process.env.OPENFAAS_FN_HASH_SECRET || '';
const libVersionIdsCache = new NodeCache();

function generateFunctionName(userTransformation, libraryVersionIds, testMode) {
function generateFunctionName(userTransformation, libraryVersionIds, testMode, hashSecret = '') {
if (userTransformation.versionId === FAAS_AST_VID) return FAAS_AST_FN_NAME;

if (testMode) {
const funcName = `fn-test-${uuidv4()}`;
return funcName.substring(0, 63).toLowerCase();
}

const ids = [userTransformation.workspaceId, userTransformation.versionId].concat(
let ids = [userTransformation.workspaceId, userTransformation.versionId].concat(
(libraryVersionIds || []).sort(),
);

if (hashSecret !== '') {
ids = ids.concat([hashSecret]);
}

// FIXME: Why the id's are sorted ?!
const hash = crypto.createHash('md5').update(`${ids}`).digest('hex');
return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase();
}
Expand Down Expand Up @@ -90,7 +96,13 @@ async function setOpenFaasUserTransform(
testMode,
};
const functionName =
pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode);
pregeneratedFnName ||
generateFunctionName(
userTransformation,
libraryVersionIds,
testMode,
process.env.OPENFAAS_FN_HASH_SECRET,
);
const setupTime = new Date();

await setupFaasFunction(
Expand Down Expand Up @@ -130,7 +142,13 @@ async function runOpenFaasUserTransform(

const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {};
// check and deploy faas function if not exists
const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode);
const functionName = generateFunctionName(
userTransformation,
libraryVersionIds,
testMode,
process.env.OPENFAAS_FN_HASH_SECRET,
);

if (testMode) {
await setOpenFaasUserTransform(
userTransformation,
Expand Down
26 changes: 24 additions & 2 deletions src/util/openfaas/faasApi.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const axios = require('axios');
const { RespStatusError, RetryRequestError } = require('../utils');

const logger = require('../../logger');

const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080';
const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || '';
const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || '';
Expand Down Expand Up @@ -61,6 +63,8 @@ const invokeFunction = async (functionName, payload) =>
});

const checkFunctionHealth = async (functionName) => {
logger.debug(`Checking function health: ${functionName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`;
axios
Expand All @@ -76,8 +80,10 @@ const checkFunctionHealth = async (functionName) => {
});
};

const deployFunction = async (payload) =>
new Promise((resolve, reject) => {
const deployFunction = async (payload) => {
logger.debug(`Deploying function: ${payload?.name}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.post(url, payload, { auth: basicAuth })
Expand All @@ -86,6 +92,21 @@ const deployFunction = async (payload) =>
reject(parseAxiosError(err));
});
});
};

const updateFunction = async (fnName, payload) => {
logger.warn(`Updating function: ${fnName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.put(url, payload, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => {
reject(parseAxiosError(err));
});
});
};

module.exports = {
deleteFunction,
Expand All @@ -94,4 +115,5 @@ module.exports = {
getFunctionList,
invokeFunction,
checkFunctionHealth,
updateFunction,
};
182 changes: 116 additions & 66 deletions src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
deployFunction,
invokeFunction,
checkFunctionHealth,
updateFunction,
} = require('./faasApi');
const logger = require('../../logger');
const { RetryRequestError, RespStatusError } = require('../utils');
Expand Down Expand Up @@ -67,6 +68,8 @@ const awaitFunctionReadiness = async (
maxWaitInMs = 22000,
waitBetweenIntervalsInMs = 250,
) => {
logger.debug(`Awaiting function readiness: ${functionName}`);

const executionPromise = new Promise(async (resolve) => {
try {
await callWithRetry(
Expand Down Expand Up @@ -130,73 +133,17 @@ const deployFaasFunction = async (
trMetadata = {},
) => {
try {
logger.debug(`[Faas] Deploying a faas function: ${functionName}`);
let envProcess = 'python index.py';

const lvidsString = libraryVersionIDs.join(',');

if (!testMode) {
envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
} else {
envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
}

const envVars = {};
if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') {
envVars.max_inflight = FAAS_MAX_INFLIGHT;
envVars.exec_timeout = FAAS_EXEC_TIMEOUT;
}
if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;
}
// labels
const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
service: 'openfaas-fn',
customer: 'shared',
'customer-tier': CUSTOMER_TIER,
};
if (
trMetadata.workspaceId &&
customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)
) {
labels['custom-network-policy'] = 'true';
}

// TODO: investigate and add more required labels and annotations
const payload = {
service: functionName,
name: functionName,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
limits: {
memory: FAAS_LIMITS_MEMORY,
cpu: FAAS_LIMITS_CPU,
},
requests: {
memory: FAAS_REQUESTS_MEMORY,
cpu: FAAS_REQUESTS_CPU,
},
};
logger.debug(`Deploying faas fn: ${functionName}`);

const payload = buildOpenfaasFn(
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata,
);
await deployFunction(payload);
logger.debug('[Faas] Deployed a faas function');
} catch (error) {
logger.error(`[Faas] Error while deploying ${functionName}: ${error.message}`);
// To handle concurrent create requests,
Expand Down Expand Up @@ -246,6 +193,103 @@ async function setupFaasFunction(
}
}

// reconcileFn runs everytime the service boot's up
// trying to update the functions which are not in cache to the
// latest label and envVars
const reconcileFn = async (name, versionId, libraryVersionIDs, trMetadata) => {
logger.warn(`Reconciling faas function: ${name}`);

try {
if (isFunctionDeployed(name)) {
return;
}

await updateFunction(
name,
buildOpenfaasFn(name, null, versionId, libraryVersionIDs, false, trMetadata),
);
// if the function is successfully updated, then
// simply set the function in cache.
setFunctionInCache(name);
} catch (error) {
if (error.statusCode !== 404) {
logger.error(
`unexpected error occurred when reconciling the function ${name}: ${error.message}`,
);
// FIXME: We need to limit use of retryable errors which
// convert to 809's and choke the pipeline.
throw new RespStatusError(error.message, 500);
}
}
};

// buildOpenfaasFn is helper function to build openfaas fn CRUD payload
function buildOpenfaasFn(name, code, versionId, libraryVersionIDs, testMode, trMetadata = {}) {
logger.debug(`Building faas fn: ${name}`);

let envProcess = 'python index.py';
const lvidsString = libraryVersionIDs.join(',');

if (!testMode) {
envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
} else {
envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`;
}

const envVars = {};

if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') {
envVars.max_inflight = FAAS_MAX_INFLIGHT;
envVars.exec_timeout = FAAS_EXEC_TIMEOUT;
}

if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;
}

const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
service: 'openfaas-fn',
customer: 'shared',
'customer-tier': CUSTOMER_TIER,
};

if (trMetadata.workspaceId && customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)) {
labels['custom-network-policy'] = 'true';
}

return {
service: name,
name: name,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
limits: {
memory: FAAS_LIMITS_MEMORY,
cpu: FAAS_LIMITS_CPU,
},
requests: {
memory: FAAS_REQUESTS_MEMORY,
cpu: FAAS_REQUESTS_CPU,
},
};
}

const executeFaasFunction = async (
name,
events,
Expand All @@ -260,14 +304,19 @@ const executeFaasFunction = async (
let errorRaised;

try {
if (testMode) await awaitFunctionReadiness(name);
if (testMode) {
await awaitFunctionReadiness(name);
} else {
await reconcileFn(name, versionId, libraryVersionIDs, trMetadata);
}
return await invokeFunction(name, events);
} catch (error) {
logger.error(`Error while invoking ${name}: ${error.message}`);
errorRaised = error;

if (error.statusCode === 404 && error.message.includes(`error finding function ${name}`)) {
removeFunctionFromCache(name);

await setupFaasFunction(name, null, versionId, libraryVersionIDs, testMode, trMetadata);
throw new RetryRequestError(`${name} not found`);
}
Expand Down Expand Up @@ -313,6 +362,7 @@ module.exports = {
executeFaasFunction,
setupFaasFunction,
invalidateFnCache,
buildOpenfaasFn,
FAAS_AST_VID,
FAAS_AST_FN_NAME,
};
Loading

0 comments on commit 0764789

Please sign in to comment.