-
Notifications
You must be signed in to change notification settings - Fork 114
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
3 changed files
with
352 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
/* eslint-disable no-nested-ternary */ | ||
const get = require('get-value'); | ||
const { ConfigurationError, InstrumentationError } = require('@rudderstack/integrations-lib'); | ||
const { | ||
defaultRequestConfig, | ||
getHashFromArray, | ||
handleRtTfSingleEventError, | ||
} = require('../../util'); | ||
const { groupEvents, removePrefix, handleMappings, removeExtraFields } = require('./utils') | ||
const { EventType } = require('../../../constants'); | ||
|
||
const buildRequestPayload = (payload, method, headers, params, endpoint) => { | ||
const response = defaultRequestConfig(); | ||
response.method = method; | ||
if (method === 'GET') { | ||
response.params = { ...params, payload }; | ||
} else { | ||
response.params = params; | ||
response.body.JSON = payload; | ||
} | ||
response.headers = headers; | ||
response.endpoint = endpoint | ||
return response; | ||
} | ||
|
||
const trackResponseBuilder = (message, destination) => { | ||
const { track } = destination.Config.eventsMapping || destination.config.eventsMapping; | ||
const respList = []; | ||
/* | ||
[ | ||
{ | ||
"trackEventName": "Product Added", | ||
"trackEndpoint": "https://www.test.com/user/{userId}", | ||
"trackMethod": "PUT", | ||
"trackHeaders": [ | ||
{ | ||
"from": "content-type", | ||
"to": "application/json" | ||
} | ||
], | ||
"trackQueryParams": [ | ||
{ | ||
"from": "queryparam", | ||
"to": "123" | ||
} | ||
], | ||
"trackPathVariables": [ | ||
{ | ||
"from": "userId", | ||
"to": "$.event.userId" | ||
} | ||
] | ||
} | ||
] | ||
*/ | ||
const eventRequest = track.filter((key) => { | ||
return message.event === key.trackEventName; | ||
}); | ||
eventRequest.forEach(request => { | ||
const { endpoint, method, headers, queryParams, pathVariables, mappings, batchSize } = request; | ||
const headersObject = handleMappings(message, headers, 'value', 'key'); | ||
const params = handleMappings(message, queryParams, 'value', 'key'); | ||
const pathVariablesObj = getHashFromArray(pathVariables, 'pathVariable', 'pathValue', false); | ||
const payload = handleMappings(message, mappings); | ||
payload.maxBatchSize = batchSize; | ||
const updatedEndpoint = endpoint.replace(/{(\w+)}/g, (_, key) => { | ||
if (!pathVariablesObj[key]) { | ||
throw new Error(`Key ${key} not found in the pathVariables`); | ||
} | ||
return get(message, removePrefix(pathVariablesObj[key])); | ||
}); | ||
if (endpoint.length === 0) { | ||
throw new ConfigurationError('Endpoint is missing'); | ||
} | ||
respList.push(buildRequestPayload(payload, method, headersObject, params, updatedEndpoint)) | ||
}); | ||
return respList; | ||
}; | ||
const identifyResponseBuilder = (message, destination) => { | ||
/* | ||
Example Config : { | ||
"http-connectionMode": "cloud", | ||
"connectionMode": { | ||
"cloud": "cloud" | ||
}, | ||
"identify": [ | ||
{ | ||
"identifyEndpoint": "https://www.test.com/user/{userId}", | ||
"identifyMethod": "POST", | ||
"identifyHeaders": [ | ||
{ | ||
"from": "content-type", | ||
"to": "application/json" | ||
} | ||
], | ||
"identifyQueryParams": [ | ||
{ | ||
"from": "queryParam", | ||
"to": "123" | ||
} | ||
], | ||
"identifyPathVariables": [ | ||
{ | ||
"from": "userId", | ||
"to": "$.events.userId" | ||
} | ||
] | ||
} | ||
] | ||
} | ||
*/ | ||
const { identify } = destination.Config?.eventsMapping || destination.config.eventsMapping; | ||
const respList = []; | ||
identify.forEach(request => { | ||
const { endpoint, method, headers, queryParams, pathVariables, mappings, batchSize } = request; | ||
const headersObject = handleMappings(message, headers, 'value', 'key'); | ||
const params = handleMappings(message, queryParams, 'value', 'key'); | ||
const pathVariablesObj = getHashFromArray(pathVariables, 'pathVariable', 'pathValue', false); | ||
const payload = handleMappings(message, mappings); | ||
payload.maxBatchSize = batchSize; | ||
const updatedEndpoint = endpoint.replace(/{(\w+)}/g, (_, key) => { | ||
if (!pathVariablesObj[key]) { | ||
throw new Error(`Key ${key} not found in the pathVariables`); | ||
} | ||
return get(message, removePrefix(pathVariablesObj[key])); | ||
}); | ||
if (endpoint.length === 0) { | ||
throw new ConfigurationError('Endpoint is missing'); | ||
} | ||
respList.push(buildRequestPayload(payload, method, headersObject, params, updatedEndpoint)) | ||
}); | ||
return respList; | ||
}; | ||
const processEvent = (event) => { | ||
const { message, destination } = event; | ||
const { type } = message; | ||
let response | ||
switch (type) { | ||
case EventType.IDENTIFY: | ||
response = identifyResponseBuilder(message, destination); | ||
break; | ||
case EventType.TRACK: | ||
response = trackResponseBuilder(message, destination); | ||
break; | ||
Check failure on line 144 in src/v0/destinations/diy/transform.js GitHub Actions / Check for formatting & lint errors
|
||
default: | ||
throw new InstrumentationError(`Message type ${messageType} not supported`); | ||
} | ||
return response | ||
}; | ||
const process = (event) => { | ||
const response = processEvent(event); | ||
return response; | ||
}; | ||
|
||
const processRouterDest = (inputs, reqMetadata) => { | ||
let batchResponseList = []; | ||
const batchErrorRespList = []; | ||
const successRespList = []; | ||
const { destination } = inputs[0]; | ||
inputs.forEach((event) => { | ||
try { | ||
if (event.message.statusCode) { | ||
// already transformed event | ||
successRespList.push({ | ||
message: event.message, | ||
metadata: event.metadata, | ||
destination, | ||
}); | ||
} else { | ||
// if not transformed | ||
const messageList = process(event); | ||
messageList.forEach((message) => { | ||
const transformedPayload = { | ||
message, | ||
metadata: event.metadata, | ||
destination, | ||
}; | ||
successRespList.push(transformedPayload); | ||
}); | ||
} | ||
} catch (error) { | ||
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata); | ||
batchErrorRespList.push(errRespEvent); | ||
} | ||
}); | ||
Check failure on line 185 in src/v0/destinations/diy/transform.js GitHub Actions / Check for formatting & lint errors
|
||
if (successRespList.length > 0) { | ||
const { destination } = inputs[0]; | ||
const { enableBatching } = destination?.Config || destination.config; | ||
batchResponseList = enableBatching ? groupEvents(successRespList) : removeExtraFields(successRespList); | ||
|
||
} | ||
return [...batchResponseList, ...batchErrorRespList]; | ||
}; | ||
|
||
module.exports = { processEvent, process, processRouterDest }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
const crypto = require('crypto'); | ||
const get = require('get-value'); | ||
const set = require('set-value'); | ||
const lodash = require('lodash'); | ||
|
||
const { | ||
getSuccessRespEvents, | ||
defaultBatchRequestConfig, | ||
getHashFromArray, | ||
applyCustomMappings, | ||
} = require('../../util'); | ||
|
||
const removeExtraFields = (events) => { | ||
events.map((ev) => delete ev.message.body.JSON.maxBatchSize) | ||
Check failure on line 14 in src/v0/destinations/diy/utils.js GitHub Actions / Check for formatting & lint errors
|
||
return events; | ||
} | ||
/** | ||
* This function calculates hash of incoming event | ||
* @param {*} event | ||
* @returns | ||
* Example event : { | ||
endpoint: 'https://www.abc.com/{userId}/{anonId}', | ||
headers: { | ||
'Content-Type': 'application/json', | ||
'Authorization': 'Bearer token' | ||
}, | ||
params: { userId: '123', anonId: '456' }, | ||
method: 'GET' | ||
}; | ||
*/ | ||
const getHash = (event) => { | ||
const { endpoint, headers, params, method } = event.message; | ||
const data = { | ||
endpoint, | ||
headers, | ||
params, | ||
method | ||
}; | ||
const dataString = JSON.stringify(data); | ||
const hash = crypto.createHash('sha256').update(dataString).digest('hex'); | ||
return hash; | ||
}; | ||
|
||
const generateBatchedPayloadForArray = (events) => { | ||
let batchEventResponse = defaultBatchRequestConfig(); | ||
const metadata = []; | ||
// extracting destination from the first event in a batch | ||
const { message, destination } = events[0]; | ||
|
||
const jsonArray = []; | ||
// Batch event into destination batch structure | ||
events.forEach((event) => { | ||
jsonArray.push(event.message.body.JSON); | ||
metadata.push(event.metadata); | ||
}); | ||
batchEventResponse.batchedRequest.endpoint = message.endpoint; | ||
batchEventResponse.batchedRequest.method = message.method; | ||
batchEventResponse.batchedRequest.headers = message.headers; | ||
batchEventResponse.batchedRequest.params = message.params; | ||
batchEventResponse.batchedRequest.body.JSON = jsonArray; | ||
batchEventResponse = { | ||
...batchEventResponse, | ||
metadata, | ||
destination, | ||
}; | ||
return batchEventResponse; | ||
}; | ||
const prefix = '$.'; | ||
const havePrefix = (str) => str.startsWith(prefix); | ||
const removePrefix = (str) => { | ||
if (havePrefix(str)) { | ||
return str.slice(prefix.length); | ||
} | ||
return str; | ||
} | ||
const getMaxBatchForBatch = (batch) => { | ||
const message = batch?.[0].message; | ||
return message.body.JSON?.maxBatchSize || 1; | ||
} | ||
const batchEvents = (groupedEvents) => { | ||
const batchedResponseList = []; | ||
// batching and chunking logic | ||
Object.keys(groupedEvents).forEach(group => { | ||
const maxBatchSize = getMaxBatchForBatch(groupedEvents[group]); | ||
const eventChunks = lodash.chunk(groupedEvents[group], maxBatchSize); | ||
eventChunks.forEach((chunk) => { | ||
const updatedEvents = removeExtraFields(chunk); | ||
const batchEventResponse = generateBatchedPayloadForArray(updatedEvents); | ||
batchedResponseList.push( | ||
getSuccessRespEvents( | ||
batchEventResponse.batchedRequest, | ||
batchEventResponse.metadata, | ||
batchEventResponse.destination, | ||
true, | ||
), | ||
); | ||
}); | ||
}); | ||
return batchedResponseList; | ||
} | ||
const groupEvents = (batch) => { | ||
const groupedEvents = {}; | ||
// grouping events | ||
batch.forEach(event => { | ||
const eventHash = getHash(event); | ||
if (!groupedEvents[eventHash]) { | ||
groupedEvents[eventHash] = [event]; | ||
} else { | ||
groupedEvents[eventHash].push(event); | ||
} | ||
}); | ||
return batchEvents(groupedEvents); | ||
}; | ||
|
||
const handleMappings = (message, mapArray, from = 'from', to = 'to') => { | ||
const customMappings = []; | ||
const normalMappings = [] | ||
mapArray.forEach(mapping => { | ||
if (havePrefix(mapping[from])) { | ||
customMappings.push(mapping) | ||
} else { | ||
normalMappings.push(mapping) | ||
} | ||
}); | ||
const constToConst = []; // use getHashFromArray | ||
const constToJsonPath = []; // use set method | ||
normalMappings.forEach(mapping => { | ||
if (havePrefix(mapping[to])) { | ||
constToJsonPath.push(mapping) | ||
} else { | ||
constToConst.push(mapping) | ||
} | ||
}) | ||
const finalMapping = {}; | ||
constToJsonPath.forEach(mapping => { | ||
set(finalMapping, mapping[to].replace(prefix, ''), mapping[from]) | ||
}) | ||
const constToConstMapping = getHashFromArray(constToConst, to, from, false) | ||
const jsonPathToJsonPath = []; // use custom mapping module for this | ||
const jsonPathToConst = []; // use set and get | ||
customMappings.forEach(mapping => { | ||
if (havePrefix(mapping[to])) { | ||
jsonPathToJsonPath.push(mapping) | ||
} else { | ||
const value = get(message, mapping[from].replace(prefix, '')); | ||
set(finalMapping, mapping[to], value) | ||
// jsonPathToConst.push({ [`$.${mapping[to]}`]: }) | ||
} | ||
}) | ||
const jsonPathToConstMapping = applyCustomMappings(message, jsonPathToConst); | ||
const jsonPathToJsonPathMapping = applyCustomMappings(message, jsonPathToJsonPath); | ||
return { ...finalMapping, ...jsonPathToJsonPathMapping, ...constToConstMapping, ...jsonPathToConstMapping } | ||
|
||
} | ||
|
||
module.exports = { groupEvents, removePrefix, handleMappings, removeExtraFields }; |