diff --git a/src/util/fetchDestinationHandlers.ts b/src/util/fetchDestinationHandlers.ts index 2661ef2e68..fa8cbb47c3 100644 --- a/src/util/fetchDestinationHandlers.ts +++ b/src/util/fetchDestinationHandlers.ts @@ -1,22 +1,18 @@ -import * as V0MarketoBulkUploadFileUpload from '../v0/destinations/marketo_bulk_upload/fileUpload'; -import * as V0MarketoBulkUploadPollStatus from '../v0/destinations/marketo_bulk_upload/poll'; -import * as V0MarketoBulkUploadJobStatus from '../v0/destinations/marketo_bulk_upload/fetchJobStatus'; - const fileUploadHandlers = { v0: { - marketo_bulk_upload: V0MarketoBulkUploadFileUpload, + marketo_bulk_upload: undefined, }, }; const pollStatusHandlers = { v0: { - marketo_bulk_upload: V0MarketoBulkUploadPollStatus, + marketo_bulk_upload: undefined, }, }; const jobStatusHandlers = { v0: { - marketo_bulk_upload: V0MarketoBulkUploadJobStatus, + marketo_bulk_upload: undefined, }, }; diff --git a/src/v0/destinations/gainsight_px/config.js b/src/v0/destinations/gainsight_px/config.js index cc058f88d2..a5ced4f1a7 100644 --- a/src/v0/destinations/gainsight_px/config.js +++ b/src/v0/destinations/gainsight_px/config.js @@ -1,12 +1,27 @@ const { getMappingConfig } = require('../../util'); const BASE_ENDPOINT = 'https://api.aptrinsic.com/v1'; -const ENDPOINTS = { - USERS_ENDPOINT: `${BASE_ENDPOINT}/users`, - CUSTOM_EVENTS_ENDPOINT: `${BASE_ENDPOINT}/events/custom`, - ACCOUNTS_ENDPOINT: `${BASE_ENDPOINT}/accounts`, +const BASE_EU_ENDPOINT = 'https://api-eu.aptrinsic.com/v1'; +const BASE_US2_ENDPOINT = 'https://api-us2.aptrinsic.com/v1'; + +const getBaseEndpoint = (Config) => { + const { dataCenter } = Config; + switch (dataCenter) { + case 'EU': + return BASE_EU_ENDPOINT; + case 'US2': + return BASE_US2_ENDPOINT; + default: + return BASE_ENDPOINT; + } }; +const getUsersEndpoint = (Config) => `${getBaseEndpoint(Config)}/users`; + +const getCustomEventsEndpoint = (Config) => `${getBaseEndpoint(Config)}/events/custom`; + +const getAccountsEndpoint = (Config) => `${getBaseEndpoint(Config)}/accounts`; + const CONFIG_CATEGORIES = { IDENTIFY: { type: 'identify', name: 'GainsightPX_Identify' }, TRACK: { type: 'track', name: 'GainsightPX_Track' }, @@ -79,10 +94,16 @@ const ACCOUNT_EXCLUSION_FIELDS = [ ]; module.exports = { - ENDPOINTS, USER_EXCLUSION_FIELDS, ACCOUNT_EXCLUSION_FIELDS, identifyMapping: MAPPING_CONFIG[CONFIG_CATEGORIES.IDENTIFY.name], trackMapping: MAPPING_CONFIG[CONFIG_CATEGORIES.TRACK.name], groupMapping: MAPPING_CONFIG[CONFIG_CATEGORIES.GROUP.name], + getUsersEndpoint, + getCustomEventsEndpoint, + getAccountsEndpoint, + BASE_ENDPOINT, + BASE_EU_ENDPOINT, + BASE_US2_ENDPOINT, + getBaseEndpoint, }; diff --git a/src/v0/destinations/gainsight_px/config.test.js b/src/v0/destinations/gainsight_px/config.test.js new file mode 100644 index 0000000000..825396d350 --- /dev/null +++ b/src/v0/destinations/gainsight_px/config.test.js @@ -0,0 +1,27 @@ +const { BASE_ENDPOINT, BASE_EU_ENDPOINT, BASE_US2_ENDPOINT, getBaseEndpoint } = require('./config'); + +describe('getBaseEndpoint method test', () => { + it('Should return BASE_ENDPOINT when destination.Config.dataCenter is not "EU" or "US2"', () => { + const Config = { + dataCenter: 'US', + }; + const result = getBaseEndpoint(Config); + expect(result).toBe(BASE_ENDPOINT); + }); + + it('Should return BASE_EU_ENDPOINT when destination.Config.dataCenter is "EU"', () => { + const Config = { + dataCenter: 'EU', + }; + const result = getBaseEndpoint(Config); + expect(result).toBe(BASE_EU_ENDPOINT); + }); + + it('Should return 
BASE_US2_ENDPOINT when destination.Config.dataCenter is "US2"', () => { + const Config = { + dataCenter: 'US2', + }; + const result = getBaseEndpoint(Config); + expect(result).toBe(BASE_US2_ENDPOINT); + }); +}); diff --git a/src/v0/destinations/gainsight_px/transform.js b/src/v0/destinations/gainsight_px/transform.js index 0911b76b6c..496099a6b4 100644 --- a/src/v0/destinations/gainsight_px/transform.js +++ b/src/v0/destinations/gainsight_px/transform.js @@ -27,12 +27,13 @@ const { formatEventProps, } = require('./util'); const { - ENDPOINTS, USER_EXCLUSION_FIELDS, ACCOUNT_EXCLUSION_FIELDS, trackMapping, groupMapping, identifyMapping, + getUsersEndpoint, + getCustomEventsEndpoint, } = require('./config'); const { JSON_MIME_TYPE } = require('../../util/constant'); @@ -92,7 +93,7 @@ const identifyResponseBuilder = async (message, { Config }, metadata) => { if (isUserPresent) { // update user response.method = defaultPutRequestConfig.requestMethod; - response.endpoint = `${ENDPOINTS.USERS_ENDPOINT}/${userId}`; + response.endpoint = `${getUsersEndpoint(Config)}/${userId}`; response.body.JSON = removeUndefinedAndNullValues(payload); return response; } @@ -100,7 +101,7 @@ const identifyResponseBuilder = async (message, { Config }, metadata) => { // create new user payload.identifyId = userId; response.method = defaultPostRequestConfig.requestMethod; - response.endpoint = ENDPOINTS.USERS_ENDPOINT; + response.endpoint = getUsersEndpoint(Config); response.body.JSON = removeUndefinedAndNullValues(payload); return response; }; @@ -162,7 +163,7 @@ const newGroupResponseBuilder = async (message, { Config }, metadata) => { 'X-APTRINSIC-API-KEY': Config.apiKey, 'Content-Type': JSON_MIME_TYPE, }; - response.endpoint = `${ENDPOINTS.USERS_ENDPOINT}/${userId}`; + response.endpoint = `${getUsersEndpoint(Config)}/${userId}`; response.body.JSON = { accountId: groupId, }; @@ -230,7 +231,7 @@ const groupResponseBuilder = async (message, { Config }, metadata) => { 'X-APTRINSIC-API-KEY': Config.apiKey, 'Content-Type': JSON_MIME_TYPE, }; - response.endpoint = `${ENDPOINTS.USERS_ENDPOINT}/${userId}`; + response.endpoint = `${getUsersEndpoint(Config)}/${userId}`; response.body.JSON = { accountId: groupId, }; @@ -271,7 +272,7 @@ const trackResponseBuilder = (message, { Config }) => { 'X-APTRINSIC-API-KEY': Config.apiKey, 'Content-Type': JSON_MIME_TYPE, }; - response.endpoint = ENDPOINTS.CUSTOM_EVENTS_ENDPOINT; + response.endpoint = getCustomEventsEndpoint(Config); return response; }; diff --git a/src/v0/destinations/gainsight_px/util.js b/src/v0/destinations/gainsight_px/util.js index 7300189297..71d85438de 100644 --- a/src/v0/destinations/gainsight_px/util.js +++ b/src/v0/destinations/gainsight_px/util.js @@ -1,9 +1,9 @@ const { NetworkError } = require('@rudderstack/integrations-lib'); -const { ENDPOINTS } = require('./config'); const tags = require('../../util/tags'); const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils'); const { JSON_MIME_TYPE } = require('../../util/constant'); const { handleHttpRequest } = require('../../../adapters/network'); +const { getUsersEndpoint, getAccountsEndpoint } = require('./config'); const handleErrorResponse = (error, customErrMessage, expectedErrStatus, defaultStatus = 400) => { let destResp; @@ -38,10 +38,10 @@ const handleErrorResponse = (error, customErrMessage, expectedErrStatus, default * @returns */ const objectExists = async (id, Config, objectType, metadata) => { - let url = `${ENDPOINTS.USERS_ENDPOINT}/${id}`; + let url = 
`${getUsersEndpoint(Config)}/${id}`; if (objectType === 'account') { - url = `${ENDPOINTS.ACCOUNTS_ENDPOINT}/${id}`; + url = `${getAccountsEndpoint(Config)}/${id}`; } const { httpResponse: res } = await handleHttpRequest( 'get', @@ -70,7 +70,7 @@ const objectExists = async (id, Config, objectType, metadata) => { const createAccount = async (payload, Config, metadata) => { const { httpResponse: res } = await handleHttpRequest( 'post', - ENDPOINTS.ACCOUNTS_ENDPOINT, + getAccountsEndpoint(Config), payload, { headers: { @@ -96,7 +96,7 @@ const createAccount = async (payload, Config, metadata) => { const updateAccount = async (accountId, payload, Config, metadata) => { const { httpResponse: res } = await handleHttpRequest( 'put', - `${ENDPOINTS.ACCOUNTS_ENDPOINT}/${accountId}`, + `${getAccountsEndpoint(Config)}/${accountId}`, payload, { headers: { diff --git a/src/v0/destinations/intercom_v2/config.js b/src/v0/destinations/intercom_v2/config.js index c7cb43b093..5ff5566d2d 100644 --- a/src/v0/destinations/intercom_v2/config.js +++ b/src/v0/destinations/intercom_v2/config.js @@ -6,6 +6,12 @@ const ApiVersions = { v2: '2.10', }; +const RecordAction = { + INSERT: 'insert', + UPDATE: 'update', + DELETE: 'delete', +}; + const ConfigCategory = { IDENTIFY: { name: 'IntercomIdentifyConfig', @@ -25,4 +31,5 @@ module.exports = { ConfigCategory, MappingConfig, ApiVersions, + RecordAction, }; diff --git a/src/v0/destinations/intercom_v2/transform.js b/src/v0/destinations/intercom_v2/transform.js index 8d97e20bde..3f9457410f 100644 --- a/src/v0/destinations/intercom_v2/transform.js +++ b/src/v0/destinations/intercom_v2/transform.js @@ -1,4 +1,4 @@ -const { InstrumentationError } = require('@rudderstack/integrations-lib'); +const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); const { handleRtTfSingleEventError, getSuccessRespEvents, @@ -17,13 +17,14 @@ const { addOrUpdateTagsToCompany, getStatusCode, getBaseEndpoint, + getRecordAction, } = require('./utils'); const { getName, filterCustomAttributes, addMetadataToPayload, } = require('../../../cdk/v2/destinations/intercom/utils'); -const { MappingConfig, ConfigCategory } = require('./config'); +const { MappingConfig, ConfigCategory, RecordAction } = require('./config'); const transformIdentifyPayload = (event) => { const { message, destination } = event; @@ -38,7 +39,7 @@ const transformIdentifyPayload = (event) => { } payload.name = getName(message); payload.custom_attributes = message.traits || message.context.traits || {}; - payload.custom_attributes = filterCustomAttributes(payload, 'user', destination); + payload.custom_attributes = filterCustomAttributes(payload, 'user', destination, message); return payload; }; @@ -66,7 +67,7 @@ const transformGroupPayload = (event) => { const category = ConfigCategory.GROUP; const payload = constructPayload(message, MappingConfig[category.name]); payload.custom_attributes = message.traits || message.context.traits || {}; - payload.custom_attributes = filterCustomAttributes(payload, 'company', destination); + payload.custom_attributes = filterCustomAttributes(payload, 'company', destination, message); return payload; }; @@ -131,6 +132,45 @@ const constructGroupResponse = async (event) => { return getResponse(method, endpoint, headers, finalPayload); }; +const constructRecordResponse = async (event) => { + const { message, destination, metadata } = event; + const { identifiers, fields } = message; + + let method = 'POST'; + let endpoint = 
`${getBaseEndpoint(destination)}/contacts`;
+  let payload = {};
+
+  const action = getRecordAction(message);
+  const contactId = await searchContact(event);
+
+  if ((action === RecordAction.UPDATE || action === RecordAction.DELETE) && !contactId) {
+    throw new ConfigurationError('Contact is not present. Aborting.');
+  }
+
+  switch (action) {
+    case RecordAction.INSERT:
+      payload = { ...identifiers, ...fields };
+      if (contactId) {
+        endpoint += `/${contactId}`;
+        payload = { ...fields };
+        method = 'PUT';
+      }
+      break;
+    case RecordAction.UPDATE:
+      endpoint += `/${contactId}`;
+      payload = { ...fields };
+      method = 'PUT';
+      break;
+    case RecordAction.DELETE:
+      endpoint += `/${contactId}`;
+      method = 'DELETE';
+      break;
+    default:
+      throw new InstrumentationError(`action ${action} is not supported.`);
+  }
+  return getResponse(method, endpoint, getHeaders(metadata), payload);
+};
+
 const processEvent = async (event) => {
   const { message } = event;
   const messageType = getEventType(message);
@@ -145,6 +185,9 @@ const processEvent = async (event) => {
     case EventType.GROUP:
       response = await constructGroupResponse(event);
       break;
+    case EventType.RECORD:
+      response = await constructRecordResponse(event);
+      break;
     default:
       throw new InstrumentationError(`message type ${messageType} is not supported.`);
   }
diff --git a/src/v0/destinations/intercom_v2/utils.js b/src/v0/destinations/intercom_v2/utils.js
index 69ea1385d9..df44b92e24 100644
--- a/src/v0/destinations/intercom_v2/utils.js
+++ b/src/v0/destinations/intercom_v2/utils.js
@@ -28,6 +28,8 @@ const { getAccessToken } = require('../../util');
 const { ApiVersions, destType } = require('./config');
 const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils');
+const getRecordAction = (message) => message?.action?.toLowerCase();
+
 /**
  * method to handle error during api call
  * ref docs: https://developers.intercom.com/docs/references/rest-api/errors/http-responses/
@@ -99,11 +101,25 @@ const getResponse = (method, endpoint, headers, payload) => {
 const searchContact = async (event) => {
   const { message, destination, metadata } = event;
-  const lookupField = getLookUpField(message);
-  let lookupFieldValue = getFieldValueFromMessage(message, lookupField);
-  if (!lookupFieldValue) {
-    lookupFieldValue = message?.context?.traits?.[lookupField];
+
+  const extractLookupFieldAndValue = () => {
+    const messageType = getEventType(message);
+    if (messageType === EventType.RECORD) {
+      const { identifiers } = message;
+      return Object.entries(identifiers || {})[0] || [null, null];
+    }
+    const lookupField = getLookUpField(message);
+    const lookupFieldValue =
+      getFieldValueFromMessage(message, lookupField) || message?.context?.traits?.[lookupField];
+    return [lookupField, lookupFieldValue];
+  };
+
+  const [lookupField, lookupFieldValue] = extractLookupFieldAndValue();
+
+  if (!lookupField || !lookupFieldValue) {
+    throw new InstrumentationError('Missing lookup field or lookup field value for searchContact');
   }
+
   const data = JSON.stringify({
     query: {
       operator: 'AND',
@@ -329,4 +345,5 @@ module.exports = {
   attachContactToCompany,
   addOrUpdateTagsToCompany,
   getBaseEndpoint,
+  getRecordAction,
 };
diff --git a/src/v0/destinations/marketo_bulk_upload/config.js b/src/v0/destinations/marketo_bulk_upload/config.js
deleted file mode 100644
index e3268711fe..0000000000
--- a/src/v0/destinations/marketo_bulk_upload/config.js
+++ /dev/null
@@ -1,55 +0,0 @@
-const ABORTABLE_CODES = ['601', '603', '605', '609', '610'];
-const RETRYABLE_CODES = ['713', '602', '604',
'611']; -const THROTTLED_CODES = ['502', '606', '607', '608', '615']; - -const MARKETO_FILE_SIZE = 10485760; -const MARKETO_FILE_PATH = `${__dirname}/uploadFile/marketo_bulkupload.csv`; - -const FETCH_ACCESS_TOKEN = 'marketo_bulk_upload_access_token_fetching'; - -const POLL_ACTIVITY = 'marketo_bulk_upload_polling'; -const POLL_STATUS_ERR_MSG = 'Could not poll status'; - -const UPLOAD_FILE = 'marketo_bulk_upload_upload_file'; -const FILE_UPLOAD_ERR_MSG = 'Could not upload file'; - -const JOB_STATUS_ACTIVITY = 'marketo_bulk_upload_get_job_status'; -const FETCH_FAILURE_JOB_STATUS_ERR_MSG = 'Could not fetch failure job status'; -const FETCH_WARNING_JOB_STATUS_ERR_MSG = 'Could not fetch warning job status'; -const ACCESS_TOKEN_FETCH_ERR_MSG = 'Error during fetching access token'; - -const SCHEMA_DATA_TYPE_MAP = { - string: 'string', - number: 'number', - boolean: 'boolean', - undefined: 'undefined', - float: 'number', - text: 'string', - currency: 'string', - integer: 'number', - reference: 'string', - datetime: 'string', - date: 'string', - email: 'string', - phone: 'string', - url: 'string', - object: 'object', -}; - -module.exports = { - ABORTABLE_CODES, - RETRYABLE_CODES, - THROTTLED_CODES, - MARKETO_FILE_SIZE, - POLL_ACTIVITY, - UPLOAD_FILE, - JOB_STATUS_ACTIVITY, - MARKETO_FILE_PATH, - FETCH_ACCESS_TOKEN, - POLL_STATUS_ERR_MSG, - FILE_UPLOAD_ERR_MSG, - FETCH_FAILURE_JOB_STATUS_ERR_MSG, - FETCH_WARNING_JOB_STATUS_ERR_MSG, - ACCESS_TOKEN_FETCH_ERR_MSG, - SCHEMA_DATA_TYPE_MAP, -}; diff --git a/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js b/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js deleted file mode 100644 index db3b13eeb8..0000000000 --- a/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js +++ /dev/null @@ -1,153 +0,0 @@ -/* eslint-disable no-restricted-syntax */ -/* eslint-disable no-prototype-builtins */ -const { PlatformError } = require('@rudderstack/integrations-lib'); -const { getAccessToken } = require('./util'); -const { handleHttpRequest } = require('../../../adapters/network'); -const stats = require('../../../util/stats'); -const { JSON_MIME_TYPE } = require('../../util/constant'); -const { - handleFetchJobStatusResponse, - getFieldSchemaMap, - checkEventStatusViaSchemaMatching, -} = require('./util'); -const { removeUndefinedValues } = require('../../util'); - -const getJobsStatus = async (event, type, accessToken) => { - const { config, importId } = event; - const { munchkinId } = config; - let url; - // Get status of each lead for failed leads - // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#failures - const requestOptions = { - headers: { - 'Content-Type': JSON_MIME_TYPE, - Authorization: `Bearer ${accessToken}`, - }, - }; - if (type === 'fail') { - url = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${importId}/failures.json`; - } else { - url = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${importId}/warnings.json`; - } - const startTime = Date.now(); - const { processedResponse: resp } = await handleHttpRequest('get', url, requestOptions, { - destType: 'marketo_bulk_upload', - feature: 'transformation', - endpointPath: '/leads/batch/', - requestMethod: 'GET', - module: 'router', - }); - const endTime = Date.now(); - const requestTime = endTime - startTime; - - stats.histogram('marketo_bulk_upload_fetch_job_time', requestTime); - - return handleFetchJobStatusResponse(resp, type); -}; - -/** - * Handles the response from the server based on the provided type. 
- * Retrieves the job status using the getJobsStatus function and processes the response data. - * Matches the response data with the data received from the server. - * Returns a response object containing the failed keys, failed reasons, warning keys, warning reasons, and succeeded keys. - * @param {Object} event - An object containing the input data and metadata. - * @param {string} type - A string indicating the type of job status to retrieve ("fail" or "warn"). - * @returns {Object} - A response object with the failed keys, failed reasons, warning keys, warning reasons, and succeeded keys. - */ -const responseHandler = async (event, type) => { - let FailedKeys = []; - const unsuccessfulJobIdsArr = []; - let successfulJobIdsArr = []; - let reasons = {}; - - const { config } = event; - const accessToken = await getAccessToken(config); - - /** - * { - "FailedKeys" : [jobID1,jobID3], - "FailedReasons" : { - "jobID1" : "failure-reason-1", - "jobID3" : "failure-reason-2", - }, - "WarningKeys" : [jobID2,jobID4], - "WarningReasons" : { - "jobID2" : "warning-reason-1", - "jobID4" : "warning-reason-2", - }, - "SucceededKeys" : [jobID5] -} - */ - - const jobStatus = - type === 'fail' - ? await getJobsStatus(event, 'fail', accessToken) - : await getJobsStatus(event, 'warn', accessToken); - const jobStatusArr = jobStatus.toString().split('\n'); // responseArr = ['field1,field2,Import Failure Reason', 'val1,val2,reason',...] - const { input, metadata } = event; - let headerArr; - if (metadata?.csvHeader) { - headerArr = metadata.csvHeader.split(','); - } else { - throw new PlatformError('No csvHeader in metadata'); - } - const startTime = Date.now(); - const data = {}; - const fieldSchemaMapping = await getFieldSchemaMap(accessToken, config.munchkinId); - const unsuccessfulJobInfo = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - const mismatchJobIdArray = Object.keys(unsuccessfulJobInfo); - const dataTypeMismatchKeys = mismatchJobIdArray.map((strJobId) => parseInt(strJobId, 10)); - reasons = { ...unsuccessfulJobInfo }; - - const filteredEvents = input.filter( - (item) => !dataTypeMismatchKeys.includes(item.metadata.job_id), - ); - // create a map of job_id and data sent from server - // {: ','} - filteredEvents.forEach((i) => { - const response = headerArr.map((fieldName) => Object.values(i)[0][fieldName]).join(','); - data[i.metadata.job_id] = response; - }); - - // match marketo response data with received data from server - for (const element of jobStatusArr) { - // split response by comma but ignore commas inside double quotes - const elemArr = element.split(/,(?=(?:(?:[^"]*"){2})*[^"]*$)/); - // ref : - // https://developers.marketo.com/rest-api/bulk-import/bulk-custom-object-import/#:~:text=Now%20we%E2%80%99ll%20make%20Get%20Import%20Custom%20Object%20Failures%20endpoint%20call%20to%20get%20additional%20failure%20detail%3A - const reasonMessage = elemArr.pop(); // get the column named "Import Failure Reason" - for (const [key, val] of Object.entries(data)) { - // joining the parameter values sent from marketo match it with received data from server - if (val === `${elemArr.map((item) => item.replace(/"/g, '')).join(',')}`) { - // add job keys if warning/failure - if (!unsuccessfulJobIdsArr.includes(key)) { - unsuccessfulJobIdsArr.push(key); - } - reasons[key] = reasonMessage; - } - } - } - - FailedKeys = unsuccessfulJobIdsArr.map((strJobId) => parseInt(strJobId, 10)); - successfulJobIdsArr = Object.keys(data).filter((x) => !unsuccessfulJobIdsArr.includes(x)); - - const 
SucceededKeys = successfulJobIdsArr.map((strJobId) => parseInt(strJobId, 10)); - const endTime = Date.now(); - const requestTime = endTime - startTime; - stats.histogram('marketo_bulk_upload_fetch_job_create_response_time', requestTime); - const response = { - statusCode: 200, - metadata: { - FailedKeys: [...dataTypeMismatchKeys, ...FailedKeys], - FailedReasons: reasons, - SucceededKeys, - }, - }; - return removeUndefinedValues(response); -}; - -const processJobStatus = async (event, type) => { - const resp = await responseHandler(event, type); - return resp; -}; -module.exports = { processJobStatus }; diff --git a/src/v0/destinations/marketo_bulk_upload/fileUpload.js b/src/v0/destinations/marketo_bulk_upload/fileUpload.js deleted file mode 100644 index b49a265fd5..0000000000 --- a/src/v0/destinations/marketo_bulk_upload/fileUpload.js +++ /dev/null @@ -1,275 +0,0 @@ -/* eslint-disable no-plusplus */ -const FormData = require('form-data'); -const fs = require('fs'); -const { - NetworkError, - ConfigurationError, - RetryableError, - TransformationError, -} = require('@rudderstack/integrations-lib'); -const { - getAccessToken, - getMarketoFilePath, - handleFileUploadResponse, - getFieldSchemaMap, - hydrateStatusForServer, -} = require('./util'); -const { isHttpStatusSuccess } = require('../../util'); -const { MARKETO_FILE_SIZE, UPLOAD_FILE } = require('./config'); -const { - getHashFromArray, - removeUndefinedAndNullValues, - isDefinedAndNotNullAndNotEmpty, -} = require('../../util'); -const { handleHttpRequest } = require('../../../adapters/network'); -const { client } = require('../../../util/errorNotifier'); -const stats = require('../../../util/stats'); - -const fetchFieldSchemaNames = async (config, accessToken) => { - const fieldSchemaMapping = await getFieldSchemaMap(accessToken, config.munchkinId); - if (Object.keys(fieldSchemaMapping).length > 0) { - const fieldSchemaNames = Object.keys(fieldSchemaMapping); - return { fieldSchemaNames }; - } - throw new RetryableError('Failed to fetch Marketo Field Schema', 500, fieldSchemaMapping); -}; - -const getHeaderFields = (config, fieldSchemaNames) => { - const { columnFieldsMapping } = config; - - columnFieldsMapping.forEach((colField) => { - if (!fieldSchemaNames.includes(colField.to)) { - throw new ConfigurationError( - `The field ${colField.to} is not present in Marketo Field Schema. Aborting`, - ); - } - }); - const columnField = getHashFromArray(columnFieldsMapping, 'to', 'from', false); - return Object.keys(columnField); -}; -/** - * Processes input data to create a CSV file and returns the file data along with successful and unsuccessful job IDs. - * The file name is made unique with combination of UUID and current timestamp to avoid any overrides. It also has a - * maximum size limit of 10MB . The events that could be accomodated inside the file is marked as successful and the - * rest are marked as unsuccessful. Also the file is deleted when reading is complete. - * @param {Array} inputEvents - An array of input events. - * @param {Object} config - destination config - * @param {Array} headerArr - An array of header fields. - * @returns {Object} - An object containing the file stream, successful job IDs, and unsuccessful job IDs. 
- */ -const getFileData = async (inputEvents, config, headerArr) => { - const input = inputEvents; - const messageArr = []; - let startTime; - let endTime; - let requestTime; - startTime = Date.now(); - - input.forEach((i) => { - const inputData = i; - const jobId = inputData.metadata.job_id; - const data = {}; - data[jobId] = inputData.message; - messageArr.push(data); - }); - - if (isDefinedAndNotNullAndNotEmpty(config.deDuplicationField)) { - // dedup starts - // Time Complexity = O(n2) - const dedupMap = new Map(); - // iterating input and storing the occurences of messages - // with same dedup property received from config - // Example: dedup-property = email - // k (key) v (index of occurence in input) - // user@email [4,7,9] - // user2@email [2,3] - // user3@email [1] - input.forEach((element, index) => { - const indexAr = dedupMap.get(element.message[config.deDuplicationField]) || []; - indexAr.push(index); - dedupMap.set(element.message[config.deDuplicationField], indexAr); - return dedupMap; - }); - // 1. iterating dedupMap - // 2. storing the duplicate occurences in dupValues arr - // 3. iterating dupValues arr, and mapping each property on firstBorn - // 4. as dupValues arr is sorted hence the firstBorn will inherit properties of last occurence (most updated one) - // 5. store firstBorn to first occurence in input as it should get the highest priority - dedupMap.forEach((indexes) => { - let firstBorn = {}; - indexes.forEach((idx) => { - headerArr.forEach((headerStr) => { - // if duplicate item has defined property to offer we take it else old one remains - firstBorn[headerStr] = input[idx].message[headerStr] || firstBorn[headerStr]; - }); - }); - firstBorn = removeUndefinedAndNullValues(firstBorn); - input[indexes[0]].message = firstBorn; - }); - // dedup ends - } - - const csv = []; - csv.push(headerArr.toString()); - endTime = Date.now(); - requestTime = endTime - startTime; - stats.histogram('marketo_bulk_upload_create_header_time', requestTime); - const unsuccessfulJobs = []; - const successfulJobs = []; - const MARKETO_FILE_PATH = getMarketoFilePath(); - startTime = Date.now(); - messageArr.forEach((row) => { - const csvSize = JSON.stringify(csv); // stringify and remove all "stringification" extra data - const response = headerArr - .map((fieldName) => JSON.stringify(Object.values(row)[0][fieldName], '')) - .join(','); - if (csvSize.length <= MARKETO_FILE_SIZE) { - csv.push(response); - successfulJobs.push(Object.keys(row)[0]); - } else { - unsuccessfulJobs.push(Object.keys(row)[0]); - } - }); - endTime = Date.now(); - requestTime = endTime - startTime; - stats.histogram('marketo_bulk_upload_create_csvloop_time', requestTime); - const fileSize = Buffer.from(csv.join('\n')).length; - if (csv.length > 1) { - startTime = Date.now(); - fs.writeFileSync(MARKETO_FILE_PATH, csv.join('\n')); - const readStream = fs.readFileSync(MARKETO_FILE_PATH); - fs.unlinkSync(MARKETO_FILE_PATH); - endTime = Date.now(); - requestTime = endTime - startTime; - stats.histogram('marketo_bulk_upload_create_file_time', requestTime); - stats.histogram('marketo_bulk_upload_upload_file_size', fileSize); - - return { readStream, successfulJobs, unsuccessfulJobs }; - } - return { successfulJobs, unsuccessfulJobs }; -}; - -const getImportID = async (input, config, accessToken, csvHeader) => { - let readStream; - let successfulJobs; - let unsuccessfulJobs; - try { - ({ readStream, successfulJobs, unsuccessfulJobs } = await getFileData( - input, - config, - csvHeader, - )); - } catch (err) { - 
client.notify(err, `Marketo File Upload: Error while creating file: ${err.message}`, { - config, - csvHeader, - }); - throw new TransformationError( - `Marketo File Upload: Error while creating file: ${err.message}`, - 500, - ); - } - - const formReq = new FormData(); - const { munchkinId, deDuplicationField } = config; - // create file for multipart form - if (readStream) { - formReq.append('format', 'csv'); - formReq.append('file', readStream, 'marketo_bulk_upload.csv'); - formReq.append('access_token', accessToken); - // Upload data received from server as files to marketo - // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#import_file - const requestOptions = { - headers: { - ...formReq.getHeaders(), - }, - }; - if (isDefinedAndNotNullAndNotEmpty(deDuplicationField)) { - requestOptions.params = { - lookupField: deDuplicationField, - }; - } - const startTime = Date.now(); - const { processedResponse: resp } = await handleHttpRequest( - 'post', - `https://${munchkinId}.mktorest.com/bulk/v1/leads.json`, - formReq, - requestOptions, - { - destType: 'marketo_bulk_upload', - feature: 'transformation', - endpointPath: '/leads.json', - requestMethod: 'POST', - module: 'router', - }, - ); - const endTime = Date.now(); - const requestTime = endTime - startTime; - stats.counter('marketo_bulk_upload_upload_file_succJobs', successfulJobs.length); - stats.counter('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length); - if (!isHttpStatusSuccess(resp.status)) { - throw new NetworkError( - `Unable to upload file due to error : ${JSON.stringify(resp.response)}`, - hydrateStatusForServer(resp.status, 'During uploading file'), - ); - } - return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); - } - return { importId: null, successfulJobs, unsuccessfulJobs }; -}; - -/** - * - * @param {*} input - * @param {*} config - * @returns returns the final response of fileUpload.js - */ -const responseHandler = async (input, config) => { - const accessToken = await getAccessToken(config); - /** - { - "importId" : , - "pollURL" : , - } - */ - const { fieldSchemaNames } = await fetchFieldSchemaNames(config, accessToken); - const headerForCsv = getHeaderFields(config, fieldSchemaNames); - if (Object.keys(headerForCsv).length === 0) { - throw new ConfigurationError( - 'Faulty configuration. 
Please map your traits to Marketo column fields', - ); - } - const { importId, successfulJobs, unsuccessfulJobs } = await getImportID( - input, - config, - accessToken, - headerForCsv, - ); - // if upload is successful - if (importId) { - const csvHeader = headerForCsv.toString(); - const metadata = { successfulJobs, unsuccessfulJobs, csvHeader }; - const response = { - statusCode: 200, - importId, - metadata, - }; - return response; - } - // if importId is returned null - stats.increment(UPLOAD_FILE, { - status: 500, - state: 'Retryable', - }); - return { - statusCode: 500, - FailedReason: '[Marketo File upload]: No import id received', - }; -}; -const processFileData = async (event) => { - const { input, config } = event; - const resp = await responseHandler(input, config); - return resp; -}; - -module.exports = { processFileData }; diff --git a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js deleted file mode 100644 index 13e1b3a09a..0000000000 --- a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js +++ /dev/null @@ -1,542 +0,0 @@ -const { - handleCommonErrorResponse, - handlePollResponse, - handleFileUploadResponse, - getAccessToken, - checkEventStatusViaSchemaMatching, -} = require('./util'); - -const { - AbortedError, - RetryableError, - NetworkError, - TransformationError, -} = require('@rudderstack/integrations-lib'); -const util = require('./util.js'); -const networkAdapter = require('../../../adapters/network'); -const { handleHttpRequest } = networkAdapter; - -// Mock the handleHttpRequest function -jest.mock('../../../adapters/network'); - -const successfulResponse = { - status: 200, - response: { - access_token: '', - token_type: 'bearer', - expires_in: 3600, - scope: 'dummy@scope.com', - success: true, - }, -}; - -const unsuccessfulResponse = { - status: 400, - response: '[ENOTFOUND] :: DNS lookup failed', -}; - -const emptyResponse = { - response: '', -}; - -const invalidClientErrorResponse = { - status: 401, - response: { - error: 'invalid_client', - error_description: 'Bad client credentials', - }, -}; - -describe('handleCommonErrorResponse', () => { - test('should throw AbortedError for abortable error codes', () => { - const resp = { - response: { - errors: [{ code: 1003, message: 'Aborted' }], - }, - }; - expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( - AbortedError, - ); - }); - - test('should throw ThrottledError for throttled error codes', () => { - const resp = { - response: { - errors: [{ code: 615, message: 'Throttled' }], - }, - }; - expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( - RetryableError, - ); - }); - - test('should throw RetryableError for other error codes', () => { - const resp = { - response: { - errors: [{ code: 2000, message: 'Retryable' }], - }, - }; - expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( - RetryableError, - ); - }); - - test('should throw RetryableError by default', () => { - const resp = { - response: { - errors: [], - }, - }; - expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( - RetryableError, - ); - }); -}); - -describe('handlePollResponse', () => { - // Tests that the function returns the response object if the polling operation was successful - it('should return the response object when the polling operation was successful', () => { - const pollStatus = { 
- response: { - success: true, - result: [ - { - batchId: '123', - status: 'Complete', - numOfLeadsProcessed: 2, - numOfRowsFailed: 1, - numOfRowsWithWarning: 0, - message: 'Import completed with errors, 2 records imported (2 members), 1 failed', - }, - ], - }, - }; - - const result = handlePollResponse(pollStatus); - - expect(result).toEqual(pollStatus.response); - }); - - // Tests that the function throws an AbortedError if the response contains an abortable error code - it('should throw an AbortedError when the response contains an abortable error code', () => { - const pollStatus = { - response: { - errors: [ - { - code: 1003, - message: 'Empty file', - }, - ], - }, - }; - - expect(() => handlePollResponse(pollStatus)).toThrow(AbortedError); - }); - - // Tests that the function throws a ThrottledError if the response contains a throttled error code - it('should throw a ThrottledError when the response contains a throttled error code', () => { - const pollStatus = { - response: { - errors: [ - { - code: 615, - message: 'Exceeded concurrent usage limit', - }, - ], - }, - }; - - expect(() => handlePollResponse(pollStatus)).toThrow(RetryableError); - }); - - // Tests that the function throws a RetryableError if the response contains an error code that is not abortable or throttled - it('should throw a RetryableError when the response contains an error code that is not abortable or throttled', () => { - const pollStatus = { - response: { - errors: [ - { - code: 601, - message: 'Unauthorized', - }, - ], - }, - }; - - expect(() => handlePollResponse(pollStatus)).toThrow(RetryableError); - }); - - // Tests that the function returns null if the polling operation was not successful - it('should return null when the polling operation was not successful', () => { - const pollStatus = { - response: { - success: false, - }, - }; - - const result = handlePollResponse(pollStatus); - - expect(result).toBeNull(); - }); -}); - -describe('handleFileUploadResponse', () => { - // Tests that the function returns an object with importId, successfulJobs, and unsuccessfulJobs when the response indicates a successful upload. - it('should return an object with importId, successfulJobs, and unsuccessfulJobs when the response indicates a successful upload', () => { - const resp = { - response: { - success: true, - result: [ - { - importId: '3404', - status: 'Queued', - }, - ], - }, - }; - const successfulJobs = []; - const unsuccessfulJobs = []; - const requestTime = 100; - - const result = handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); - - expect(result).toEqual({ - importId: '3404', - successfulJobs: [], - unsuccessfulJobs: [], - }); - }); - - // Tests that the function throws a RetryableError when the response indicates an empty file. - it('should throw a RetryableError when the response indicates an empty file', () => { - const resp = { - response: { - errors: [ - { - code: '1003', - message: 'Empty File', - }, - ], - }, - }; - const successfulJobs = []; - const unsuccessfulJobs = []; - const requestTime = 100; - - expect(() => { - handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); - }).toThrow(RetryableError); - }); - - // Tests that the function throws a RetryableError when the response indicates more than 10 concurrent uses. 
- it('should throw a RetryableError when the response indicates more than 10 concurrent uses', () => { - const resp = { - response: { - errors: [ - { - code: '615', - message: 'Concurrent Use Limit Exceeded', - }, - ], - }, - }; - const successfulJobs = []; - const unsuccessfulJobs = []; - const requestTime = 100; - - expect(() => { - handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); - }).toThrow(RetryableError); - }); - - // Tests that the function throws a RetryableError when the response contains an error code between 1000 and 1077. - it('should throw a Aborted when the response contains an error code between 1000 and 1077', () => { - const resp = { - response: { - errors: [ - { - code: 1001, - message: 'Some Error', - }, - ], - }, - }; - const successfulJobs = []; - const unsuccessfulJobs = []; - const requestTime = 100; - - expect(() => { - handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); - }).toThrow(AbortedError); - }); -}); - -describe('getAccessToken', () => { - beforeEach(() => { - handleHttpRequest.mockClear(); - }); - - it('should retrieve and return access token on successful response', async () => { - const url = - 'https://dummyMunchkinId.mktorest.com/identity/oauth/token?client_id=dummyClientId&client_secret=dummyClientSecret&grant_type=client_credentials'; - - handleHttpRequest.mockResolvedValueOnce({ - processedResponse: successfulResponse, - }); - - const config = { - clientId: 'dummyClientId', - clientSecret: 'dummyClientSecret', - munchkinId: 'dummyMunchkinId', - }; - - const result = await getAccessToken(config); - expect(result).toBe(''); - expect(handleHttpRequest).toHaveBeenCalledTimes(1); - // Ensure your mock response structure is consistent with the actual behavior - expect(handleHttpRequest).toHaveBeenCalledWith('get', url, { - destType: 'marketo_bulk_upload', - feature: 'transformation', - endpointPath: '/identity/oauth/token', - feature: 'transformation', - module: 'router', - requestMethod: 'GET', - }); - }); - - it('should throw a NetworkError on unsuccessful HTTP status', async () => { - handleHttpRequest.mockResolvedValueOnce({ - processedResponse: unsuccessfulResponse, - }); - - const config = { - clientId: 'dummyClientId', - clientSecret: 'dummyClientSecret', - munchkinId: 'dummyMunchkinId', - }; - - await expect(getAccessToken(config)).rejects.toThrow(NetworkError); - }); - - it('should throw a RetryableError when expires_in is 0', async () => { - handleHttpRequest.mockResolvedValueOnce({ - processedResponse: { - ...successfulResponse, - response: { ...successfulResponse.response, expires_in: 0 }, - }, - }); - - const config = { - clientId: 'dummyClientId', - clientSecret: 'dummyClientSecret', - munchkinId: 'dummyMunchkinId', - }; - - await expect(getAccessToken(config)).rejects.toThrow(RetryableError); - }); - - it('should throw an AbortedError on unsuccessful response', async () => { - handleHttpRequest.mockResolvedValueOnce({ processedResponse: invalidClientErrorResponse }); - - const config = { - clientId: 'invalidClientID', - clientSecret: 'dummyClientSecret', - munchkinId: 'dummyMunchkinId', - }; - - await expect(getAccessToken(config)).rejects.toThrow(NetworkError); - }); - - it('should throw transformation error response', async () => { - handleHttpRequest.mockResolvedValueOnce({ processedResponse: emptyResponse }); - - const config = { - clientId: 'dummyClientId', - clientSecret: 'dummyClientSecret', - munchkinId: 'dummyMunchkinId', - }; - - await 
expect(getAccessToken(config)).rejects.toThrow(TransformationError); - }); -}); - -describe('checkEventStatusViaSchemaMatching', () => { - // The function correctly identifies fields with expected data types. - it('if event data types match with expected data types we send no field as mismatch', () => { - const event = { - input: [ - { - message: { - email: 'value1', - id: 123, - isLead: true, - }, - metadata: { - job_id: 'job1', - }, - }, - ], - }; - const fieldSchemaMapping = { - email: 'string', - id: 'integer', - isLead: 'boolean', - }; - - const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - - expect(result).toEqual({}); - }); - - // The function correctly identifies fields with unexpected data types. - it('if event data types do not match with expected data types we send that field as mismatch', () => { - const event = { - input: [ - { - message: { - email: 123, - city: '123', - islead: true, - }, - metadata: { - job_id: 'job1', - }, - }, - ], - }; - const fieldSchemaMapping = { - email: 'string', - city: 'number', - islead: 'boolean', - }; - - const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - - expect(result).toEqual({ - job1: 'invalid email', - }); - }); - - // The function correctly handles events with multiple fields. - it('For array of events the mismatch object fills up with each event errors', () => { - const event = { - input: [ - { - message: { - id: 'value1', - testCustomFieldScore: 123, - isLead: true, - }, - metadata: { - job_id: 'job1', - }, - }, - { - message: { - email: 'value2', - id: 456, - testCustomFieldScore: false, - }, - metadata: { - job_id: 'job2', - }, - }, - ], - }; - const fieldSchemaMapping = { - email: 'email', - id: 'integer', - testCustomFieldScore: 'integer', - isLead: 'boolean', - }; - - const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - - expect(result).toEqual({ - job1: 'invalid id', - job2: 'invalid testCustomFieldScore', - }); - }); - - // The function correctly handles events with missing fields. - it('it is not mandatory to send all the fields present in schema', () => { - const event = { - input: [ - { - message: { - email: 'value1', - isLead: true, - }, - metadata: { - job_id: 'job1', - }, - }, - ], - }; - const fieldSchemaMapping = { - email: 'string', - id: 'number', - isLead: 'boolean', - }; - - const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - - expect(result).toEqual({}); - }); - - // The function correctly handles events with additional fields. But this will not happen in our use case - it('for any field beyond schema fields will be mapped as invalid', () => { - const event = { - input: [ - { - message: { - email: 'value1', - id: 124, - isLead: true, - abc: 'value2', - }, - metadata: { - job_id: 'job1', - }, - }, - ], - }; - const fieldSchemaMapping = { - email: 'string', - id: 'number', - isLead: 'boolean', - }; - - const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - - expect(result).toEqual({ - job1: 'invalid abc', - }); - }); - - // The function correctly handles events with null values. 
- it('should ignore event properties with null values', () => { - const event = { - input: [ - { - message: { - email: 'value1', - id: null, - isLead: true, - }, - metadata: { - job_id: 'job1', - }, - }, - ], - }; - const fieldSchemaMapping = { - email: 'string', - id: 'number', - isLead: 'boolean', - }; - - const result = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); - - expect(result).toEqual({}); - }); -}); diff --git a/src/v0/destinations/marketo_bulk_upload/poll.js b/src/v0/destinations/marketo_bulk_upload/poll.js deleted file mode 100644 index f53347d6e5..0000000000 --- a/src/v0/destinations/marketo_bulk_upload/poll.js +++ /dev/null @@ -1,126 +0,0 @@ -const { NetworkError } = require('@rudderstack/integrations-lib'); -const { removeUndefinedValues, isHttpStatusSuccess } = require('../../util'); -const { getAccessToken, handlePollResponse, hydrateStatusForServer } = require('./util'); -const { handleHttpRequest } = require('../../../adapters/network'); -const stats = require('../../../util/stats'); -const { JSON_MIME_TYPE } = require('../../util/constant'); -const { POLL_ACTIVITY } = require('./config'); - -const getPollStatus = async (event) => { - const accessToken = await getAccessToken(event.config); - const { munchkinId } = event.config; - - // To see the status of the import job polling is done - // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#polling_job_status - const requestOptions = { - headers: { - 'Content-Type': JSON_MIME_TYPE, - Authorization: `Bearer ${accessToken}`, - }, - }; - const pollUrl = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${event.importId}.json`; - const { processedResponse: pollStatus } = await handleHttpRequest( - 'get', - pollUrl, - requestOptions, - { - destType: 'marketo_bulk_upload', - feature: 'transformation', - endpointPath: '/leads/batch/importId.json', - requestMethod: 'GET', - module: 'router', - }, - ); - if (!isHttpStatusSuccess(pollStatus.status)) { - stats.counter(POLL_ACTIVITY, 1, { - status: pollStatus.status, - state: 'Retryable', - }); - throw new NetworkError( - `Could not poll status: due to error ${JSON.stringify(pollStatus.response)}`, - hydrateStatusForServer(pollStatus.status, 'During fetching poll status'), - ); - } - return handlePollResponse(pollStatus); -}; - -const responseHandler = async (event) => { - const pollResp = await getPollStatus(event); - // Server expects : - /** - * - * { - "Complete": true, - "statusCode": 200, - "hasFailed": true, - "InProgress": false, - "FailedJobURLs": "", // transformer URL - "HasWarning": false, - "WarningJobURLs": "", // transformer URL - } // Succesful Upload - { - "success": false, - "statusCode": 400, - "errorResponse": - } // Failed Upload - { - "success": false, - "Inprogress": true, - statusCode: 500, - } // Importing or Queue - - */ - if (pollResp) { - // As marketo lead import API or bulk API does not support record level error response we are considering - // file level errors only. - // ref: https://nation.marketo.com/t5/ideas/support-error-code-in-record-level-in-lead-bulk-api/idi-p/262191 - const { status, numOfRowsFailed, numOfRowsWithWarning, message } = pollResp.result[0]; - if (status === 'Complete') { - const response = { - Complete: true, - statusCode: 200, - InProgress: false, - hasFailed: numOfRowsFailed > 0, - FailedJobURLs: numOfRowsFailed > 0 ? '/getFailedJobs' : undefined, - HasWarning: numOfRowsWithWarning > 0, - WarningJobURLs: numOfRowsWithWarning > 0 ? 
'/getWarningJobs' : undefined, - }; - return removeUndefinedValues(response); - } - if (status === 'Importing' || status === 'Queued') { - return { - Complete: false, - statusCode: 500, - hasFailed: false, - InProgress: true, - HasWarning: false, - }; - } - if (status === 'Failed') { - return { - Complete: false, - statusCode: 500, - hasFailed: false, - InProgress: false, - HasWarning: false, - Error: message || 'Marketo Poll Status Failed', - }; - } - } - // when pollResp is null - return { - Complete: false, - statusCode: 500, - hasFailed: false, - InProgress: false, - HasWarning: false, - Error: 'No poll response received from Marketo', - }; -}; - -const processPolling = async (event) => { - const resp = await responseHandler(event); - return resp; -}; - -module.exports = { processPolling }; diff --git a/src/v0/destinations/marketo_bulk_upload/uploadFile/marketo_bulk_upload_example.csv b/src/v0/destinations/marketo_bulk_upload/uploadFile/marketo_bulk_upload_example.csv deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/v0/destinations/marketo_bulk_upload/util.js b/src/v0/destinations/marketo_bulk_upload/util.js deleted file mode 100644 index 033239b5e4..0000000000 --- a/src/v0/destinations/marketo_bulk_upload/util.js +++ /dev/null @@ -1,436 +0,0 @@ -const { - AbortedError, - RetryableError, - NetworkError, - TransformationError, - isDefinedAndNotNull, -} = require('@rudderstack/integrations-lib'); -const { handleHttpRequest } = require('../../../adapters/network'); -const tags = require('../../util/tags'); -const { isHttpStatusSuccess, generateUUID } = require('../../util'); -const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils'); -const stats = require('../../../util/stats'); -const { - ABORTABLE_CODES, - THROTTLED_CODES, - POLL_ACTIVITY, - UPLOAD_FILE, - FETCH_ACCESS_TOKEN, - POLL_STATUS_ERR_MSG, - FILE_UPLOAD_ERR_MSG, - ACCESS_TOKEN_FETCH_ERR_MSG, - SCHEMA_DATA_TYPE_MAP, -} = require('./config'); -const logger = require('../../../logger'); - -const getMarketoFilePath = () => - `${__dirname}/uploadFile/${Date.now()}_marketo_bulk_upload_${generateUUID()}.csv`; - -// Server only aborts when status code is 400 -const hydrateStatusForServer = (statusCode, context) => { - const status = Number(statusCode); - if (Number.isNaN(status)) { - throw new TransformationError(`${context}: Couldn't parse status code ${statusCode}`); - } - if (status >= 400 && status <= 499) { - return 400; - } - return status; -}; - -/** - * Handles common error responses returned from API calls. - * Checks the error code and throws the appropriate error object based on the code. - * - * @param {object} resp - The response object containing the error information. - * @param {string} opErrorMessage - The error message to be used if the error code is not recognized. - * @param {string} opActivity - The activity name for tracking purposes. - * @throws {AbortedError} - If the error code is abortable. - * @throws {ThrottledError} - If the error code is within the range of throttled codes. - * @throws {RetryableError} - If the error code is neither abortable nor throttled. 
- * - * @example - * const resp = { - * response: { - * errors: [ - * { - * code: "1003", - * message: "Empty File" - * } - * ] - * } - * }; - * - * try { - * handleCommonErrorResponse(resp, "Error message", "Activity"); - * } catch (error) { - * console.log(error); - * } - */ -const handleCommonErrorResponse = (apiCallResult, opErrorMessage, opActivity) => { - // checking for invalid/expired token errors and evicting cache in that case - // rudderJobMetadata contains some destination info which is being used to evict the cache - if ( - apiCallResult.response?.errors && - apiCallResult.response?.errors?.length > 0 && - apiCallResult.response?.errors.some( - (errorObj) => errorObj.code === '601' || errorObj.code === '602', - ) - ) { - throw new RetryableError( - `[${opErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, - ); - } - if ( - apiCallResult.response?.errors?.length > 0 && - apiCallResult.response?.errors[0] && - ((apiCallResult.response?.errors[0]?.code >= 1000 && - apiCallResult.response?.errors[0]?.code <= 1077) || - ABORTABLE_CODES.includes(apiCallResult.response?.errors[0]?.code)) - ) { - // for empty file the code is 1003 and that should be retried - stats.increment(opActivity, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(apiCallResult.response?.errors[0]?.message || opErrorMessage, 400); - } else if (THROTTLED_CODES.includes(apiCallResult.response?.errors[0]?.code)) { - // for more than 10 concurrent uses the code is 615 and that should be retried - stats.increment(opActivity, { - status: 429, - state: 'Retryable', - }); - throw new RetryableError( - `[${opErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, - 500, - ); - } - // by default every thing will be retried - stats.increment(opActivity, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - `[${opErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, - 500, - ); -}; - -const getAccessTokenURL = (config) => { - const { clientId, clientSecret, munchkinId } = config; - const url = `https://${munchkinId}.mktorest.com/identity/oauth/token?client_id=${clientId}&client_secret=${clientSecret}&grant_type=client_credentials`; - return url; -}; - -// Fetch access token from client id and client secret -// DOC: https://developers.marketo.com/rest-api/authentication/ -const getAccessToken = async (config) => { - const url = getAccessTokenURL(config); - const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, { - destType: 'marketo_bulk_upload', - feature: 'transformation', - endpointPath: '/identity/oauth/token', - requestMethod: 'GET', - module: 'router', - }); - - // sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400} - if (!isHttpStatusSuccess(accessTokenResponse.status)) { - throw new NetworkError( - `Could not retrieve authorisation token due to error ${JSON.stringify(accessTokenResponse)}`, - hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN), - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status), - }, - accessTokenResponse, - ); - } - if (accessTokenResponse.response?.success === false) { - handleCommonErrorResponse(accessTokenResponse, ACCESS_TOKEN_FETCH_ERR_MSG, FETCH_ACCESS_TOKEN); - } - - // when access token is present - if (accessTokenResponse.response.access_token) { - /* This scenario will handle the case when we get the following response - status: 200 - respnse: 
{"access_token":"","token_type":"bearer","expires_in":0,"scope":"dummy@scope.com"} - wherein "expires_in":0 denotes that we should refresh the accessToken but its not expired yet. - */ - if (accessTokenResponse.response?.expires_in === 0) { - throw new RetryableError( - `Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`, - 500, - ); - } - return accessTokenResponse.response.access_token; - } - throw new RetryableError( - `Could not retrieve authorisation token due to error ${JSON.stringify(accessTokenResponse)}`, - 500, - ); -}; - -/** - * Handles the response of a polling operation. - * Checks for any errors in the response and calls the `handleCommonErrorResponse` function to handle them. - * If the response is successful, increments the stats and returns the response. - * Otherwise, returns null. - * - * @param {object} pollStatus - The response object from the polling operation. - * @returns {object|null} - The response object if the polling operation was successful, otherwise null. - */ -const handlePollResponse = (pollStatus) => { - // DOC: https://developers.marketo.com/rest-api/error-codes/ - if (pollStatus.response.errors) { - /* Sample error response for poll is: - - { - "requestId": "e42b#14272d07d78", - "success": false, - "errors": [ - { - "code": "601", - "message": "Unauthorized" - } - ] - } - */ - handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY); - } - - /* - Sample Successful Poll response structure: - { - "requestId":"8136#146daebc2ed", - "success":true, - "result":[ - { - "batchId":, - "status":"Complete", - "numOfLeadsProcessed":2, - "numOfRowsFailed":1, - "numOfRowsWithWarning":0, - "message":"Import completed with errors, 2 records imported (2 members), 1 failed" - } - ] - } - */ - if (pollStatus.response?.success) { - stats.counter(POLL_ACTIVITY, 1, { - status: 200, - state: 'Success', - }); - - if (pollStatus.response?.result?.length > 0) { - return pollStatus.response; - } - } - - return null; -}; - -const handleFetchJobStatusResponse = (resp, type) => { - const marketoResponse = resp.response; - const marketoReposnseStatus = resp.status; - - if (!isHttpStatusSuccess(marketoReposnseStatus)) { - logger.info('[Network Error]:Failed during fetching job status', { marketoResponse, type }); - throw new NetworkError( - `Unable to fetch job status: due to error ${JSON.stringify(marketoResponse)}`, - hydrateStatusForServer(marketoReposnseStatus, 'During fetching job status'), - ); - } - - if (marketoResponse?.success === false) { - logger.info('[Application Error]Failed during fetching job status', { marketoResponse, type }); - throw new RetryableError( - `Failure during fetching job status due to error : ${marketoResponse}`, - 500, - resp, - ); - } - - /* - successful response : - { - response: 'city, email,Import Failure ReasonChennai,s…a,Value for lookup field 'email' not found', - status: 200 - } - - */ - - return marketoResponse; -}; - -/** - * Handles the response received after a file upload request. - * Checks for errors in the response and throws appropriate error objects based on the error codes. - * If the response indicates a successful upload, extracts the importId and returns it along with other job details. - * - * @param {object} resp - The response object received after a file upload request. - * @param {array} successfulJobs - An array to store details of successful jobs. - * @param {array} unsuccessfulJobs - An array to store details of unsuccessful jobs. 
- * @param {number} requestTime - The time taken for the request in milliseconds. - * @returns {object} - An object containing the importId, successfulJobs, and unsuccessfulJobs. - */ -const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime) => { - /* - For unsuccessful response - { - "requestId": "e42b#14272d07d78", - "success": false, - "errors": [ - { - "code": "1003", - "message": "Empty File" - } - ] - } - */ - if (resp.response?.errors) { - if (resp.response?.errors[0]?.code === '1003') { - stats.increment(UPLOAD_FILE, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - `[${FILE_UPLOAD_ERR_MSG}]:Error Message ${resp.response.errors[0]?.message}`, - 500, - ); - } else { - handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE); - } - } - - /** - * SuccessFul Upload Response : - { - "requestId": "d01f#15d672f8560", - "result": [ - { - "batchId": 3404, - "importId": "3404", - "status": "Queued" - } - ], - "success": true - } - */ - if ( - resp.response?.success && - resp.response?.result?.length > 0 && - resp.response?.result[0]?.importId - ) { - const { importId } = resp.response.result[0]; - stats.histogram('marketo_bulk_upload_upload_file_time', requestTime); - - stats.increment(UPLOAD_FILE, { - status: 200, - state: 'Success', - }); - return { importId, successfulJobs, unsuccessfulJobs }; - } - // if neither successful, nor the error message is appropriate sending importId as default null - return { importId: null, successfulJobs, unsuccessfulJobs }; -}; - -/** - * Retrieves the field schema mapping for a given access token and munchkin ID from the Marketo API. - * - * @param {string} accessToken - The access token used to authenticate the API request. - * @param {string} munchkinId - The munchkin ID of the Marketo instance. - * @returns {object} - The field schema mapping retrieved from the Marketo API. - */ -const getFieldSchemaMap = async (accessToken, munchkinId) => { - let fieldArr = []; - const fieldMap = {}; // map to store field name and data type - // ref: https://developers.marketo.com/rest-api/endpoint-reference/endpoint-index/#:~:text=Describe%20Lead2,leads/describe2.json - const { processedResponse: fieldSchemaMapping } = await handleHttpRequest( - 'get', - `https://${munchkinId}.mktorest.com/rest/v1/leads/describe2.json`, - { - params: { - access_token: accessToken, - }, - }, - { - destType: 'marketo_bulk_upload', - feature: 'transformation', - endpointPath: '/leads/describe2.json', - requestMethod: 'GET', - module: 'router', - }, - ); - if (fieldSchemaMapping.response.errors) { - handleCommonErrorResponse( - fieldSchemaMapping, - 'Error while fetching Marketo Field Schema', - 'FieldSchemaMapping', - ); - } - if ( - fieldSchemaMapping.response?.success && - fieldSchemaMapping.response?.result.length > 0 && - fieldSchemaMapping.response?.result[0] - ) { - fieldArr = - fieldSchemaMapping.response.result && Array.isArray(fieldSchemaMapping.response.result) - ? fieldSchemaMapping.response.result[0]?.fields - : []; - - fieldArr.forEach((field) => { - fieldMap[field?.name] = field?.dataType; - }); - } else { - throw new RetryableError( - `Failed to fetch Marketo Field Schema due to error ${JSON.stringify(fieldSchemaMapping)}`, - 500, - fieldSchemaMapping, - ); - } - return fieldMap; -}; - -/** - * Compares the data types of the fields in an event message with the expected data types defined in the field schema mapping. 
- * Identifies any mismatched fields and returns them as a map of job IDs and the corresponding invalid fields. - * - * @param {object} event - An object containing an `input` array of events. Each event has a `message` object with field-value pairs and a `metadata` object with a `job_id` property. - * @param {object} fieldSchemaMapping - An object containing the field schema mapping, which includes the expected data types for each field. - * @returns {object} - An object containing the job IDs as keys and the corresponding invalid fields as values. - */ -const checkEventStatusViaSchemaMatching = (event, fieldMap) => { - const mismatchedFields = {}; - const events = event.input; - events.forEach((ev) => { - const { message, metadata } = ev; - // eslint-disable-next-line @typescript-eslint/naming-convention - const { job_id } = metadata; - - Object.entries(message).forEach(([paramName, paramValue]) => { - const expectedDataType = SCHEMA_DATA_TYPE_MAP[fieldMap[paramName]]; - const actualDataType = typeof paramValue; - - if ( - isDefinedAndNotNull(paramValue) && - !mismatchedFields[job_id] && - actualDataType !== expectedDataType - ) { - mismatchedFields[job_id] = `invalid ${paramName}`; - } - }); - }); - return mismatchedFields; -}; - -module.exports = { - checkEventStatusViaSchemaMatching, - handlePollResponse, - handleFetchJobStatusResponse, - handleFileUploadResponse, - handleCommonErrorResponse, - hydrateStatusForServer, - getAccessToken, - getMarketoFilePath, - getFieldSchemaMap, -}; diff --git a/test/__tests__/data/marketo_bulk_upload_fileUpload_input.json b/test/__tests__/data/marketo_bulk_upload_fileUpload_input.json deleted file mode 100644 index 737cd36ec3..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_fileUpload_input.json +++ /dev/null @@ -1,272 +0,0 @@ -[ - { - "request": { - "body": { - "config": { - "munchkinId": "munchkinId", - "clientId": "b", - "clientSecret": "clientSecret", - "columnFieldsMapping": [ - { - "to": "email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "munchkinId", - "clientId": "b", - "clientSecret": "clientSecret", - "columnFieldsMapping": [ - { - "to": "email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "munchkinId", - "clientId": "wrongClientId", - "clientSecret": "clientSecret", - "columnFieldsMapping": [ - { - "to": "email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "a", - "clientId": "b", - "clientSecret": "forThrottle", - "columnFieldsMapping": [ - { - "to": "Email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid 
Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "munchkinId", - "clientId": "b", - "clientSecret": "clientSecret", - "columnFieldsMapping": [ - { - "to": "email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "testMunchkin1", - "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "to": "email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "testMunchkin2", - "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "to": "Email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "testMunchkin3", - "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "to": "Email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - }, - { - "request": { - "body": { - "config": { - "munchkinId": "testMunchkin4", - "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "to": "Email", - "from": "email" - } - ] - }, - "input": [ - { - "message": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "metadata": { - "job_id": 17 - } - } - ], - "destType": "MARKETO_BULK_UPLOAD" - } - } - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json b/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json deleted file mode 100644 index 0ea94284ae..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json +++ /dev/null @@ -1,63 +0,0 @@ -[ - { - "statusCode": 200, - "importId": "2977", - "metadata": { - "successfulJobs": ["17"], - "unsuccessfulJobs": [], - "csvHeader": "email" - } - }, - { - "statusCode": 200, - "importId": "2977", - "metadata": { - "successfulJobs": ["17"], - "unsuccessfulJobs": [], - "csvHeader": "email" - } - }, - { - "statusCode": 200, - "importId": "2977", - "metadata": { - "successfulJobs": ["17"], - "unsuccessfulJobs": [], - "csvHeader": "email" - } - }, - { - "statusCode": 400, - "error": "The field Email is not present in Marketo Field Schema. 
Aborting", - "metadata": null - }, - { - "statusCode": 200, - "importId": "2977", - "metadata": { - "successfulJobs": ["17"], - "unsuccessfulJobs": [], - "csvHeader": "email" - } - }, - { - "statusCode": 400, - "error": "[Could not upload file]Error message: undefined", - "metadata": null - }, - { - "statusCode": 400, - "error": "[Could not upload file]Error message: There are 10 imports currently being processed. Please try again later", - "metadata": null - }, - { - "statusCode": 400, - "error": "[Could not upload file]Error message: Empty file", - "metadata": null - }, - { - "statusCode": 400, - "error": "[Could not upload file]Error message: Any other error", - "metadata": null - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_input.json b/test/__tests__/data/marketo_bulk_upload_input.json deleted file mode 100644 index ce48c8e7fe..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_input.json +++ /dev/null @@ -1,270 +0,0 @@ -[ - { - "message": { - "type": "identify", - "traits": { - "name": "Carlo Lombard", - "plan": "Quarterly Team+ Plan for Enuffsaid Media", - "email": "carlo@enuffsaid.media" - }, - "userId": 476335, - "context": { - "ip": "14.0.2.238", - "page": { - "url": "enuffsaid.proposify.com", - "path": "/settings", - "method": "POST", - "referrer": "https://enuffsaid.proposify.com/login" - }, - "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36" - }, - "rudderId": "786dfec9-jfh", - "messageId": "5d9bc6e2-ekjh" - }, - "destination": { - "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", - "Config": { - "munchkinId": "XXXX", - "clientId": "YYYY", - "clientSecret": "ZZZZ", - "columnFieldsMapping": [ - { - "to": "name__c", - "from": "name" - }, - { - "to": "email__c", - "from": "email" - }, - { - "to": "plan__c", - "from": "plan" - } - ] - } - } - }, - { - "message": { - "traits": { - "name": "Carlo Lombard", - "plan": "Quarterly Team+ Plan for Enuffsaid Media", - "email": "carlo@enuffsaid.media" - }, - "userId": 476335, - "context": { - "ip": "14.0.2.238", - "page": { - "url": "enuffsaid.proposify.com", - "path": "/settings", - "method": "POST", - "referrer": "https://enuffsaid.proposify.com/login" - }, - "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36" - }, - "rudderId": "786dfec9-jfh", - "messageId": "5d9bc6e2-ekjh" - }, - "destination": { - "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", - "Config": { - "munchkinId": "XXXX", - "clientId": "YYYY", - "clientSecret": "ZZZZ", - "columnFieldsMapping": [ - { - "to": "name__c", - "from": "name" - }, - { - "to": "email__c", - "from": "email" - }, - { - "to": "plan__c", - "from": "plan" - } - ] - } - } - }, - { - "message": { - "type": "track", - "traits": { - "name": "Carlo Lombard", - "plan": "Quarterly Team+ Plan for Enuffsaid Media", - "email": "carlo@enuffsaid.media" - }, - "userId": 476335, - "context": { - "ip": "14.0.2.238", - "page": { - "url": "enuffsaid.proposify.com", - "path": "/settings", - "method": "POST", - "referrer": "https://enuffsaid.proposify.com/login" - }, - "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36" - }, - "rudderId": "786dfec9-jfh", - "messageId": "5d9bc6e2-ekjh" - }, - "destination": { - "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", - "Config": { - "munchkinId": "XXXX", - "clientId": "YYYY", - "clientSecret": "ZZZZ", - "columnFieldsMapping": [ - { - "to": "name__c", 
- "from": "name" - }, - { - "to": "email__c", - "from": "email" - }, - { - "to": "plan__c", - "from": "plan" - } - ] - } - } - }, - { - "message": { - "type": "identify", - "traits": { - "name": "Carlo Lombard", - "plan": "Quarterly Team+ Plan for Enuffsaid Media", - "email": "carlo@enuffsaid.media" - }, - "userId": 476335, - "context": { - "ip": "14.0.2.238", - "page": { - "url": "enuffsaid.proposify.com", - "path": "/settings", - "method": "POST", - "referrer": "https://enuffsaid.proposify.com/login" - }, - "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36" - }, - "rudderId": "786dfec9-jfh", - "messageId": "5d9bc6e2-ekjh" - }, - "destination": { - "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", - "Config": { - "munchkinId": "XXXX", - "clientId": "YYYY", - "clientSecret": "ZZZZ", - "columnFieldsMapping": [ - { - "to": "name__c", - "from": "1" - }, - { - "to": "email__c", - "from": "email1" - }, - { - "to": "plan__c", - "from": "plan1" - } - ] - } - } - }, - { - "message": { - "type": "identify", - "traits": { - "name": "Carlo Lombard", - "plan": "Quarterly Team+ Plan for Enuffsaid Media", - "email": "carlo@enuffsaid.media" - }, - "userId": 476335, - "context": { - "ip": "14.0.2.238", - "page": { - "url": "enuffsaid.proposify.com", - "path": "/settings", - "method": "POST", - "referrer": "https://enuffsaid.proposify.com/login" - }, - "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36" - }, - "rudderId": "786dfec9-jfh", - "messageId": "5d9bc6e2-ekjh" - }, - "destination": { - "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", - "Config": { - "munchkinId": "XXXX", - "clientId": "YYYY", - "clientSecret": "ZZZZ", - "columnFieldsMapping": [ - { - "to": "name__c", - "from": "name" - }, - { - "to": "email__c", - "from": "email1" - }, - { - "to": "plan__c", - "from": "plan1" - } - ] - } - } - }, - { - "message": { - "type": "identify", - "traits": { - "name": "Carlo Lombard", - "plan": 1 - }, - "userId": 476335, - "context": { - "ip": "14.0.2.238", - "page": { - "url": "enuffsaid.proposify.com", - "path": "/settings", - "method": "POST", - "referrer": "https://enuffsaid.proposify.com/login" - }, - "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36" - }, - "rudderId": "786dfec9-jfh", - "messageId": "5d9bc6e2-ekjh" - }, - "destination": { - "ID": "1mMy5cqbtfuaKZv1IhVQKnBdVwe", - "Config": { - "munchkinId": "XXXX", - "clientId": "YYYY", - "clientSecret": "ZZZZ", - "columnFieldsMapping": [ - { - "to": "name__c", - "from": "name" - }, - { - "to": "email__c", - "from": "email" - }, - { - "to": "plan__c", - "from": "plan" - } - ] - } - } - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_jobStatus_input.json b/test/__tests__/data/marketo_bulk_upload_jobStatus_input.json deleted file mode 100644 index b36363e0db..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_jobStatus_input.json +++ /dev/null @@ -1,102 +0,0 @@ -[ - { - "request": { - "body": { - "destType": "MARKETO_BULK_UPLOAD", - "importId": 12345, - "input": [ - { - "message": { - "firstName": "aa", - "email": "bb" - }, - "metadata": { - "job_id": 2 - } - }, - { - "message": { - "firstName": "aa", - "email": "bb", - "phone": "99" - }, - "metadata": { - "job_id": 4 - } - }, - { - "message": { - "firstName": "aa", - "email": "bb" - }, - "metadata": { - "job_id": 3 - } - } - ], - "config": { - "clientId": "b", - 
"clientSecret": "c", - "munchkinId": "a", - "columnFieldsMapping": [ - { - "to": "Email", - "from": "email" - } - ] - }, - "metadata": {} - } - } - }, - { - "request": { - "body": { - "destType": "MARKETO_BULK_UPLOAD", - "importId": 12345, - "input": [ - { - "message": { - "firstName": "aa", - "email": "bb" - }, - "metadata": { - "job_id": 2 - } - }, - { - "message": { - "firstName": "aa", - "email": "bb", - "phone": "99" - }, - "metadata": { - "job_id": 4 - } - }, - { - "message": { - "firstName": "aa", - "email": "bb" - }, - "metadata": { - "job_id": 3 - } - } - ], - "config": { - "clientId": "b", - "clientSecret": "c", - "munchkinId": "testMunchkin3", - "columnFieldsMapping": [ - { - "to": "Email", - "from": "email" - } - ] - }, - "metadata": {} - } - } - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json b/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json deleted file mode 100644 index 60628f6b3f..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json +++ /dev/null @@ -1,28 +0,0 @@ -[ - { - "type": "warn", - "data": [ - { - "statusCode": 400, - "error": "No csvHeader in metadata" - }, - { - "statusCode": 400, - "error": "Unable to fetch job status: due to error \"\"" - } - ] - }, - { - "type": "fail", - "data": [ - { - "statusCode": 400, - "error": "No csvHeader in metadata" - }, - { - "statusCode": 400, - "error": "Unable to fetch job status: due to error \"\"" - } - ] - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_output.json b/test/__tests__/data/marketo_bulk_upload_output.json deleted file mode 100644 index 9911a7e831..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_output.json +++ /dev/null @@ -1,89 +0,0 @@ -[ - { - "version": "1", - "type": "REST", - "method": "POST", - "endpoint": "/fileUpload", - "headers": {}, - "params": {}, - "body": { - "JSON": { - "name__c": "Carlo Lombard", - "email__c": "carlo@enuffsaid.media", - "plan__c": "Quarterly Team+ Plan for Enuffsaid Media" - }, - "JSON_ARRAY": {}, - "XML": {}, - "FORM": {} - }, - "files": {} - }, - { - "statusCode": 400, - "error": "Event type is required", - "statTags": { - "destination": "marketo_bulk_upload", - "stage": "transform", - "scope": "exception" - } - }, - { - "statusCode": 400, - "error": "Event type track is not supported", - "statTags": { - "destination": "marketo_bulk_upload", - "stage": "transform", - "scope": "exception" - } - }, - { - "version": "1", - "type": "REST", - "method": "POST", - "endpoint": "/fileUpload", - "headers": {}, - "params": {}, - "body": { - "JSON": {}, - "JSON_ARRAY": {}, - "XML": {}, - "FORM": {} - }, - "files": {} - }, - { - "version": "1", - "type": "REST", - "method": "POST", - "endpoint": "/fileUpload", - "headers": {}, - "params": {}, - "body": { - "JSON": { - "name__c": "Carlo Lombard" - }, - "JSON_ARRAY": {}, - "XML": {}, - "FORM": {} - }, - "files": {} - }, - { - "version": "1", - "type": "REST", - "method": "POST", - "endpoint": "/fileUpload", - "headers": {}, - "params": {}, - "body": { - "JSON": { - "name__c": "Carlo Lombard", - "plan__c": 1 - }, - "JSON_ARRAY": {}, - "XML": {}, - "FORM": {} - }, - "files": {} - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_poll_input.json b/test/__tests__/data/marketo_bulk_upload_poll_input.json deleted file mode 100644 index f5457bd79c..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_poll_input.json +++ /dev/null @@ -1,59 +0,0 @@ -[ - { - "request": { - "body": { - "destType": "MARKETO_BULK_UPLOAD", - "importId": 1234, - "config": { 
- "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "from": "email", - "to": "Email" - } - ], - "munchkinId": "a" - } - } - } - }, - { - "request": { - "body": { - "destType": "MARKETO_BULK_UPLOAD", - "importId": 1234, - "config": { - "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "from": "email", - "to": "Email" - } - ], - "munchkinId": "testMunchkin4" - } - } - } - }, - { - "request": { - "body": { - "destType": "MARKETO_BULK_UPLOAD", - "importId": 1234, - "config": { - "clientId": "b", - "clientSecret": "c", - "columnFieldsMapping": [ - { - "from": "email", - "to": "Email" - } - ], - "munchkinId": "testMunchkin500" - } - } - } - } -] diff --git a/test/__tests__/data/marketo_bulk_upload_poll_output.json b/test/__tests__/data/marketo_bulk_upload_poll_output.json deleted file mode 100644 index 92e312072e..0000000000 --- a/test/__tests__/data/marketo_bulk_upload_poll_output.json +++ /dev/null @@ -1,17 +0,0 @@ -[ - { - "Complete": true, - "statusCode": 200, - "hasFailed": false, - "InProgress": false, - "HasWarning": false - }, - { - "statusCode": 400, - "error": "Any 400 error" - }, - { - "statusCode": 400, - "error": "[Could not poll status]Error message: Any 500 error" - } -] diff --git a/test/__tests__/marketo_bulk_upload.test.js b/test/__tests__/marketo_bulk_upload.test.js deleted file mode 100644 index 6cf4d559b9..0000000000 --- a/test/__tests__/marketo_bulk_upload.test.js +++ /dev/null @@ -1,127 +0,0 @@ -const fs = require("fs"); -const path = require("path"); -const vRouter = require("../../src/legacy/router"); - -const version = "v0"; -const integration = "marketo_bulk_upload"; -const transformer = require(`../../src/${version}/destinations/${integration}/transform`); - -jest.mock("axios"); -let reqTransformBody; -let respTransformBody; -let respFileUploadBody; -let reqFileUploadBody; -let reqPollBody; -let respPollBody; -let reqJobStatusBody; -let respJobStatusBody; - -try { - reqTransformBody = JSON.parse( - fs.readFileSync(path.resolve(__dirname, `./data/${integration}_input.json`)) - ); - respTransformBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_output.json`) - ) - ); - reqFileUploadBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_fileUpload_input.json`) - ) - ); - respFileUploadBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_fileUpload_output.json`) - ) - ); - reqPollBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_poll_input.json`) - ) - ); - respPollBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_poll_output.json`) - ) - ); - reqJobStatusBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_jobStatus_input.json`) - ) - ); - respJobStatusBody = JSON.parse( - fs.readFileSync( - path.resolve(__dirname, `./data/${integration}_jobStatus_output.json`) - ) - ); -} catch (error) { - throw new Error("Could not read files." 
+ error); -} - -describe(`${integration} Tests`, () => { - describe("Transformer.js", () => { - reqTransformBody.forEach(async (input, index) => { - it(`Payload - ${index}`, async () => { - try { - const output = await transformer.process(input); - expect(output).toEqual(respTransformBody[index]); - } catch (error) { - expect(error.message).toEqual(respTransformBody[index].error); - } - }); - }); - }); - - describe("fileUpload.js", () => { - reqFileUploadBody.forEach(async (input, index) => { - it(`Payload - ${index}`, async () => { - try { - const output = await vRouter.fileUpload(input); - expect(output).toEqual(respFileUploadBody[index]); - } catch (error) { - expect(error.message).toEqual(respFileUploadBody[index].error); - } - }); - }); - }); - - describe("poll.js", () => { - reqPollBody.forEach(async (input, index) => { - it(`Payload - ${index}`, async () => { - try { - const output = await vRouter.pollStatus(input); - expect(output).toEqual(respPollBody[index]); - } catch (error) { - expect(error.message).toEqual(respPollBody[index].error); - } - }); - }); - }); - - describe("fetchJobStatus.js for warn", () => { - reqJobStatusBody.forEach(async (input, index) => { - it(`Payload - ${index}`, async () => { - try { - const output = await vRouter.getJobStatus(input, "warn"); - expect(output).toEqual(respJobStatusBody[0].data[index]); - } catch (error) { - expect(error.message).toEqual(respJobStatusBody[0].data[index].error); - } - }); - }); - }); - - describe("fetchJobStatus.js for fail", () => { - reqJobStatusBody.forEach(async (input, index) => { - it(`Payload - ${index}`, async () => { - try { - const output = await vRouter.getJobStatus(input, "fail"); - expect(output).toEqual(respJobStatusBody[1].data[index]); - } catch (error) { - expect(error.message).toEqual(respJobStatusBody[1].data[index].error); - } - }); - }); - }); -}); diff --git a/test/integrations/destinations/intercom_v2/network.ts b/test/integrations/destinations/intercom_v2/network.ts index 26ff3c38ee..e4cae04d07 100644 --- a/test/integrations/destinations/intercom_v2/network.ts +++ b/test/integrations/destinations/intercom_v2/network.ts @@ -746,6 +746,108 @@ const deliveryCallsData = [ }, }, }, + { + httpReq: { + method: 'post', + url: 'https://api.intercom.io/contacts/search', + data: { + query: { + operator: 'AND', + value: [{ field: 'email', operator: '=', value: 'test-rETL-available@gmail.com' }], + }, + }, + headers, + }, + httpRes: { + status: 200, + statusText: 'ok', + data: { + type: 'list', + total_count: 0, + pages: { + type: 'pages', + page: 1, + per_page: 50, + total_pages: 0, + }, + data: [ + { + type: 'contact', + id: 'retl-available-contact-id', + workspace_id: 'rudderWorkspace', + external_id: 'detach-company-user-id', + role: 'user', + email: 'test-rETL-available@gmail.com', + }, + ], + }, + }, + }, + { + httpReq: { + method: 'post', + url: 'https://api.intercom.io/contacts/search', + data: { + query: { + operator: 'AND', + value: [{ field: 'email', operator: '=', value: 'test-rETL-unavailable@gmail.com' }], + }, + }, + headers, + }, + httpRes: { + status: 200, + statusText: 'ok', + data: { + type: 'list', + total_count: 0, + pages: { + type: 'pages', + page: 1, + per_page: 50, + total_pages: 0, + }, + data: [], + }, + }, + }, + { + httpReq: { + method: 'post', + url: 'https://api.au.intercom.io/contacts/search', + data: { + query: { + operator: 'AND', + value: [{ field: 'external_id', operator: '=', value: 'known-user-id-1' }], + }, + }, + headers, + }, + httpRes: { + status: 200, + statusText: 'ok', 
+ data: { + type: 'list', + total_count: 0, + pages: { + type: 'pages', + page: 1, + per_page: 50, + total_pages: 0, + }, + data: [ + { + type: 'contact', + id: 'contact-id-by-intercom-known-user-id-1', + workspace_id: 'rudderWorkspace', + external_id: 'user-id-1', + role: 'user', + email: 'test@rudderlabs.com', + }, + ], + }, + }, + }, ]; export const networkCallsData = [...deliveryCallsData]; diff --git a/test/integrations/destinations/intercom_v2/router/data.ts b/test/integrations/destinations/intercom_v2/router/data.ts index 7656914059..75f5ba6ae7 100644 --- a/test/integrations/destinations/intercom_v2/router/data.ts +++ b/test/integrations/destinations/intercom_v2/router/data.ts @@ -17,6 +17,7 @@ import { userTraits, } from '../common'; import { RouterTestData } from '../../../testTypes'; +import { rETLRecordV2RouterRequest } from './rETL'; const routerRequest1: RouterTransformationRequest = { input: [ @@ -222,6 +223,26 @@ const routerRequest3: RouterTransformationRequest = { }, metadata: generateMetadata(3), }, + { + destination: destinationApiServerAU, + message: { + userId: 'known-user-id-1', + channel, + context: { + traits: { ...userTraits, external_id: 'known-user-id-1' }, + }, + type: 'identify', + integrations: { + All: true, + Intercom: { + lookup: 'external_id', + }, + }, + originalTimestamp, + timestamp, + }, + metadata: generateMetadata(4), + }, ], destType: 'intercom_v2', }; @@ -735,6 +756,38 @@ export const data: RouterTestData[] = [ metadata: [generateMetadata(3)], statusCode: 400, }, + { + batched: false, + batchedRequest: { + body: { + JSON: { + email: 'test@rudderlabs.com', + external_id: 'known-user-id-1', + name: 'John Snow', + owner_id: 13, + phone: '+91 9999999999', + custom_attributes: { + address: 'california usa', + age: 23, + }, + }, + XML: {}, + FORM: {}, + JSON_ARRAY: {}, + }, + endpoint: + 'https://api.au.intercom.io/contacts/contact-id-by-intercom-known-user-id-1', + files: {}, + headers, + method: 'PUT', + params: {}, + type: 'REST', + version: '1', + }, + destination: destinationApiServerAU, + metadata: [generateMetadata(4)], + statusCode: 200, + }, ], }, }, @@ -880,4 +933,150 @@ export const data: RouterTestData[] = [ }, }, }, + { + id: 'INTERCOM-V2-router-test-6', + scenario: 'Framework', + successCriteria: 'Some events should be transformed successfully and some should fail for rETL', + name: 'intercom_v2', + description: 'INTERCOM V2 rETL tests', + feature: 'router', + module: 'destination', + version: 'v0', + input: { + request: { + body: rETLRecordV2RouterRequest, + }, + }, + output: { + response: { + status: 200, + body: { + output: [ + { + batched: false, + batchedRequest: { + body: { + JSON: { + email: 'test-rETL-unavailable@gmail.com', + external_id: 'rEtl_external_id', + }, + XML: {}, + FORM: {}, + JSON_ARRAY: {}, + }, + endpoint: 'https://api.intercom.io/contacts', + files: {}, + headers, + method: 'POST', + params: {}, + type: 'REST', + version: '1', + }, + destination: destination, + metadata: [generateMetadata(1)], + statusCode: 200, + }, + { + batched: false, + batchedRequest: { + body: { + JSON: { + external_id: 'rEtl_external_id', + }, + XML: {}, + FORM: {}, + JSON_ARRAY: {}, + }, + endpoint: 'https://api.intercom.io/contacts/retl-available-contact-id', + files: {}, + headers, + method: 'PUT', + params: {}, + type: 'REST', + version: '1', + }, + destination: destination, + metadata: [generateMetadata(2)], + statusCode: 200, + }, + { + batched: false, + batchedRequest: { + body: { + JSON: {}, + XML: {}, + FORM: {}, + JSON_ARRAY: {}, + 
}, + endpoint: 'https://api.intercom.io/contacts/retl-available-contact-id', + files: {}, + headers, + method: 'DELETE', + params: {}, + type: 'REST', + version: '1', + }, + destination: destination, + metadata: [generateMetadata(3)], + statusCode: 200, + }, + { + batched: false, + error: 'Contact is not present. Aborting.', + statTags: { + ...RouterInstrumentationErrorStatTags, + errorType: 'configuration', + }, + destination, + metadata: [generateMetadata(4)], + statusCode: 400, + }, + { + batched: false, + batchedRequest: { + body: { + JSON: { + external_id: 'rEtl_external_id', + }, + XML: {}, + FORM: {}, + JSON_ARRAY: {}, + }, + endpoint: 'https://api.intercom.io/contacts/retl-available-contact-id', + files: {}, + headers, + method: 'PUT', + params: {}, + type: 'REST', + version: '1', + }, + destination: destination, + metadata: [generateMetadata(5)], + statusCode: 200, + }, + { + batched: false, + error: 'action dummyaction is not supported.', + statTags: { + ...RouterInstrumentationErrorStatTags, + }, + destination, + metadata: [generateMetadata(6)], + statusCode: 400, + }, + { + batched: false, + error: 'Missing lookup field or lookup field value for searchContact', + statTags: { + ...RouterInstrumentationErrorStatTags, + }, + destination, + metadata: [generateMetadata(7)], + statusCode: 400, + }, + ], + }, + }, + }, + }, ]; diff --git a/test/integrations/destinations/intercom_v2/router/rETL.ts b/test/integrations/destinations/intercom_v2/router/rETL.ts new file mode 100644 index 0000000000..0a36b8cfa6 --- /dev/null +++ b/test/integrations/destinations/intercom_v2/router/rETL.ts @@ -0,0 +1,182 @@ +import { RouterTransformationRequest } from '../../../../../src/types'; +import { destination } from '../common'; +import { generateMetadata } from '../../../testUtils'; + +export const rETLRecordV2RouterRequest: RouterTransformationRequest = { + input: [ + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '1', + rudderId: '1', + identifiers: { + email: 'test-rETL-unavailable@gmail.com', + }, + }, + metadata: generateMetadata(1), + }, + { + destination, + message: { + type: 'record', + action: 'update', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '2', + rudderId: '2', + identifiers: { + email: 'test-rETL-available@gmail.com', + }, + }, + metadata: generateMetadata(2), + }, + { + destination, + message: { + type: 'record', + action: 'delete', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '3', + rudderId: '3', + identifiers: { + email: 'test-rETL-available@gmail.com', + }, + }, + metadata: generateMetadata(3), + }, + { + destination, + message: { + type: 'record', + action: 'update', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '1', + rudderId: '1', + identifiers: { + email: 'test-rETL-unavailable@gmail.com', + }, + }, + metadata: 
generateMetadata(4), + }, + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '1', + rudderId: '1', + identifiers: { + email: 'test-rETL-available@gmail.com', + }, + }, + metadata: generateMetadata(5), + }, + { + destination, + message: { + type: 'record', + action: 'dummyAction', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '1', + rudderId: '1', + identifiers: { + email: 'test-rETL-available@gmail.com', + }, + }, + metadata: generateMetadata(6), + }, + { + destination, + message: { + type: 'record', + action: 'insert', + fields: { + external_id: 'rEtl_external_id', + }, + channel: 'sources', + context: { + sources: { + job_id: 'job-id', + version: 'local', + job_run_id: 'job_run_id', + task_run_id: 'job_run_id', + }, + }, + recordId: '1', + rudderId: '1', + identifiers: {}, + }, + metadata: generateMetadata(7), + }, + ], + destType: 'intercom_v2', +};
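
Note on the deleted marketo_bulk_upload helpers above: handleCommonErrorResponse encodes an error-code-to-disposition mapping (retry on the 601/602 token errors, abort on the 1000-1077 range and other abortable codes, throttle-and-retry on codes such as 615, retry by default). The sketch below restates that mapping in isolation. The name classifyMarketoError, the ErrorDisposition type, and the abortableCodes/throttledCodes parameters are illustrative stand-ins for the destination's ABORTABLE_CODES and THROTTLED_CODES constants, which are defined outside this diff; treat it as a summary, not the removed implementation.

// Minimal sketch (assumed names) of the classification done by the removed
// handleCommonErrorResponse helper; abortableCodes/throttledCodes stand in for
// the destination's ABORTABLE_CODES and THROTTLED_CODES constants.
type ErrorDisposition =
  | { state: 'Retryable'; status: 500 }
  | { state: 'Abortable'; status: 400 }
  | { state: 'Throttled'; status: 429 };

const classifyMarketoError = (
  code: string,
  abortableCodes: string[],
  throttledCodes: string[],
): ErrorDisposition => {
  // 601/602: invalid or expired access token -> retry so a fresh token is fetched
  if (code === '601' || code === '602') {
    return { state: 'Retryable', status: 500 };
  }
  // 1000-1077 and explicitly abortable codes -> abort with a 400
  // (the file-upload path special-cases 1003 "Empty File" as retryable before
  // falling through to this classification)
  const numericCode = Number(code);
  if ((numericCode >= 1000 && numericCode <= 1077) || abortableCodes.includes(code)) {
    return { state: 'Abortable', status: 400 };
  }
  // throttled codes, e.g. 615 (too many concurrent imports): recorded as a 429
  // stat and surfaced as retryable in the original helper
  if (throttledCodes.includes(code)) {
    return { state: 'Throttled', status: 429 };
  }
  // anything unrecognised is retried by default
  return { state: 'Retryable', status: 500 };
};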
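
The intercom_v2 rETL record fixtures above assert an action-to-request mapping; a minimal standalone sketch of it follows. The helper name resolveRecordRequest, the ResolvedRequest type, and the baseUrl/contactId parameters are illustrative assumptions, not the destination's actual implementation, and grouping delete with update on a missing contact is part of the sketch; only the methods, endpoints, and error strings are taken from the expected router outputs above.

// Sketch only: mirrors the behaviour asserted by the rETL router fixtures.
type ResolvedRequest = { method: 'POST' | 'PUT' | 'DELETE'; endpoint: string };

const resolveRecordRequest = (
  action: string,
  baseUrl: string, // e.g. 'https://api.intercom.io' in the expected outputs
  contactId?: string, // id returned by the preceding /contacts/search lookup, if any
): ResolvedRequest => {
  switch (action.toLowerCase()) {
    case 'insert':
      // insert is effectively an upsert: create when the lookup found nothing,
      // otherwise update the existing contact in place
      return contactId
        ? { method: 'PUT', endpoint: `${baseUrl}/contacts/${contactId}` }
        : { method: 'POST', endpoint: `${baseUrl}/contacts` };
    case 'update':
      if (!contactId) {
        // matches the 400 expected for the update on an unavailable contact
        throw new Error('Contact is not present. Aborting.');
      }
      return { method: 'PUT', endpoint: `${baseUrl}/contacts/${contactId}` };
    case 'delete':
      if (!contactId) {
        throw new Error('Contact is not present. Aborting.');
      }
      return { method: 'DELETE', endpoint: `${baseUrl}/contacts/${contactId}` };
    default:
      // matches the expected error for the 'dummyAction' record
      throw new Error(`action ${action.toLowerCase()} is not supported.`);
  }
};

Under this sketch, the fixture with action 'update' and the available lookup email resolves to PUT https://api.intercom.io/contacts/retl-available-contact-id, matching the second expected output; the fixture with empty identifiers fails earlier, at the search step, with 'Missing lookup field or lookup field value for searchContact'.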