-
Notifications
You must be signed in to change notification settings - Fork 113
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: added bloomreach retl support (#3619)
* feat: added bloomreach retl support * fix: resolving comments * feat: added other scenario test * fix: fixing payload structure * fix: fixing network handler for catalog * fix: minor change * fix: catalog batch items * feat: splitting bloomreach destination into bloomreach and bloomreach_catalog * chore: refactoring bloomreach catalog * chore: minor changes * chore: minor changes * chore: refactoring code after suggestion
- Loading branch information
1 parent
001ee2b
commit 6b1a23a
Showing
11 changed files
with
1,119 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
export const MAX_PAYLOAD_SIZE = 10000000; | ||
export const MAX_ITEMS = 5000; | ||
|
||
// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-update-catalog-item | ||
export const getCreateBulkCatalogItemEndpoint = ( | ||
apiBaseUrl: string, | ||
projectToken: string, | ||
catalogId: string, | ||
): string => `${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items`; | ||
|
||
// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-partial-update-catalog-item | ||
export const getUpdateBulkCatalogItemEndpoint = ( | ||
apiBaseUrl: string, | ||
projectToken: string, | ||
catalogId: string, | ||
): string => | ||
`${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items/partial-update`; | ||
|
||
// ref:- https://documentation.bloomreach.com/engagement/reference/bulk-delete-catalog-items | ||
export const getDeleteBulkCatalogItemEndpoint = ( | ||
apiBaseUrl: string, | ||
projectToken: string, | ||
catalogId: string, | ||
): string => | ||
`${apiBaseUrl}/data/v2/projects/${projectToken}/catalogs/${catalogId}/items/bulk-delete`; | ||
|
||
export const CatalogAction = { | ||
INSERT: 'insert', | ||
UPDATE: 'update', | ||
DELETE: 'delete', | ||
}; |
42 changes: 42 additions & 0 deletions
42
src/cdk/v2/destinations/bloomreach_catalog/rtWorkflow.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
bindings: | ||
- name: EventType | ||
path: ../../../../constants | ||
- name: processRecordInputs | ||
path: ./transformRecord | ||
- name: handleRtTfSingleEventError | ||
path: ../../../../v0/util/index | ||
- name: InstrumentationError | ||
path: '@rudderstack/integrations-lib' | ||
|
||
steps: | ||
- name: validateConfig | ||
template: | | ||
const config = ^[0].destination.Config | ||
$.assertConfig(config.apiBaseUrl, "API Base URL is not present. Aborting"); | ||
$.assertConfig(config.apiKey, "API Key is not present . Aborting"); | ||
$.assertConfig(config.apiSecret, "API Secret is not present. Aborting"); | ||
$.assertConfig(config.projectToken, "Project Token is not present. Aborting"); | ||
$.assertConfig(config.catalogID, "Catalog Id is not present. Aborting"); | ||
- name: validateInput | ||
template: | | ||
$.assert(Array.isArray(^) && ^.length > 0, "Invalid event array") | ||
- name: processRecordEvents | ||
template: | | ||
$.processRecordInputs(^.{.message.type === $.EventType.RECORD}[], ^[0].destination) | ||
- name: failOtherEvents | ||
template: | | ||
const otherEvents = ^.{.message.type !== $.EventType.RECORD}[] | ||
let failedEvents = otherEvents.map( | ||
function(event) { | ||
const error = new $.InstrumentationError("Event type " + event.message.type + " is not supported"); | ||
$.handleRtTfSingleEventError(event, error, {}) | ||
} | ||
) | ||
failedEvents ?? [] | ||
- name: finalPayload | ||
template: | | ||
[...$.outputs.processRecordEvents, ...$.outputs.failOtherEvents] |
93 changes: 93 additions & 0 deletions
93
src/cdk/v2/destinations/bloomreach_catalog/transformRecord.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import { InstrumentationError } from '@rudderstack/integrations-lib'; | ||
import { CatalogAction } from './config'; | ||
import { batchResponseBuilder } from './utils'; | ||
|
||
import { handleRtTfSingleEventError, isEmptyObject } from '../../../../v0/util'; | ||
|
||
const prepareCatalogInsertOrUpdatePayload = (fields: any): any => { | ||
// eslint-disable-next-line @typescript-eslint/naming-convention | ||
const { item_id, ...properties } = fields; | ||
return { item_id, properties }; | ||
}; | ||
|
||
const processEvent = (event: any) => { | ||
const { message } = event; | ||
const { fields, action } = message; | ||
const response = { | ||
action, | ||
payload: null, | ||
}; | ||
if (isEmptyObject(fields)) { | ||
throw new InstrumentationError('`fields` cannot be empty'); | ||
} | ||
if (!fields.item_id) { | ||
throw new InstrumentationError('`item_id` cannot be empty'); | ||
} | ||
if (action === CatalogAction.INSERT || action === CatalogAction.UPDATE) { | ||
response.payload = prepareCatalogInsertOrUpdatePayload(fields); | ||
} else if (action === CatalogAction.DELETE) { | ||
response.payload = fields.item_id; | ||
} else { | ||
throw new InstrumentationError( | ||
`Invalid action type ${action}. You can only add, update or remove items from the catalog`, | ||
); | ||
} | ||
return response; | ||
}; | ||
|
||
const getEventChunks = ( | ||
input: any, | ||
insertItemRespList: any[], | ||
updateItemRespList: any[], | ||
deleteItemRespList: any[], | ||
) => { | ||
switch (input.response.action) { | ||
case CatalogAction.INSERT: | ||
insertItemRespList.push({ payload: input.response.payload, metadata: input.metadata }); | ||
break; | ||
case CatalogAction.UPDATE: | ||
updateItemRespList.push({ payload: input.response.payload, metadata: input.metadata }); | ||
break; | ||
case CatalogAction.DELETE: | ||
deleteItemRespList.push({ payload: input.response.payload, metadata: input.metadata }); | ||
break; | ||
default: | ||
throw new InstrumentationError(`Invalid action type ${input.response.action}`); | ||
} | ||
}; | ||
|
||
export const processRecordInputs = (inputs: any[], destination: any) => { | ||
const insertItemRespList: any[] = []; | ||
const updateItemRespList: any[] = []; | ||
const deleteItemRespList: any[] = []; | ||
const batchErrorRespList: any[] = []; | ||
|
||
if (!inputs || inputs.length === 0) { | ||
return []; | ||
} | ||
|
||
inputs.forEach((input) => { | ||
try { | ||
getEventChunks( | ||
{ | ||
response: processEvent(input), | ||
metadata: input.metadata, | ||
}, | ||
insertItemRespList, | ||
updateItemRespList, | ||
deleteItemRespList, | ||
); | ||
} catch (error) { | ||
const errRespEvent = handleRtTfSingleEventError(input, error, {}); | ||
batchErrorRespList.push(errRespEvent); | ||
} | ||
}); | ||
|
||
const batchSuccessfulRespList = batchResponseBuilder( | ||
insertItemRespList, | ||
updateItemRespList, | ||
deleteItemRespList, | ||
destination, | ||
); | ||
return [...batchSuccessfulRespList, ...batchErrorRespList]; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
import { BatchUtils } from '@rudderstack/workflow-engine'; | ||
import { base64Convertor } from '@rudderstack/integrations-lib'; | ||
import { | ||
getCreateBulkCatalogItemEndpoint, | ||
getDeleteBulkCatalogItemEndpoint, | ||
getUpdateBulkCatalogItemEndpoint, | ||
MAX_ITEMS, | ||
MAX_PAYLOAD_SIZE, | ||
} from './config'; | ||
|
||
const buildBatchedRequest = ( | ||
payload: string, | ||
method: string, | ||
endpoint: string, | ||
headers: any, | ||
metadata: any, | ||
destination: any, | ||
) => ({ | ||
batchedRequest: { | ||
body: { | ||
JSON: {}, | ||
JSON_ARRAY: { batch: payload }, | ||
XML: {}, | ||
FORM: {}, | ||
}, | ||
version: '1', | ||
type: 'REST', | ||
method, | ||
endpoint, | ||
headers, | ||
params: {}, | ||
files: {}, | ||
}, | ||
metadata, | ||
batched: true, | ||
statusCode: 200, | ||
destination, | ||
}); | ||
|
||
const getHeaders = (destination: any) => ({ | ||
'Content-Type': 'application/json', | ||
Authorization: `Basic ${base64Convertor(`${destination.Config.apiKey}:${destination.Config.apiSecret}`)}`, | ||
}); | ||
|
||
// returns merged metadata for a batch | ||
const getMergedMetadata = (batch: any[]) => batch.map((input) => input.metadata); | ||
|
||
// returns merged payload for a batch | ||
const getMergedEvents = (batch: any[]) => batch.map((input) => input.payload); | ||
|
||
// builds final batched response for insert action records | ||
const insertItemBatchResponseBuilder = (insertItemRespList: any[], destination: any) => { | ||
const insertItemBatchedResponse: any[] = []; | ||
|
||
const method = 'PUT'; | ||
const endpoint = getCreateBulkCatalogItemEndpoint( | ||
destination.Config.apiBaseUrl, | ||
destination.Config.projectToken, | ||
destination.Config.catalogID, | ||
); | ||
const headers = getHeaders(destination); | ||
|
||
const batchesOfEvents = BatchUtils.chunkArrayBySizeAndLength(insertItemRespList, { | ||
maxSizeInBytes: MAX_PAYLOAD_SIZE, | ||
maxItems: MAX_ITEMS, | ||
}); | ||
batchesOfEvents.items.forEach((batch: any) => { | ||
const mergedPayload = JSON.stringify(getMergedEvents(batch)); | ||
const mergedMetadata = getMergedMetadata(batch); | ||
insertItemBatchedResponse.push( | ||
buildBatchedRequest(mergedPayload, method, endpoint, headers, mergedMetadata, destination), | ||
); | ||
}); | ||
return insertItemBatchedResponse; | ||
}; | ||
|
||
// builds final batched response for update action records | ||
const updateItemBatchResponseBuilder = (updateItemRespList: any[], destination: any) => { | ||
const updateItemBatchedResponse: any[] = []; | ||
|
||
const method = 'POST'; | ||
const endpoint = getUpdateBulkCatalogItemEndpoint( | ||
destination.Config.apiBaseUrl, | ||
destination.Config.projectToken, | ||
destination.Config.catalogID, | ||
); | ||
const headers = getHeaders(destination); | ||
|
||
const batchesOfEvents = BatchUtils.chunkArrayBySizeAndLength(updateItemRespList, { | ||
maxSizeInBytes: MAX_PAYLOAD_SIZE, | ||
maxItems: MAX_ITEMS, | ||
}); | ||
batchesOfEvents.items.forEach((batch: any) => { | ||
const mergedPayload = JSON.stringify(getMergedEvents(batch)); | ||
const mergedMetadata = getMergedMetadata(batch); | ||
updateItemBatchedResponse.push( | ||
buildBatchedRequest(mergedPayload, method, endpoint, headers, mergedMetadata, destination), | ||
); | ||
}); | ||
return updateItemBatchedResponse; | ||
}; | ||
|
||
// builds final batched response for delete action records | ||
const deleteItemBatchResponseBuilder = (deleteItemRespList: any[], destination: any) => { | ||
const deleteItemBatchedResponse: any[] = []; | ||
|
||
const method = 'DELETE'; | ||
const endpoint = getDeleteBulkCatalogItemEndpoint( | ||
destination.Config.apiBaseUrl, | ||
destination.Config.projectToken, | ||
destination.Config.catalogID, | ||
); | ||
const headers = getHeaders(destination); | ||
|
||
const batchesOfEvents = BatchUtils.chunkArrayBySizeAndLength(deleteItemRespList, { | ||
maxSizeInBytes: MAX_PAYLOAD_SIZE, | ||
maxItems: MAX_ITEMS, | ||
}); | ||
batchesOfEvents.items.forEach((batch: any) => { | ||
const mergedPayload = JSON.stringify(getMergedEvents(batch)); | ||
const mergedMetadata = getMergedMetadata(batch); | ||
deleteItemBatchedResponse.push( | ||
buildBatchedRequest(mergedPayload, method, endpoint, headers, mergedMetadata, destination), | ||
); | ||
}); | ||
return deleteItemBatchedResponse; | ||
}; | ||
|
||
// returns final batched response | ||
export const batchResponseBuilder = ( | ||
insertItemRespList: any, | ||
updateItemRespList: any, | ||
deleteItemRespList: any, | ||
destination: any, | ||
) => { | ||
const response: any[] = []; | ||
if (insertItemRespList.length > 0) { | ||
response.push(...insertItemBatchResponseBuilder(insertItemRespList, destination)); | ||
} | ||
if (updateItemRespList.length > 0) { | ||
response.push(...updateItemBatchResponseBuilder(updateItemRespList, destination)); | ||
} | ||
if (deleteItemRespList.length > 0) { | ||
response.push(...deleteItemBatchResponseBuilder(deleteItemRespList, destination)); | ||
} | ||
return response; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.