Skip to content

Commit

Permalink
chore: setup transformer code with openfaas pro deployment changes
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar committed Apr 12, 2024
1 parent 3399c47 commit a486ad3
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 19 deletions.
41 changes: 30 additions & 11 deletions src/util/openfaas/faasApi.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
const axios = require('axios');
const { RespStatusError, RetryRequestError } = require('../utils');

//const logger = require('../../logger');

const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080';
const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || '';
const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || '';

const basicAuth = {
username: OPENFAAS_GATEWAY_USERNAME,
password: OPENFAAS_GATEWAY_PASSWORD,
};

const parseAxiosError = (error) => {
if (error.response) {
Expand All @@ -21,7 +30,7 @@ const deleteFunction = async (functionName) =>
new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.delete(url, { data: { functionName } })
.delete(url, { data: { functionName }, auth: basicAuth })
.then(() => resolve())
.catch((err) => reject(parseAxiosError(err)));
});
Expand All @@ -30,7 +39,7 @@ const getFunction = async (functionName) =>
new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/function/${functionName}`;
axios
.get(url)
.get(url, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => reject(parseAxiosError(err)));
});
Expand All @@ -39,7 +48,7 @@ const getFunctionList = async () =>
new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.get(url)
.get(url, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => reject(parseAxiosError(err)));
});
Expand All @@ -48,29 +57,39 @@ const invokeFunction = async (functionName, payload) =>
new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`;
axios
.post(url, payload)
.post(url, payload, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => reject(parseAxiosError(err)));
});

const checkFunctionHealth = async (functionName) =>
new Promise((resolve, reject) => {
const checkFunctionHealth = async (functionName) => {
//logger.error(`checking function health: ${functionName}`);

return new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`;
axios
.get(url, {
headers: { 'X-REQUEST-TYPE': 'HEALTH-CHECK' },
})
.get(
url,
{
headers: { 'X-REQUEST-TYPE': 'HEALTH-CHECK' },
},
{ auth: basicAuth },
)
.then((resp) => resolve(resp))
.catch((err) => reject(parseAxiosError(err)));
});
};

const deployFunction = async (payload) =>
new Promise((resolve, reject) => {
const url = `${OPENFAAS_GATEWAY_URL}/system/functions`;
axios
.post(url, payload)
.post(url, payload, { auth: basicAuth })
.then((resp) => resolve(resp.data))
.catch((err) => reject(parseAxiosError(err)));
.catch((err) => {
//logger.error(err);
reject(parseAxiosError(err));
});
});

module.exports = {
Expand Down
28 changes: 26 additions & 2 deletions src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const stats = require('../stats');
const { getMetadata, getTransformationMetadata } = require('../../v0/util');
const { HTTP_STATUS_CODES } = require('../../v0/util/constant');

const FAAS_SCALE_TYPE = process.env.FAAS_SCALE_TYPE || 'capacity';
const FAAS_SCALE_TARGET = process.env.FAAS_SCALE_TARGET || '4';
const FAAS_SCALE_TARGET_PROPORTION = process.env.FAAS_SCALE_TARGET_PROPORTION || '0.70';
const FAAS_SCALE_ZERO = process.env.FAAS_SCALE_ZERO || 'false';
const FAAS_SCALE_ZERO_DURATION = process.env.FAAS_SCALE_ZERO_DURATION || '15m';
const FAAS_BASE_IMG = process.env.FAAS_BASE_IMG || 'rudderlabs/openfaas-flask:main';
const FAAS_MAX_PODS_IN_TEXT = process.env.FAAS_MAX_PODS_IN_TEXT || '40';
const FAAS_MIN_PODS_IN_TEXT = process.env.FAAS_MIN_PODS_IN_TEXT || '1';
Expand Down Expand Up @@ -62,6 +67,8 @@ const awaitFunctionReadiness = async (
maxWaitInMs = 22000,
waitBetweenIntervalsInMs = 250,
) => {
//logger.error(`awaiting function readiness: ${functionName}`);

const executionPromise = new Promise(async (resolve) => {
try {
await callWithRetry(
Expand Down Expand Up @@ -125,7 +132,7 @@ const deployFaasFunction = async (
trMetadata = {},
) => {
try {
logger.debug('[Faas] Deploying a faas function');
logger.debug(`[Faas] Deploying a faas function: ${functionName}`);
let envProcess = 'python index.py';

const lvidsString = libraryVersionIDs.join(',');
Expand All @@ -150,6 +157,11 @@ const deployFaasFunction = async (
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
'com.openfaas.scale.zero': FAAS_SCALE_ZERO,
'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION,
'com.openfaas.scale.target': FAAS_SCALE_TARGET,
'com.openfaas.scale.target.proportion': FAAS_SCALE_TARGET_PROPORTION,
'com.openfaas.scale.type': FAAS_SCALE_TYPE,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
team: 'data-management',
Expand Down Expand Up @@ -202,6 +214,18 @@ const deployFaasFunction = async (
}
};

async function removeFaasFunction(fname) {
logger.debug(`[Faas] Removing faas function: ${fname}`);
try {
await deleteFunction(fname);

Check warning on line 220 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L217-L220

Added lines #L217 - L220 were not covered by tests
} catch (error) {
if (error.statusCode !== 404) {
logger.error(`[Faas] Error while removing ${fname}: ${error.message}`);
throw error;

Check warning on line 224 in src/util/openfaas/index.js

View check run for this annotation

Codecov / codecov/patch

src/util/openfaas/index.js#L223-L224

Added lines #L223 - L224 were not covered by tests
}
}
}

async function setupFaasFunction(
functionName,
code,
Expand All @@ -212,7 +236,7 @@ async function setupFaasFunction(
) {
try {
if (!testMode && isFunctionDeployed(functionName)) {
logger.debug(`[Faas] Function ${functionName} already deployed`);
logger.error(`[Faas] Function ${functionName} already deployed`);
return;
}
// deploy faas function
Expand Down
26 changes: 20 additions & 6 deletions test/__tests__/user_transformation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const { parserForImport } = require("../../src/util/parser");
const { RetryRequestError, RespStatusError } = require("../../src/util/utils");

const OPENFAAS_GATEWAY_URL = "http://localhost:8080";
const defaultBasicAuth = {
"username": "",
"password": ""
};

const randomID = () =>
Math.random()
Expand Down Expand Up @@ -1400,12 +1404,14 @@ describe("Python transformations", () => {
expect(axios.post).toHaveBeenCalledTimes(1);
expect(axios.post).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/system/functions`,
expect.objectContaining({ name: funcName, service: funcName })
expect.objectContaining({ name: funcName, service: funcName }),
{ auth: defaultBasicAuth },
);
expect(axios.get).toHaveBeenCalledTimes(1);
expect(axios.get).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/function/${funcName}`,
{"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}
{"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}},
{ auth: defaultBasicAuth },
);
});

Expand Down Expand Up @@ -1622,7 +1628,8 @@ describe("Python transformations", () => {
expect(axios.post).toHaveBeenCalledTimes(1);
expect(axios.post).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/function/${funcName}`,
inputData
inputData,
{ auth: defaultBasicAuth },
);
});

Expand All @@ -1641,6 +1648,10 @@ describe("Python transformations", () => {
json: jest.fn().mockResolvedValue(respBody)
});

axios.delete.mockResolvedValue({
response: { status: 404, data: `error finding function ${funcName}` }
}); // delete function

axios.post
.mockRejectedValueOnce({
response: { status: 404, data: `error finding function ${funcName}` } // invoke function not found
Expand All @@ -1655,17 +1666,20 @@ describe("Python transformations", () => {
expect(axios.post).toHaveBeenCalledTimes(2);
expect(axios.post).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/function/${funcName}`,
inputData
inputData,
{ auth: defaultBasicAuth },
);
expect(axios.post).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/system/functions`,
expect.objectContaining({ name: funcName, service: funcName })
expect.objectContaining({ name: funcName, service: funcName }),
{ auth: defaultBasicAuth },
);

expect(axios.get).toHaveBeenCalledTimes(1);
expect(axios.get).toHaveBeenCalledWith(
`${OPENFAAS_GATEWAY_URL}/function/${funcName}`,
{"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}}
{"headers": {"X-REQUEST-TYPE": "HEALTH-CHECK"}},
{ auth: defaultBasicAuth },
);
});

Expand Down

0 comments on commit a486ad3

Please sign in to comment.