Skip to content

Commit

Permalink
fix: edits after e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Jun 27, 2024
1 parent b97bac1 commit bab17e6
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 105 deletions.
9 changes: 6 additions & 3 deletions src/controllers/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ import {
RudderMessage,
SourceInput,
} from '../../types';
import { getValueFromMessage } from '../../v0/util';
import { getValueFromMessage, getDestinationExternalIDInfoForRetl } from '../../v0/util';
import genericFieldMap from '../../v0/util/data/GenericFieldMapping.json';
import { EventType, MappedToDestinationKey } from '../../constants';
import { event } from '../../warehouse/config/WHExtractEventTableConfig';
import { getDestinationExternalIDInfoForRetl } from '../../v0/util';

type RECORD_EVENT = {

Check failure on line 18 in src/controllers/util/index.ts

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

Type Alias name `RECORD_EVENT` must match one of the following formats: PascalCase
type: 'record';
Expand Down Expand Up @@ -114,6 +112,7 @@ export class ControllerUtility {
return { ...event, message: newMsg };
});
}

public static transformToRecordEvent(
events: Array<ProcessorTransformationRequest | RouterTransformationRequestData>,
) {
Expand Down Expand Up @@ -155,6 +154,7 @@ export class ControllerUtility {
});
return events;
}

public static createExternalId(eventMessage: RudderMessage, destName: string) {
const type = eventMessage.type;
if (!eventMessage.context['externalId']) {

Check failure on line 160 in src/controllers/util/index.ts

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

Use destructured variables over properties
Expand All @@ -167,6 +167,7 @@ export class ControllerUtility {
}
return eventMessage.context['externalId'];

Check failure on line 168 in src/controllers/util/index.ts

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

Use destructured variables over properties
}

public static getActionForRecordEvent(eventMessage: RudderMessage): string {
const type = eventMessage.type;
if (type === EventType.RECORD) {
Expand Down Expand Up @@ -202,6 +203,7 @@ export class ControllerUtility {
// get the fields from the agnostic config
return ControllerUtility.translateFromAgnosticConfig(eventTypeName, destName, eventMessage);
}

public static translateFromAgnosticConfig(
eventTypeName: string,
destName: string,
Expand All @@ -222,6 +224,7 @@ export class ControllerUtility {
const mappedEvent: any = {};

agnosticConfig[object].forEach((fieldMapping) => {
// eslint-disable-next-line no-restricted-syntax
for (const sourceKey of fieldMapping.sourceKeys) {
const value = ControllerUtility.getNestedValue(eventMessage, sourceKey);
if (value !== undefined) {
Expand Down
39 changes: 20 additions & 19 deletions src/v0/destinations/hs/HSTransform-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,27 @@ const {
BATCH_IDENTIFY_CRM_UPDATE_CONTACT,
TRACK_CRM_ENDPOINT,
BATCH_CREATE_CUSTOM_OBJECTS,
BATCH_UPDATE_CUSTOM_OBJECTS
} = require('./config')
BATCH_UPDATE_CUSTOM_OBJECTS,
} = require('./config');

const endpointMapping = {
'identify': {
'insert': BATCH_IDENTIFY_CRM_CREATE_NEW_CONTACT,
'update': BATCH_IDENTIFY_CRM_UPDATE_CONTACT
identify: {
insert: BATCH_IDENTIFY_CRM_CREATE_NEW_CONTACT,
update: BATCH_IDENTIFY_CRM_UPDATE_CONTACT,
},
'contacts': {
'insert': BATCH_IDENTIFY_CRM_CREATE_NEW_CONTACT,
'update': BATCH_IDENTIFY_CRM_UPDATE_CONTACT
contacts: {
insert: BATCH_IDENTIFY_CRM_CREATE_NEW_CONTACT,
update: BATCH_IDENTIFY_CRM_UPDATE_CONTACT,
},
'track' : {
'insert' : TRACK_CRM_ENDPOINT,
'update' : TRACK_CRM_ENDPOINT
track: {
insert: TRACK_CRM_ENDPOINT,
update: TRACK_CRM_ENDPOINT,
},
'others' : {
'insert' : BATCH_CREATE_CUSTOM_OBJECTS,
'update' : BATCH_UPDATE_CUSTOM_OBJECTS
}
}
others: {
insert: BATCH_CREATE_CUSTOM_OBJECTS,
update: BATCH_UPDATE_CUSTOM_OBJECTS,
},
};
const getDestinationExternalIDInfoForRetl = (message, destination) => {
let externalIdArray = [];
let destinationExternalId = null;
Expand Down Expand Up @@ -73,11 +74,11 @@ export const processSingleAgnosticEvent = (message) => {
let operation;
const { action, fields } = message;
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'HS');

if (objectType === 'identify' || objectType === 'contacts' || objectType === 'track') {
endPoint = endpointMapping[objectType][action]
endPoint = endpointMapping[objectType][action];
} else {
endPoint = endpointMapping['others'][action].replace(':objectType', objectType)
endPoint = endpointMapping.others[action].replace(':objectType', objectType);
}
if (action === 'insert') {
// endPoint = 'https://api.hubapi.com/crm/v3/objects/contacts/batch/create';
Expand Down
17 changes: 4 additions & 13 deletions src/v0/destinations/hs/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,10 @@ const processRouterDest = async (inputs, reqMetadata) => {
tempInputs.map(async (input) => {
try {
const compositePayload = await processSingleAgnosticEvent(input.message, destination);

receivedResponse = Array.isArray(compositePayload)
? compositePayload
: [compositePayload];

// received response can be in array format [{}, {}, {}, ..., {}]
// if multiple response is being returned
receivedResponse.forEach((element) => {
successRespList.push({
message: element,
metadata: input.metadata,
destination,
});
successRespList.push({
message: compositePayload,
metadata: input.metadata,
destination,
});
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(input, error, reqMetadata);
Expand Down
96 changes: 26 additions & 70 deletions src/v0/destinations/hs/utilv2.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
const { TransformationError } = require('@rudderstack/integrations-lib');
// const { TransformationError } = require('@rudderstack/integrations-lib');
// const { get } = require('lodash');
const lodash = require('lodash');
const {
defaultBatchRequestConfig,
defaultPostRequestConfig,
getSuccessRespEvents,
} = require('../../util');
const { defaultBatchRequestConfig, getSuccessRespEvents } = require('../../util');

const {
BATCH_IDENTIFY_CRM_CREATE_NEW_CONTACT,
BATCH_IDENTIFY_CRM_UPDATE_CONTACT,
MAX_BATCH_SIZE,
} = require('./config');

const batchIdentify2 = (
Expand All @@ -18,52 +15,19 @@ const batchIdentify2 = (
batchOperation,
endPoint,
destinationObject,
metadaDataArray,
) => {
// list of chunks [ [..], [..] ]
arrayChunksIdentify.forEach((chunk) => {
const identifyResponseList = [];
const metadata = [];

// extracting message, destination value
// from the first event in a batch
// const { message, destination } = chunk[0];

let batchEventResponse = defaultBatchRequestConfig();
batchEventResponse.batchedRequest.endpoint = endPoint;

if (batchOperation === 'createObject') {
batchEventResponse.batchedRequest.endpoint = endPoint;

// create operation
chunk.forEach((ev) => {
identifyResponseList.push({ ...ev.tempPayload });
metadata.push(ev.metadata);
});
} else if (batchOperation === 'updateObject') {
batchEventResponse.batchedRequest.endpoint = endPoint;
// update operation
chunk.forEach((ev) => {
identifyResponseList.push(ev);

metadata.push(ev.metadata);
});
} else if (batchOperation === 'createContacts') {
// create operation
chunk.forEach((ev) => {
// appending unique events
identifyResponseList.push(ev);
// }
metadata.push(ev.metadata);
});
} else if (batchOperation === 'updateContacts') {
// update operation
chunk.forEach((ev) => {
identifyResponseList.push(...ev.tempPayload);
// }
metadata.push(ev.metadata);
});
} else {
throw new TransformationError('Unknown hubspot operation', 400);
}
// create operation
chunk.forEach((ev) => {
identifyResponseList.push(ev);
});

batchEventResponse.batchedRequest.body.JSON = {
inputs: identifyResponseList,
Expand All @@ -78,11 +42,10 @@ const batchIdentify2 = (
batchEventResponse.batchedRequest.headers = {
Authorization: `Bearer ${destinationObject.Config.accessToken}`,
};
// batchEventResponse.batchedRequest.params = message.params;

batchEventResponse = {
...batchEventResponse,
metadata,
metadata: metadaDataArray,
destinationObject,
};
batchedResponseList.push(
Expand All @@ -104,50 +67,42 @@ const batchEvents2 = (destEvents) => {
// rETL specific chunk
const createAllObjectsEventChunk = [];
const updateAllObjectsEventChunk = [];
let maxBatchSize;
const metadataCreateArray = [];
const metadataUpdateArray = [];
let endPoint;

destEvents.forEach((event) => {
// handler for track call
// track call does not have batch endpoint
const { message, metadata, destination } = event;
staticDestObject = destination;

if (message.operation === 'create') {
createAllObjectsEventChunk.push(message.tempPayload);
metadataCreateArray.push(metadata);
} else if (message.operation === 'update') {
updateAllObjectsEventChunk.push(message.tempPayload);
metadataUpdateArray.push(metadata);
}

const batchedResponse = defaultBatchRequestConfig();
batchedResponse.batchedRequest.headers = {
Authorization: `Bearer ${destination.Config.accessToken}`,
};
batchedResponse.batchedRequest.endpoint = message.endPoint;
batchedResponse.batchedRequest.body = message.tempPayload;
batchedResponse.batchedRequest.method = defaultPostRequestConfig.requestMethod;
batchedResponse.metadata = [metadata];
batchedResponse.destination = destination;

trackResponseList.push(
getSuccessRespEvents(
batchedResponse.batchedRequest,
batchedResponse.metadata,
batchedResponse.destination,
),
);
// eslint-disable-next-line unicorn/consistent-destructuring
endPoint = event?.message?.endPoint;
});

const arrayChunksIdentifyCreateObjects = lodash.chunk(createAllObjectsEventChunk, maxBatchSize);
const arrayChunksIdentifyCreateObjects = lodash.chunk(createAllObjectsEventChunk, MAX_BATCH_SIZE);
const arrayChunksMetadataCreateObjects = lodash.chunk(metadataCreateArray, MAX_BATCH_SIZE);

const arrayChunksIdentifyUpdateObjects = lodash.chunk(updateAllObjectsEventChunk, maxBatchSize);
const arrayChunksIdentifyUpdateObjects = lodash.chunk(updateAllObjectsEventChunk, MAX_BATCH_SIZE);
const arrayChunksMetadataUpdateObjects = lodash.chunk(metadataUpdateArray, MAX_BATCH_SIZE);

// batching up 'create' all objects endpoint chunks
if (arrayChunksIdentifyCreateObjects.length > 0) {
batchedResponseList = batchIdentify2(
arrayChunksIdentifyCreateObjects,
batchedResponseList,
'createObject',
BATCH_IDENTIFY_CRM_CREATE_NEW_CONTACT,
endPoint,
staticDestObject,
arrayChunksMetadataCreateObjects,
);
}

Expand All @@ -157,8 +112,9 @@ const batchEvents2 = (destEvents) => {
arrayChunksIdentifyUpdateObjects,
batchedResponseList,
'updateObject',
BATCH_IDENTIFY_CRM_UPDATE_CONTACT,
endPoint,
staticDestObject,
arrayChunksMetadataUpdateObjects,
);
}

Expand Down

0 comments on commit bab17e6

Please sign in to comment.