Skip to content

Commit

Permalink
feat: splitting bloomreach destination into bloomreach and bloomreach…
Browse files Browse the repository at this point in the history
…_catalog
  • Loading branch information
manish339k committed Aug 15, 2024
1 parent eaf169f commit 8706593
Show file tree
Hide file tree
Showing 26 changed files with 1,113 additions and 649 deletions.
32 changes: 0 additions & 32 deletions src/cdk/v2/destinations/bloomreach/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,10 @@ import { getMappingConfig } from '../../../../v0/util';
export const CUSTOMER_COMMAND = 'customers';
export const CUSTOMER_EVENT_COMMAND = 'customers/events';
export const MAX_BATCH_SIZE = 50;
export const MAX_PAYLOAD_SIZE = 10000000;
export const MAX_ITEMS = 5000;

// ref:- https://documentation.bloomreach.com/engagement/reference/batch-commands-2
export const getBatchEndpoint = (apiBaseUrl: string, projectToken: string): string =>
`${apiBaseUrl}/track/v2/projects/${projectToken}/batch`;

// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-update-catalog-item
export const getCreateBulkCatalogItemEndpoint = (
apiBaseUrl: string,
projectToken: string,
catalogId: string,
): string => `${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items`;

// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-partial-update-catalog-item
export const getUpdateBulkCatalogItemEndpoint = (
apiBaseUrl: string,
projectToken: string,
catalogId: string,
): string =>
`${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items/partial-update`;

// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-delete-catalog-items
export const getDeleteBulkCatalogItemEndpoint = (
apiBaseUrl: string,
projectToken: string,
catalogId: string,
): string =>
`${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items/bulk-delete`;

export const RecordAction = {
INSERT: 'insert',
UPDATE: 'update',
DELETE: 'delete',
};

const CONFIG_CATEGORIES = {
CUSTOMER_PROPERTIES_CONFIG: { name: 'BloomreachCustomerPropertiesConfig' },
};
Expand Down
89 changes: 25 additions & 64 deletions src/cdk/v2/destinations/bloomreach/procWorkflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,22 @@ steps:
template: |
$.context.messageType = .message.type.toLowerCase();
- name: mappedToDestination
template: |
.message.context.mappedToDestination || false;
- name: validateInput
template: |
let messageType = $.context.messageType;
$.assert(messageType, "message Type is not present. Aborting");
$.assert(messageType in {{$.EventType.([.IDENTIFY,.TRACK,.PAGE,.SCREEN])}}, "message type " + messageType + " is not supported");
$.assertConfig(.destination.Config.apiBaseUrl, "API Base URL is not present. Aborting");
$.assertConfig(.destination.Config.apiKey, "API Key is not present . Aborting");
$.assertConfig(.destination.Config.apiSecret, "API Secret is not present. Aborting");
$.assertConfig(.destination.Config.projectToken, "Project Token is not present. Aborting");
steps:
- name: validateContactAndEventsInput
condition: $.outputs.mappedToDestination === false
template: |
let messageType = $.context.messageType;
$.assert(messageType in {{$.EventType.([.IDENTIFY,.TRACK,.PAGE,.SCREEN])}}, "message type " + messageType + " is not supported");
$.assertConfig(.destination.Config.hardID, "Hard ID is not present. Aborting");
$.assertConfig(.destination.Config.softID, "Soft ID is not present. Aborting");
$.assert(.message.timestamp ?? .message.originalTimestamp, "Timestamp is not present. Aborting");
const userId = .message.().(
$.assertConfig(.destination.Config.hardID, "Hard ID is not present. Aborting");
$.assertConfig(.destination.Config.softID, "Soft ID is not present. Aborting");
$.assert(.message.timestamp ?? .message.originalTimestamp, "Timestamp is not present. Aborting");
const userId = .message.().(
{{{{$.getGenericPaths("userIdOnly")}}}};
);
$.assert(userId || .message.anonymousId, "Either one of userId or anonymousId is required. Aborting");
- name: validateCatalogInput
condition: $.outputs.mappedToDestination
template: |
let messageType = $.context.messageType;
$.assert(messageType in {{$.EventType.([.RECORD])}}, "message type " + messageType + " is not supported");
$.assertConfig(.destination.Config.catalogID, "Catalog Id is not present. Aborting");
const action = .message.action;
const item_id = .message.fields.item_id;
$.assert(action in {{$.RecordAction.([.INSERT,.UPDATE,.DELETE])}}, "message action " + action + " is not supported");
$.assert(item_id, "Item Id is required. Aborting");
);
$.assert(userId || .message.anonymousId, "Either one of userId or anonymousId is required. Aborting");
- name: prepareIdentifyPayload
condition: $.context.messageType === {{$.EventType.IDENTIFY}}
Expand All @@ -76,9 +57,7 @@ steps:
properties
});
$.context.payload = $.removeUndefinedAndNullValues({name: $.CUSTOMER_COMMAND, data});
$.context.endpoint = $.getBatchEndpoint(.destination.Config.apiBaseUrl, .destination.Config.projectToken);
$.context.method = 'POST';
$.context.payload = $.removeUndefinedAndNullValues({name: $.CUSTOMER_COMMAND, data})
- name: prepareEventName
steps:
Expand Down Expand Up @@ -109,7 +88,7 @@ steps:
$.context.event = .message.event
- name: prepareTrackPageScreenPayload
condition: $.context.messageType !== {{$.EventType.IDENTIFY}} && $.context.messageType !== {{$.EventType.RECORD}}
condition: $.context.messageType !== {{$.EventType.IDENTIFY}}
template: |
const customerIDs = $.prepareCustomerIDs(.message, .destination);
const data = .message.().({
Expand All @@ -119,40 +98,22 @@ steps:
"event_type": $.context.event,
});
$.context.payload = $.removeUndefinedAndNullValues({name: $.CUSTOMER_EVENT_COMMAND, data});
$.context.endpoint = $.getBatchEndpoint(.destination.Config.apiBaseUrl, .destination.Config.projectToken);
$.context.method = 'POST';
- name: prepareRecordPayload
condition: $.context.messageType === {{$.EventType.RECORD}}
steps:
- name: prepareInsertActionPayload
condition: .message.action === {{$.RecordAction.INSERT}}
template: |
$.context.payload = $.prepareRecordInsertOrUpdatePayload(.message.fields);
$.context.endpoint = $.getCreateBulkCatalogItemEndpoint(.destination.Config.apiBaseUrl, .destination.Config.projectToken, .destination.Config.catalogID);
$.context.method = 'PUT';
- name: prepareUpdateActionPayload
condition: .message.action === {{$.RecordAction.UPDATE}}
template: |
$.context.payload = $.prepareRecordInsertOrUpdatePayload(.message.fields);
$.context.endpoint = $.getUpdateBulkCatalogItemEndpoint(.destination.Config.apiBaseUrl, .destination.Config.projectToken, .destination.Config.catalogID);
$.context.method = 'POST';
- name: prepareDeleteActionPayload
condition: .message.action === {{$.RecordAction.DELETE}}
template: |
$.context.payload = .message.fields.item_id;
$.context.endpoint = $.getDeleteBulkCatalogItemEndpoint(.destination.Config.apiBaseUrl, .destination.Config.projectToken, .destination.Config.catalogID);
$.context.method = 'DELETE';
$.context.payload = $.removeUndefinedAndNullValues({name: $.CUSTOMER_EVENT_COMMAND, data})
- name: buildResponse
description: In batchMode we return payload directly
condition: $.batchMode
template: |
const response = $.defaultRequestConfig();
response.body.JSON = $.context.payload;
response.endpoint = $.context.endpoint;
response.method = $.context.method;
response.headers = {
"Content-Type": "application/json",
"Authorization": "Basic " + $.base64Convertor(.destination.Config.apiKey + ":" + .destination.Config.apiSecret)
}
response;
$.context.payload
else:
name: buildResponseForProcessTransformation
template: |
const response = $.defaultRequestConfig();
response.body.JSON = $.context.payload;
response.endpoint = $.getBatchEndpoint(.destination.Config.apiBaseUrl, .destination.Config.projectToken);
response.method = "POST";
response.headers = {
"Content-Type": "application/json",
"Authorization": "Basic " + $.base64Convertor(.destination.Config.apiKey + ":" + .destination.Config.apiSecret)
}
response;
36 changes: 32 additions & 4 deletions src/cdk/v2/destinations/bloomreach/rtWorkflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ steps:
- name: successfulEvents
template: |
$.outputs.transform#idx.output.({
"message": .[],
"destination": ^ [idx].destination,
"metadata": ^ [idx].metadata
"batchedRequest": .,
"batched": false,
"destination": ^[idx].destination,
"metadata": ^[idx].metadata,
"statusCode": 200
})[]
- name: failedEvents
Expand All @@ -41,7 +43,33 @@ steps:
- name: batchSuccessfulEvents
description: Batches the successfulEvents
template: |
$.batchResponseBuilder($.outputs.successfulEvents);
let batches = $.BatchUtils.chunkArrayBySizeAndLength(
$.outputs.successfulEvents, {maxItems: $.MAX_BATCH_SIZE}).items;
batches@batch.({
"batchedRequest": {
"body": {
"JSON": {"commands": ~r batch.batchedRequest[]},
"JSON_ARRAY": {},
"XML": {},
"FORM": {}
},
"version": "1",
"type": "REST",
"method": "POST",
"endpoint": batch[0].destination.Config.().($.getBatchEndpoint(.apiBaseUrl, .projectToken)),
"headers": batch[0].destination.Config.().({
"Content-Type": "application/json",
"Authorization": "Basic " + $.base64Convertor(.apiKey + ":" + .apiSecret)
}),
"params": {},
"files": {}
},
"metadata": ~r batch.metadata[],
"batched": true,
"statusCode": 200,
"destination": batch[0].destination
})[];
- name: finalPayload
template: |
Expand Down
97 changes: 0 additions & 97 deletions src/cdk/v2/destinations/bloomreach/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import lodash from 'lodash';
import { BatchUtils } from '@rudderstack/workflow-engine';
import { isObject, isEmptyObject, getIntegrationsObj } from '../../../../v0/util';
import { RudderMessage, Destination } from '../../../../types';
import { MAX_BATCH_SIZE, MAX_ITEMS, MAX_PAYLOAD_SIZE } from './config';

const getCustomerIDsFromIntegrationObject = (message: RudderMessage): any => {
const integrationObj = getIntegrationsObj(message, 'bloomreach' as any) || {};
Expand Down Expand Up @@ -32,97 +29,3 @@ export const prepareCustomerIDs = (message: RudderMessage, destination: Destinat
};
return customerIDs;
};

export const prepareRecordInsertOrUpdatePayload = (fields: any): any => {
// eslint-disable-next-line @typescript-eslint/naming-convention
const { item_id, ...properties } = fields;
return { item_id, properties };
};

const mergeMetadata = (batch: any[]) => batch.map((event) => event.metadata);

const getMergedEvents = (batch: any[]) => batch.map((event) => event.message[0].body.JSON);

const buildBatchedRequest = (
batch: any[],
constants: {
version: any;
type: any;
method: any;
headers: any;
destination: any;
endPoint: any;
} | null,
endpoint: string,
batchEvent: boolean,
) => ({
batchedRequest: {
body: {
JSON: batchEvent ? { commands: getMergedEvents(batch) } : {},
JSON_ARRAY: batchEvent ? {} : { batch: getMergedEvents(batch) },
XML: {},
FORM: {},
},
version: '1',
type: 'REST',
method: constants?.method,
endpoint,
headers: constants?.headers,
params: {},
files: {},
},
metadata: mergeMetadata(batch),
batched: true,
statusCode: 200,
destination: batch[0].destination,
});

const initializeConstants = (successfulEvents: any[]) => {
if (successfulEvents.length === 0) return null;
return {
version: successfulEvents[0].message[0].version,
type: successfulEvents[0].message[0].type,
method: successfulEvents[0].message[0].method,
headers: successfulEvents[0].message[0].headers,
destination: successfulEvents[0].destination,
endPoint: successfulEvents[0].message[0].endpoint,
};
};

export const batchResponseBuilder = (events: any): any => {
const response: any[] = [];
let constants = initializeConstants(events);
if (!constants) return [];
const eventsGroupByEndpoint = lodash.groupBy(events, (event) => event.message[0].endpoint);

Object.keys(eventsGroupByEndpoint).forEach((eventEndPoint) => {
const batchEvent = eventEndPoint.endsWith('/batch');
if (batchEvent) {
constants = initializeConstants(eventsGroupByEndpoint[eventEndPoint]);
const bathesOfEvents = BatchUtils.chunkArrayBySizeAndLength(
eventsGroupByEndpoint[eventEndPoint],
{ maxItems: MAX_BATCH_SIZE },
);
bathesOfEvents.items.forEach((batch) => {
response.push(buildBatchedRequest(batch, constants, eventEndPoint, batchEvent));
});
} else {
constants = initializeConstants(eventsGroupByEndpoint[eventEndPoint]);
const bathesOfEvents = BatchUtils.chunkArrayBySizeAndLength(
eventsGroupByEndpoint[eventEndPoint],
{
maxSizeInBytes: MAX_PAYLOAD_SIZE,
maxItems: MAX_ITEMS,
},
);
bathesOfEvents.items.forEach((batch) => {
const requests: any = buildBatchedRequest(batch, constants, eventEndPoint, batchEvent);
requests.batchedRequest.body.JSON_ARRAY.batch = JSON.stringify(
requests.batchedRequest.body.JSON_ARRAY.batch,
);
response.push(requests);
});
}
});
return response;
};
31 changes: 31 additions & 0 deletions src/cdk/v2/destinations/bloomreach_catalog/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export const MAX_PAYLOAD_SIZE = 10000000;
export const MAX_ITEMS = 5000;

// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-update-catalog-item
export const getCreateBulkCatalogItemEndpoint = (
apiBaseUrl: string,
projectToken: string,
catalogId: string,
): string => `${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items`;

// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-partial-update-catalog-item
export const getUpdateBulkCatalogItemEndpoint = (
apiBaseUrl: string,
projectToken: string,
catalogId: string,
): string =>
`${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items/partial-update`;

// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-delete-catalog-items
export const getDeleteBulkCatalogItemEndpoint = (
apiBaseUrl: string,
projectToken: string,
catalogId: string,
): string =>
`${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items/bulk-delete`;

export const RecordAction = {
INSERT: 'insert',
UPDATE: 'update',
DELETE: 'delete',
};
Loading

0 comments on commit 8706593

Please sign in to comment.