Skip to content

Commit

Permalink
chore: code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir-4116 committed Nov 6, 2023
1 parent bcfa4b7 commit 0ea8b2b
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 65 deletions.
89 changes: 64 additions & 25 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"@koa/router": "^12.0.0",
"@ndhoule/extend": "^2.0.0",
"@pyroscope/nodejs": "^0.2.6",
"@rudderstack/workflow-engine": "^0.5.7",
"@rudderstack/workflow-engine": "^0.6.8",
"ajv": "^8.12.0",
"ajv-draft-04": "^1.0.0",
"ajv-formats": "^2.1.1",
Expand Down
5 changes: 2 additions & 3 deletions src/cdk/v2/destinations/gladly/procWorkflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ bindings:
path: ../../../../constants
- path: ./utils
exportAll: true
- name: isDefinedAndNotNull
path: ../../../../v0/util
- name: defaultRequestConfig
path: ../../../../v0/util
- name: removeUndefinedAndNullValues
Expand Down Expand Up @@ -45,7 +43,7 @@ steps:
image: .message.context.traits.avatar || .message.traits.avatar,
address: .message.context.traits.address || .message.traits.address
}
$.context.payload.address ? $.context.payload.address = $.context.payload.address.toString()
$.context.payload.address && typeof $.context.payload.address === "object" ? $.context.payload.address = JSON.stringify($.context.payload.address)
$.context.payload.emails = $.formatField(.message, "email")
$.context.payload.phones = $.formatField(.message, "phone")
$.context.payload.customAttributes = $.getCustomAttributes(.message)
Expand All @@ -62,6 +60,7 @@ steps:
const endpoint = $.getEndpoint(.destination)
const rawResponse = await $.httpPOST(endpoint, requestPayload, requestOptions)
const processedResponse = $.processAxiosResponse(rawResponse)
processedResponse.status == 400 ? $.assertHttpResp(processedResponse, "Unable to create or update user due to " + JSON.stringify(processedResponse.response));
processedResponse
- name: buildResponseForProcessTransformation
Expand Down
6 changes: 4 additions & 2 deletions src/cdk/v2/destinations/gladly/rtWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
bindings:
- name: handleRtTfSingleEventError
path: ../../../../v0/util/index
- path: ./utils
exportAll: true

steps:
- name: validateInput
Expand All @@ -15,11 +17,11 @@ steps:
- name: successfulEvents
template: |
$.outputs.transform#idx.output.({
"batchedRequest": .,
"batchedRequest": $.getPayload(.),
"batched": false,
"destination": ^[idx].destination,
"metadata": ^[idx].metadata[],
"statusCode": 200
"statusCode": $.getStatusCode($.requestMetadata, $.outputs.transform#idx.output.status)
})[]
- name: failedEvents
template: |
Expand Down
44 changes: 16 additions & 28 deletions src/cdk/v2/destinations/gladly/utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { getFieldValueFromMessage, base64Convertor, getDestinationExternalID } = require('../../../../v0/util');
const { getFieldValueFromMessage, base64Convertor, isNewStatusCodesAccepted } = require('../../../../v0/util');
const { HTTP_STATUS_CODES } = require('../../../../v0/util/constant');

const lookUpFields = ['email', 'phone', 'externalCustomerId'];
const reservedCustomAttributes = [
'email',
'phone',
Expand Down Expand Up @@ -36,31 +36,6 @@ const formatField = (message, fieldName) => {
return undefined;
};

const getQueryParams = (message, integrationsObj) => {
let queryParamKey;
let queryParamValue;
if (integrationsObj && integrationsObj.lookup){
queryParamKey = integrationsObj.lookup;
}
if (!queryParamKey) {
queryParamKey = 'email';
}
if (!lookUpFields.includes(queryParamKey)) {
queryParamKey = 'email';
}

if (queryParamKey === 'email' || queryParamKey === 'phone') {
queryParamValue = getFieldValueFromMessage(message,queryParamKey);
if (!queryParamValue) return undefined;
} else{
queryParamValue = getDestinationExternalID(message, 'gladlyExternalCustomerId');
if (!queryParamValue) return undefined;
}

queryParamKey = queryParamKey === 'phone' ? 'phoneNumber' : queryParamKey;
return { key: queryParamKey, value: queryParamValue };
};

const getCustomAttributes = (message) => {
const customAttributes = message.context.traits;
reservedCustomAttributes.forEach((customAttribute) => {
Expand All @@ -71,4 +46,17 @@ const getCustomAttributes = (message) => {
return customAttributes;
};

module.exports = { formatField, getCustomAttributes, getEndpoint, getQueryParams, getHeaders };
const getStatusCode = (requestMetadata, statusCode) => {
if(isNewStatusCodesAccepted(requestMetadata) && statusCode === 200){
return HTTP_STATUS_CODES.SUPPRESS_EVENTS;
}
return statusCode;
}

const getPayload = (eventPayload) => {
const payload = eventPayload;
delete payload.status;
return payload;
}

module.exports = { formatField, getCustomAttributes, getEndpoint, getHeaders, getStatusCode, getPayload };
7 changes: 4 additions & 3 deletions src/cdk/v2/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ export function getCachedWorkflowEngine(
return workflowEnginePromiseMap[destName][feature];
}

export async function executeWorkflow(workflowEngine: WorkflowEngine, parsedEvent: FixMe) {
export async function executeWorkflow(workflowEngine: WorkflowEngine, parsedEvent: FixMe, requestMetadata: NonNullable<unknown>){
try {
const result = await workflowEngine.execute(parsedEvent);
const result = await workflowEngine.execute(parsedEvent, {requestMetadata});
// TODO: Handle remaining output scenarios
return result.output;
} catch (error) {
Expand All @@ -78,11 +78,12 @@ export async function processCdkV2Workflow(
destType: string,
parsedEvent: FixMe,
feature: string,
requestMetadata: NonNullable<unknown> = {},
bindings: Record<string, FixMe> = {},
) {
try {
const workflowEngine = await getCachedWorkflowEngine(destType, feature, bindings);
return await executeWorkflow(workflowEngine, parsedEvent);
return await executeWorkflow(workflowEngine, parsedEvent, requestMetadata);
} catch (error) {
throw getErrorInfo(error, isCdkV2Destination(parsedEvent), defTags);
}
Expand Down
7 changes: 4 additions & 3 deletions src/services/destination/cdkV2Integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export default class CDKV2DestinationService implements IntegrationDestinationSe
events: ProcessorTransformationRequest[],
destinationType: string,
_version: string,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): Promise<ProcessorTransformationResponse[]> {
// TODO: Change the promise type
const respList: ProcessorTransformationResponse[][] = await Promise.all(
Expand All @@ -64,6 +64,7 @@ export default class CDKV2DestinationService implements IntegrationDestinationSe
destinationType,
event,
tags.FEATURES.PROCESSOR,
requestMetadata
);

stats.increment('event_transform_success', {
Expand Down Expand Up @@ -108,7 +109,7 @@ export default class CDKV2DestinationService implements IntegrationDestinationSe
events: RouterTransformationRequestData[],
destinationType: string,
_version: string,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): Promise<RouterTransformationResponse[]> {
const allDestEvents: object = groupBy(
events,
Expand All @@ -126,7 +127,7 @@ export default class CDKV2DestinationService implements IntegrationDestinationSe
metaTo.metadata = destInputArray[0].metadata;
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await processCdkV2Workflow(destinationType, destInputArray, tags.FEATURES.ROUTER);
await processCdkV2Workflow(destinationType, destInputArray, tags.FEATURES.ROUTER, requestMetadata);
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
undefined,
Expand Down

0 comments on commit 0ea8b2b

Please sign in to comment.