Skip to content

Commit

Permalink
refactor: migrate cdk v2 handler to ts (#2313)
Browse files Browse the repository at this point in the history
* refactor: migrate cdk handler to ts

* refactor: clean up handleV0Destination

* refactor: clean up handleV0Destination

* feat: add step tests for bingads audience

* feat: add step tests for bingads audience
  • Loading branch information
koladilip authored Jun 28, 2023
1 parent 2c76fdd commit c91dadc
Show file tree
Hide file tree
Showing 15 changed files with 3,835 additions and 428 deletions.
72 changes: 53 additions & 19 deletions src/cdk/v2/handler.js → src/cdk/v2/handler.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
const { WorkflowEngineFactory, TemplateType } = require('rudder-workflow-engine');
import {
WorkflowEngine,
WorkflowEngineFactory,
TemplateType,
ExecutionBindings,
StepOutput,
} from 'rudder-workflow-engine';

const tags = require('../../v0/util/tags');
import tags from '../../v0/util/tags';

const defTags = {
[tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.CDK_V2,
};

const {
import {
getErrorInfo,
getRootPathForDestination,
getWorkflowPath,
getPlatformBindingsPaths,
isCdkV2Destination,
} = require('./utils');
} from './utils';

const defTags = {
[tags.TAG_NAMES.IMPLEMENTATION]: tags.IMPLEMENTATIONS.CDK_V2,
};

export function getEmptyExecutionBindings() {
const context = {};
return {
outputs: {},
context,
setContext: (key: string, value: any) => {
context[key] = value;
},
};
}

async function getWorkflowEngine(destName, feature, bindings = {}) {
export async function getWorkflowEngine(
destName: string,
feature: string,
bindings: Record<string, any> = {},
) {
const destRootDir = getRootPathForDestination(destName);
const workflowPath = await getWorkflowPath(destRootDir, feature);
const platformBindingsPaths = await getPlatformBindingsPaths();
Expand All @@ -28,7 +49,11 @@ async function getWorkflowEngine(destName, feature, bindings = {}) {

const workflowEnginePromiseMap = new Map();

function getCachedWorkflowEngine(destName, feature, bindings = {}) {
export function getCachedWorkflowEngine(
destName: string,
feature: string,
bindings: Record<string, any> = {},
): WorkflowEngine {
// Create a new instance of the engine for the destination if needed
// TODO: Use cache to avoid long living engine objects
workflowEnginePromiseMap[destName] = workflowEnginePromiseMap[destName] || new Map();
Expand All @@ -38,7 +63,7 @@ function getCachedWorkflowEngine(destName, feature, bindings = {}) {
return workflowEnginePromiseMap[destName][feature];
}

async function process(workflowEngine, parsedEvent) {
export async function executeWorkflow(workflowEngine: WorkflowEngine, parsedEvent: any) {
try {
const result = await workflowEngine.execute(parsedEvent);
// TODO: Handle remaining output scenarios
Expand All @@ -48,18 +73,27 @@ async function process(workflowEngine, parsedEvent) {
}
}

async function processCdkV2Workflow(destType, parsedEvent, feature, bindings = {}) {
export async function processCdkV2Workflow(
destType: string,
parsedEvent: any,
feature: string,
bindings: Record<string, any> = {},
) {
try {
const workflowEngine = await getCachedWorkflowEngine(destType, feature, bindings);
return await process(workflowEngine, parsedEvent);
return await executeWorkflow(workflowEngine, parsedEvent);
} catch (error) {
throw getErrorInfo(error, isCdkV2Destination(parsedEvent), defTags);
}
}

module.exports = {
getWorkflowEngine,
getCachedWorkflowEngine,
processCdkV2Workflow,
process,
};
export function executeStep(
workflowEngine: WorkflowEngine,
stepName: string,
input: any,
bindings?: ExecutionBindings,
): Promise<StepOutput> {
return workflowEngine
.getStepExecutor(stepName)
.execute(input, Object.assign(workflowEngine.bindings, getEmptyExecutionBindings(), bindings));
}
50 changes: 19 additions & 31 deletions src/cdk/v2/utils.js → src/cdk/v2/utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
const path = require('path');
const fs = require('fs/promises');
const { WorkflowExecutionError, WorkflowCreationError } = require('rudder-workflow-engine');
const logger = require('../../logger');
const { generateErrorObject } = require('../../v0/util');
const { PlatformError } = require('../../v0/util/errorTypes');
const tags = require('../../v0/util/tags');
import path from 'path';
import fs from 'fs/promises';
import { WorkflowExecutionError, WorkflowCreationError, StatusError } from 'rudder-workflow-engine';
import logger from '../../logger';
import { generateErrorObject } from '../../v0/util';
import { PlatformError } from '../../v0/util/errorTypes';
import tags from '../../v0/util/tags';

const CDK_V2_ROOT_DIR = __dirname;

async function getWorkflowPath(destDir, feature) {
export async function getWorkflowPath(destDir, feature) {
// The values are of array type to support aliases
const featureWorkflowMap = {
// Didn't add any prefix as processor transformation is the default
Expand All @@ -34,15 +34,15 @@ async function getWorkflowPath(destDir, feature) {
return validWorkflowFilepath;
}

function getRootPathForDestination(destName) {
export function getRootPathForDestination(destName) {
// TODO: Resolve the CDK v2 destination directory
// path from the root directory
return path.join(CDK_V2_ROOT_DIR, 'destinations', destName);
}

async function getPlatformBindingsPaths() {
export async function getPlatformBindingsPaths() {
const allowedExts = ['.js'];
const bindingsPaths = [];
const bindingsPaths: string[] = [];
const bindingsDir = path.join(CDK_V2_ROOT_DIR, 'bindings');
const files = await fs.readdir(bindingsDir);
files.forEach((fileName) => {
Expand All @@ -59,14 +59,14 @@ async function getPlatformBindingsPaths() {
* Return message with workflow engine metadata
* @param {*} err
*/
function getWorkflowEngineErrorMessage(err) {
export function getWorkflowEngineErrorMessage(err) {
let errMsg = err instanceof Error ? err.message : '';

if (err instanceof WorkflowCreationError || err instanceof WorkflowExecutionError) {
errMsg = `${err.message}: Workflow: ${err.workflowName}, Step: ${err.stepName}, ChildStep: ${err.childStepName}`;

if (err instanceof WorkflowExecutionError) {
errMsg = `${errMsg}, OriginalError: ${err.originalError.message}`;
errMsg = `${errMsg}, OriginalError: ${err.originalError?.message}`;
}
}

Expand All @@ -79,22 +79,19 @@ function getWorkflowEngineErrorMessage(err) {
* @param {*} defTags default stat tags
* @returns Error type object
*/
function getErrorInfo(err, isProd, defTags) {
export function getErrorInfo(err: any, isProd: boolean, defTags) {
// Handle various CDK error types
const message = isProd ? getWorkflowEngineErrorMessage(err) : err.message;

if (err instanceof WorkflowExecutionError) {
logger.error(
`Error occurred during workflow step execution: ${getWorkflowEngineErrorMessage(err)}`,
err,
);
logger.error(`Error occurred during workflow step execution: ${message}`, err);

// Determine the error instance
let errInstance = err;
let errInstance: any = err;
if (err.originalError) {
errInstance = err.originalError;
errInstance.message = message;
errInstance.status = err.originalError.status || err.status;
errInstance.status = errInstance.status || err.status;
}

return generateErrorObject(errInstance, defTags);
Expand All @@ -111,19 +108,10 @@ function getErrorInfo(err, isProd, defTags) {
return generateErrorObject(new PlatformError(message), defTags);
}

function isCdkV2Destination(event) {
export function isCdkV2Destination(event) {
return Boolean(event?.destination?.DestinationDefinition?.Config?.cdkV2Enabled);
}

function getCdkV2TestThreshold(event) {
export function getCdkV2TestThreshold(event) {
return event.destination?.DestinationDefinition?.Config?.cdkV2TestThreshold || 0;
}

module.exports = {
getRootPathForDestination,
getWorkflowPath,
getPlatformBindingsPaths,
getErrorInfo,
isCdkV2Destination,
getCdkV2TestThreshold,
};
Loading

0 comments on commit c91dadc

Please sign in to comment.