diff --git a/CHANGELOG.md b/CHANGELOG.md index 0aa1a58d3d..552026d41b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,47 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [1.40.2](https://github.com/rudderlabs/rudder-transformer/compare/v1.40.1...v1.40.2) (2023-09-06) + + +### Bug Fixes + +* marketo bulk upload import issue ([#2559](https://github.com/rudderlabs/rudder-transformer/issues/2559)) ([752f351](https://github.com/rudderlabs/rudder-transformer/commit/752f351f02b7f7611c702d7dbcb4804972bb0970)) + +### [1.40.1](https://github.com/rudderlabs/rudder-transformer/compare/v1.40.0...v1.40.1) (2023-09-06) + + +### Bug Fixes + +* **google_ads_offline_conversions:** partial failure status code issue ([abfce44](https://github.com/rudderlabs/rudder-transformer/commit/abfce44067dbcefe7f2db90a5bd8e2895fd49ea9)) +* **google_ads_offline_conversions:** partial failure status code issue ([#2552](https://github.com/rudderlabs/rudder-transformer/issues/2552)) ([ae90087](https://github.com/rudderlabs/rudder-transformer/commit/ae900872680fd258dbb7cf10d5bfe6f02def94a5)) +* type issue in lookup via externalid, row lock error as retryable ([#2553](https://github.com/rudderlabs/rudder-transformer/issues/2553)) ([319ff90](https://github.com/rudderlabs/rudder-transformer/commit/319ff903059f21f8b11df3e984547a82f35e7ceb)) +* update datafile lookup error message ([#2555](https://github.com/rudderlabs/rudder-transformer/issues/2555)) ([c4aff36](https://github.com/rudderlabs/rudder-transformer/commit/c4aff3626a1f75059bd6a09edff1e38b4e6fc4e4)) + +## [1.40.0](https://github.com/rudderlabs/rudder-transformer/compare/v1.39.1...v1.40.0) (2023-09-04) + + +### Features + +* add eu instance support to kustomer destination ([#2513](https://github.com/rudderlabs/rudder-transformer/issues/2513)) ([34dbabf](https://github.com/rudderlabs/rudder-transformer/commit/34dbabfcec610b87a0a1512743528bef2e4b69ae)) +* blank audience support in google ads ([#2526](https://github.com/rudderlabs/rudder-transformer/issues/2526)) ([54d3704](https://github.com/rudderlabs/rudder-transformer/commit/54d3704a9dea612b98735f7191351e8195af205a)) +* eloqua new destination cdkv2 ([#2501](https://github.com/rudderlabs/rudder-transformer/issues/2501)) ([1db0573](https://github.com/rudderlabs/rudder-transformer/commit/1db0573eff0a0091248ffa2107fd8064a03ce2dd)) +* **ga4:** added support of campaign_details event ([#2542](https://github.com/rudderlabs/rudder-transformer/issues/2542)) ([95920b8](https://github.com/rudderlabs/rudder-transformer/commit/95920b8a851e1e78a7154dae222033c7f34b3c09)) +* **posthog:** support timestamp mapping from properties ([#2507](https://github.com/rudderlabs/rudder-transformer/issues/2507)) ([88392d7](https://github.com/rudderlabs/rudder-transformer/commit/88392d70b73525a15933e5a83a25df7d6c9417ee)) +* retl audience support google ads ([#2530](https://github.com/rudderlabs/rudder-transformer/issues/2530)) ([804aa79](https://github.com/rudderlabs/rudder-transformer/commit/804aa79113ed628d4c4dc92ad5dd4aa347aabe5a)) +* support for profiles event in redis ([#2497](https://github.com/rudderlabs/rudder-transformer/issues/2497)) ([f0c0a21](https://github.com/rudderlabs/rudder-transformer/commit/f0c0a211d167be2393c92db0a37dd517b1dbd1c4)) + + +### Bug Fixes + +* **braze:** enable merge behaviour to stitch user data ([#2508](https://github.com/rudderlabs/rudder-transformer/issues/2508)) ([8a2cf93](https://github.com/rudderlabs/rudder-transformer/commit/8a2cf93d9e83954edf1878390c254fb88a6c83c7)) +* **gaoc:** custom variables issue ([#2545](https://github.com/rudderlabs/rudder-transformer/issues/2545)) ([3afee53](https://github.com/rudderlabs/rudder-transformer/commit/3afee53759e19765c4a284910cfd86e774dc0a24)) +* **INT-512:** removed personal information from test cases ([#2517](https://github.com/rudderlabs/rudder-transformer/issues/2517)) ([9582e31](https://github.com/rudderlabs/rudder-transformer/commit/9582e31b9398f8d9bb01c431fd573fc54dbf7b3d)) +* **iterable:** squadcast alert ([#2535](https://github.com/rudderlabs/rudder-transformer/issues/2535)) ([5a2194b](https://github.com/rudderlabs/rudder-transformer/commit/5a2194baa2c07d5b0fbe7bd7f4cfdec9117661ba)) +* missing type for page and group calls ([#2512](https://github.com/rudderlabs/rudder-transformer/issues/2512)) ([bf08b9e](https://github.com/rudderlabs/rudder-transformer/commit/bf08b9e7177dbe7920e50e014484189a0c336b75)) +* remove secure environment for datafile call ([#2544](https://github.com/rudderlabs/rudder-transformer/issues/2544)) ([b069e26](https://github.com/rudderlabs/rudder-transformer/commit/b069e262e9864a60611ee1b1e8e6c91dad76b7f4)) +* fix: marketo bulk upload bugs and refactor ([#2414](https://github.com/rudderlabs/rudder-transformer/issues/2414)) ([9e3ace1](https://github.com/rudderlabs/rudder-transformer/pull/2546/commits/9e3ace17012f8fae3db35608367d98840037d1c0)) + ### [1.39.1](https://github.com/rudderlabs/rudder-transformer/compare/v1.39.0...v1.39.1) (2023-08-28) diff --git a/package-lock.json b/package-lock.json index 387d2a1671..9372b3add9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "rudder-transformer", - "version": "1.39.1", + "version": "1.40.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "rudder-transformer", - "version": "1.39.1", + "version": "1.40.2", "license": "ISC", "dependencies": { "@amplitude/ua-parser-js": "^0.7.24", diff --git a/package.json b/package.json index 557bc9b4cb..aecf5d581e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rudder-transformer", - "version": "1.39.1", + "version": "1.40.2", "description": "", "homepage": "https://github.com/rudderlabs/rudder-transformer#readme", "bugs": { diff --git a/src/cdk/v2/bindings/default.js b/src/cdk/v2/bindings/default.js index 4c9b046214..0bba7210f0 100644 --- a/src/cdk/v2/bindings/default.js +++ b/src/cdk/v2/bindings/default.js @@ -34,15 +34,15 @@ function assertConfig(val, message) { } } -function assertHttpResp(response, message) { - if (!isHttpStatusSuccess(response.status)) { +function assertHttpResp(processedResponse, message) { + if (!isHttpStatusSuccess(processedResponse.status)) { throw new NetworkError( message, - message.status, + processedResponse.status, { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(response.status), + [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(processedResponse.status), }, - response, + processedResponse.response, ); } } diff --git a/src/cdk/v2/destinations/optimizely_fullstack/procWorkflow.yaml b/src/cdk/v2/destinations/optimizely_fullstack/procWorkflow.yaml index f796036acf..4d90065f7e 100644 --- a/src/cdk/v2/destinations/optimizely_fullstack/procWorkflow.yaml +++ b/src/cdk/v2/destinations/optimizely_fullstack/procWorkflow.yaml @@ -61,8 +61,8 @@ steps: const dataFileUrl = .destination.Config.dataFileUrl; const rawResponse = await $.handleHttpRequest("get", dataFileUrl); const processedResponse = rawResponse.processedResponse; - $.assertHttpResp(processedResponse, "Data File Lookup Failed"); - processedResponse.response + $.assertHttpResp(processedResponse, "Data File Lookup Failed due to " + JSON.stringify(processedResponse.response)); + processedResponse.response; - name: validateDateFileForTrackPageAndScreen condition: $.outputs.messageType in [ {{$.EventType.TRACK}}, {{$.EventType.PAGE}}, {{$.EventType.SCREEN}}] diff --git a/src/controllers/bulkUpload.ts b/src/controllers/bulkUpload.ts index 80fb9b9dd0..e4d60b2021 100644 --- a/src/controllers/bulkUpload.ts +++ b/src/controllers/bulkUpload.ts @@ -44,7 +44,7 @@ export const fileUpload = async (ctx) => { response = await destFileUploadHandler.processFileData(ctx.request.body); } catch (error: any) { response = { - statusCode: error.response ? error.response.status : 400, + statusCode: error?.response?.status || error?.status || 400, error: error.message || 'Error occurred while processing payload.', metadata: error.response ? error.response.metadata : null, }; @@ -89,7 +89,7 @@ export const pollStatus = async (ctx) => { response = await destFileUploadHandler.processPolling(ctx.request.body); } catch (error: any) { response = { - statusCode: error.response?.status || 400, + statusCode: error.response?.status || error?.status || 400, error: error.message || 'Error occurred while processing payload.', }; errNotificationClient.notify(error, 'Poll Status', { diff --git a/src/util/prometheus.js b/src/util/prometheus.js index e1f0ee724b..2d781fb862 100644 --- a/src/util/prometheus.js +++ b/src/util/prometheus.js @@ -424,7 +424,7 @@ class Prometheus { name: 'marketo_bulk_upload_polling', help: 'marketo_bulk_upload_polling', type: 'counter', - labelNames: ['status', 'state'], + labelNames: ['status', 'state', 'requestTime'], }, { name: 'marketo_fetch_token', @@ -789,6 +789,18 @@ class Prometheus { type: 'counter', labelNames: ['http_status', 'destination_id'], }, + { + name: 'marketo_bulk_upload_upload_file_succJobs', + help: 'marketo_bulk_upload_upload_file_succJobs', + type: 'counter', + labelNames: [], + }, + { + name: 'marketo_bulk_upload_upload_file_unsuccJobs', + help: 'marketo_bulk_upload_upload_file_unsuccJobs', + type: 'counter', + labelNames: [], + }, { name: 'braze_lookup_time', help: 'braze look-up time', @@ -874,6 +886,43 @@ class Prometheus { labelNames: ['processSessions'], buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 200], }, + { + name: 'marketo_bulk_upload_create_header_time', + help: 'marketo_bulk_upload_create_header_time', + type: 'histogram', + labelNames: [], + }, + { + name: 'marketo_bulk_upload_fetch_job_time', + help: 'marketo_bulk_upload_fetch_job_time', + type: 'histogram', + labelNames: [], + }, + { + name: 'marketo_bulk_upload_fetch_job_create_response_time', + help: 'marketo_bulk_upload_fetch_job_create_response_time', + type: 'histogram', + labelNames: [], + }, + { + name: 'marketo_bulk_upload_create_file_time', + help: 'marketo_bulk_upload_create_file_time', + type: 'histogram', + labelNames: [], + }, + { + name: 'marketo_bulk_upload_upload_file_time', + help: 'marketo_bulk_upload_upload_file_time', + type: 'histogram', + labelNames: [], + }, + { + name: 'marketo_bulk_upload_create_csvloop_time', + help: 'marketo_bulk_upload_create_csvloop_time', + type: 'histogram', + labelNames: [], + }, + ]; metrics.forEach((metric) => { diff --git a/src/v0/destinations/google_adwords_offline_conversions/networkHandler.js b/src/v0/destinations/google_adwords_offline_conversions/networkHandler.js index 5a8942a25b..963721f024 100644 --- a/src/v0/destinations/google_adwords_offline_conversions/networkHandler.js +++ b/src/v0/destinations/google_adwords_offline_conversions/networkHandler.js @@ -270,9 +270,9 @@ const responseHandler = (destinationResponse) => { if (partialFailureError && partialFailureError.code !== 0) { throw new NetworkError( `[Google Ads Offline Conversions]:: partialFailureError - ${partialFailureError?.message}`, - status, + 400, { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), + [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(400), }, partialFailureError, ); diff --git a/src/v0/destinations/intercom/networkHandler.js b/src/v0/destinations/intercom/networkHandler.js new file mode 100644 index 0000000000..4133fbe4dc --- /dev/null +++ b/src/v0/destinations/intercom/networkHandler.js @@ -0,0 +1,37 @@ +const { proxyRequest, prepareProxyRequest } = require('../../../adapters/network'); +const { processAxiosResponse } = require('../../../adapters/utils/networkUtils'); + +const { RetryableError } = require('../../util/errorTypes'); + +const errorResponseHandler = (destinationResponse, dest) => { + const { status } = destinationResponse; + if (status === 408) { + throw new RetryableError( + `[Intercom Response Handler] Request failed for destination ${dest} with status: ${status}`, + 500, + destinationResponse, + ); + } +}; + +const destResponseHandler = (destinationResponse, dest) => { + errorResponseHandler(destinationResponse, dest); + return { + destinationResponse: destinationResponse.response, + message: 'Request Processed Successfully', + status: destinationResponse.status, + }; +}; + +class networkHandler { + constructor() { + this.responseHandler = destResponseHandler; + this.proxy = proxyRequest; + this.prepareProxy = prepareProxyRequest; + this.processAxiosResponse = processAxiosResponse; + } +} + +module.exports = { + networkHandler, +}; diff --git a/src/v0/destinations/marketo_bulk_upload/config.js b/src/v0/destinations/marketo_bulk_upload/config.js new file mode 100644 index 0000000000..487e11fe24 --- /dev/null +++ b/src/v0/destinations/marketo_bulk_upload/config.js @@ -0,0 +1,36 @@ +const ABORTABLE_CODES = ['601', '603', '605', '609', '610']; +const RETRYABLE_CODES = ['713', '602', '604', '611']; +const THROTTLED_CODES = ['502', '606', '607', '608', '615']; + +const MARKETO_FILE_SIZE = 10485760; +const MARKETO_FILE_PATH = `${__dirname}/uploadFile/marketo_bulkupload.csv`; + +const FETCH_ACCESS_TOKEN = 'marketo_bulk_upload_access_token_fetching'; + +const POLL_ACTIVITY = 'marketo_bulk_upload_polling'; +const POLL_STATUS_ERR_MSG = 'Could not poll status'; + +const UPLOAD_FILE = 'marketo_bulk_upload_upload_file'; +const FILE_UPLOAD_ERR_MSG = 'Could not upload file'; + +const JOB_STATUS_ACTIVITY = 'marketo_bulk_upload_get_job_status'; +const FETCH_FAILURE_JOB_STATUS_ERR_MSG = 'Could not fetch failure job status'; +const FETCH_WARNING_JOB_STATUS_ERR_MSG = 'Could not fetch warning job status'; +const ACCESS_TOKEN_FETCH_ERR_MSG = 'Error during fetching access token'; + +module.exports = { + ABORTABLE_CODES, + RETRYABLE_CODES, + THROTTLED_CODES, + MARKETO_FILE_SIZE, + POLL_ACTIVITY, + UPLOAD_FILE, + JOB_STATUS_ACTIVITY, + MARKETO_FILE_PATH, + FETCH_ACCESS_TOKEN, + POLL_STATUS_ERR_MSG, + FILE_UPLOAD_ERR_MSG, + FETCH_FAILURE_JOB_STATUS_ERR_MSG, + FETCH_WARNING_JOB_STATUS_ERR_MSG, + ACCESS_TOKEN_FETCH_ERR_MSG, +}; diff --git a/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js b/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js index 7e7474c2a4..04eadc4c51 100644 --- a/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js +++ b/src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js @@ -1,31 +1,21 @@ /* eslint-disable no-restricted-syntax */ /* eslint-disable no-prototype-builtins */ -const { - getAccessToken, - ABORTABLE_CODES, - THROTTLED_CODES, - RETRYABLE_CODES, - JOB_STATUS_ACTIVITY, -} = require('./util'); -const { httpGET } = require('../../../adapters/network'); -const { - AbortedError, - RetryableError, - ThrottledError, - PlatformError, -} = require('../../util/errorTypes'); +const { getAccessToken } = require('./util'); +const { handleHttpRequest } = require('../../../adapters/network'); +const { PlatformError } = require('../../util/errorTypes'); const stats = require('../../../util/stats'); const { JSON_MIME_TYPE } = require('../../util/constant'); +const { + handleFetchJobStatusResponse, + getFieldSchemaMap, + checkEventStatusViaSchemaMatching, +} = require('./util'); +const { removeUndefinedValues } = require('../../util'); -const FETCH_FAILURE_JOB_STATUS_ERR_MSG = 'Could not fetch failure job status'; -const FAILURE_JOB_STATUS_ERR_MSG = 'Error during fetching failure job status'; -const FETCH_WARNING_JOB_STATUS_ERR_MSG = 'Could not fetch warning job status'; -const WARNING_JOB_STATUS_ERR_MSG = 'Error during fetching warning job status'; - -const getFailedJobStatus = async (event) => { +const getJobsStatus = async (event, type, accessToken) => { const { config, importId } = event; - const accessToken = await getAccessToken(config); const { munchkinId } = config; + let url; // Get status of each lead for failed leads // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#failures const requestOptions = { @@ -34,270 +24,123 @@ const getFailedJobStatus = async (event) => { Authorization: `Bearer ${accessToken}`, }, }; - const failedLeadUrl = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${importId}/failures.json`; - const startTime = Date.now(); - const resp = await httpGET(failedLeadUrl, requestOptions, { - destType: 'marketo_bulk_upload', - feature: 'transformation', - }); - const endTime = Date.now(); - const requestTime = endTime - startTime; - - stats.gauge('marketo_bulk_upload_fetch_job_time', requestTime); - if (resp.success) { - if (resp.response && resp.response.data) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 200, - state: 'Success', - }); - return resp.response; - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(FETCH_FAILURE_JOB_STATUS_ERR_MSG, 400, resp); - } - if (resp.response) { - if ( - ABORTABLE_CODES.includes(resp.response.code) || - (resp.response.code >= 400 && resp.response.code <= 499) - ) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(resp.response.code, 400, resp); - } else if (RETRYABLE_CODES.includes(resp.response.code)) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError(resp.response.code, 500, resp); - } else if (resp.response.response) { - if (ABORTABLE_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError( - resp.response.response.statusText || FAILURE_JOB_STATUS_ERR_MSG, - 400, - resp, - ); - } else if (THROTTLED_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 500, - state: 'Retryable', - }); - throw new ThrottledError( - resp.response.response.statusText || FAILURE_JOB_STATUS_ERR_MSG, - resp, - ); - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - resp.response.response.statusText || FAILURE_JOB_STATUS_ERR_MSG, - 500, - resp, - ); - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(FETCH_FAILURE_JOB_STATUS_ERR_MSG, 400, resp); + if (type === 'fail') { + url = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${importId}/failures.json`; + } else { + url = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${importId}/warnings.json`; } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(FETCH_FAILURE_JOB_STATUS_ERR_MSG, 400, resp); -}; - -const getWarningJobStatus = async (event) => { - const { config, importId } = event; - const accessToken = await getAccessToken(config); - const { munchkinId } = config; - // Get status of each lead for warning leads - // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#warnings - const requestOptions = { - headers: { - 'Content-Type': JSON_MIME_TYPE, - Authorization: `Bearer ${accessToken}`, - }, - }; const startTime = Date.now(); - const warningJobStatusUrl = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${importId}/warnings.json`; - const resp = await httpGET(warningJobStatusUrl, requestOptions, { + const { processedResponse: resp } = await handleHttpRequest('get', url, requestOptions, { destType: 'marketo_bulk_upload', feature: 'transformation', }); const endTime = Date.now(); const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_fetch_job_time', requestTime); - if (resp.success) { - if (resp.response && resp.response.data) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 200, - state: 'Success', - }); - return resp.response; - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(FETCH_WARNING_JOB_STATUS_ERR_MSG, 400, resp); - } - if (resp.response) { - if ( - ABORTABLE_CODES.includes(resp.response.code) || - (resp.response.code >= 400 && resp.response.code <= 499) - ) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(resp.response.code, 400, resp); - } else if (RETRYABLE_CODES.includes(resp.response.code)) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError(resp.response.code, 500, resp); - } else if (resp.response.response) { - if (ABORTABLE_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError( - resp.response.response.statusText || WARNING_JOB_STATUS_ERR_MSG, - 400, - resp, - ); - } else if (THROTTLED_CODES.includes(resp.response.response.status)) { - stats.increment(JOB_STATUS_ACTIVITY, { - status: 500, - state: 'Retryable', - }); - throw new ThrottledError( - resp.response.response.statusText || WARNING_JOB_STATUS_ERR_MSG, - resp, - ); - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - resp.response.response.statusText || WARNING_JOB_STATUS_ERR_MSG, - 500, - resp, - ); - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(FETCH_WARNING_JOB_STATUS_ERR_MSG, 400, resp); - } - stats.increment(JOB_STATUS_ACTIVITY, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError(FETCH_WARNING_JOB_STATUS_ERR_MSG, 400, resp); + stats.histogram('marketo_bulk_upload_fetch_job_time', requestTime); + + return handleFetchJobStatusResponse(resp, type); }; +/** + * Handles the response from the server based on the provided type. + * Retrieves the job status using the getJobsStatus function and processes the response data. + * Matches the response data with the data received from the server. + * Returns a response object containing the failed keys, failed reasons, warning keys, warning reasons, and succeeded keys. + * @param {Object} event - An object containing the input data and metadata. + * @param {string} type - A string indicating the type of job status to retrieve ("fail" or "warn"). + * @returns {Object} - A response object with the failed keys, failed reasons, warning keys, warning reasons, and succeeded keys. + */ const responseHandler = async (event, type) => { - let failedKeys = []; - let failedReasons = {}; - let warningKeys = []; - let warningReasons = {}; + let FailedKeys = []; + const unsuccessfulJobIdsArr = []; + let successfulJobIdsArr = []; + let reasons = {}; + + const { config } = event; + const accessToken = await getAccessToken(config); /** * { - "failedKeys" : [jobID1,jobID3], - "failedReasons" : { + "FailedKeys" : [jobID1,jobID3], + "FailedReasons" : { "jobID1" : "failure-reason-1", "jobID3" : "failure-reason-2", }, - "warningKeys" : [jobID2,jobID4], - "warningReasons" : { + "WarningKeys" : [jobID2,jobID4], + "WarningReasons" : { "jobID2" : "warning-reason-1", "jobID4" : "warning-reason-2", }, - "succeededKeys" : [jobID5] + "SucceededKeys" : [jobID5] } */ - const responseStatus = - type === 'fail' ? await getFailedJobStatus(event) : await getWarningJobStatus(event); - const responseArr = responseStatus.data.split('\n'); + const jobStatus = + type === 'fail' + ? await getJobsStatus(event, 'fail', accessToken) + : await getJobsStatus(event, 'warn', accessToken); + const jobStatusArr = jobStatus.toString().split('\n'); // responseArr = ['field1,field2,Import Failure Reason', 'val1,val2,reason',...] const { input, metadata } = event; let headerArr; - if (metadata && metadata.csvHeader) { + if (metadata?.csvHeader) { headerArr = metadata.csvHeader.split(','); } else { throw new PlatformError('No csvHeader in metadata'); } + const startTime = Date.now(); const data = {}; - input.forEach((i) => { + const fieldSchemaMapping = await getFieldSchemaMap(accessToken, config.munchkinId); + const unsuccessfulJobInfo = checkEventStatusViaSchemaMatching(event, fieldSchemaMapping); + const mismatchJobIdArray = Object.keys(unsuccessfulJobInfo); + const dataTypeMismatchKeys = mismatchJobIdArray.map((strJobId) => parseInt(strJobId, 10)); + reasons = { ...unsuccessfulJobInfo }; + + const filteredEvents = input.filter( + (item) => !dataTypeMismatchKeys.includes(item.metadata.job_id), + ); + // create a map of job_id and data sent from server + // {: ','} + filteredEvents.forEach((i) => { const response = headerArr.map((fieldName) => Object.values(i)[0][fieldName]).join(','); data[i.metadata.job_id] = response; }); - const unsuccessfulJobIdsArr = []; - const reasons = {}; - const startTime = Date.now(); - for (const element of responseArr) { + + // match marketo response data with received data from server + for (const element of jobStatusArr) { // split response by comma but ignore commas inside double quotes const elemArr = element.split(/,(?=(?:(?:[^"]*"){2})*[^"]*$)/); - const reasonMessage = elemArr.pop(); - // match response data with received data from server - for (const key in data) { - if (data.hasOwnProperty(key)) { - const val = data[key]; - if (val === `${elemArr.join()}`) { - // add job keys if warning/failure - - if (!unsuccessfulJobIdsArr.includes(key)) { - unsuccessfulJobIdsArr.push(key); - } - reasons[key] = reasonMessage; + // ref : + // https://developers.marketo.com/rest-api/bulk-import/bulk-custom-object-import/#:~:text=Now%20we%E2%80%99ll%20make%20Get%20Import%20Custom%20Object%20Failures%20endpoint%20call%20to%20get%20additional%20failure%20detail%3A + const reasonMessage = elemArr.pop(); // get the column named "Import Failure Reason" + for (const [key, val] of Object.entries(data)) { + // joining the parameter values sent from marketo match it with received data from server + if (val === `${elemArr.map((item) => item.replace(/"/g, '')).join(',')}`) { + // add job keys if warning/failure + if (!unsuccessfulJobIdsArr.includes(key)) { + unsuccessfulJobIdsArr.push(key); } + reasons[key] = reasonMessage; } } } - const successfulJobIdsArr = Object.keys(data).filter((x) => !unsuccessfulJobIdsArr.includes(x)); + FailedKeys = unsuccessfulJobIdsArr.map((strJobId) => parseInt(strJobId, 10)); + successfulJobIdsArr = Object.keys(data).filter((x) => !unsuccessfulJobIdsArr.includes(x)); - if (type === 'fail') { - failedKeys = unsuccessfulJobIdsArr; - failedReasons = reasons; - } else if (type === 'warn') { - warningKeys = unsuccessfulJobIdsArr; - warningReasons = reasons; - } - const succeededKeys = successfulJobIdsArr; + const SucceededKeys = successfulJobIdsArr.map((strJobId) => parseInt(strJobId, 10)); const endTime = Date.now(); const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_fetch_job_create_response_time', requestTime); + stats.histogram('marketo_bulk_upload_fetch_job_create_response_time', requestTime); const response = { statusCode: 200, metadata: { - failedKeys, - failedReasons, - warningKeys, - warningReasons, - succeededKeys, + FailedKeys: [...dataTypeMismatchKeys, ...FailedKeys], + FailedReasons: reasons, + SucceededKeys, }, }; - return response; + return removeUndefinedValues(response); }; const processJobStatus = async (event, type) => { diff --git a/src/v0/destinations/marketo_bulk_upload/fileUpload.js b/src/v0/destinations/marketo_bulk_upload/fileUpload.js index b951918678..2c77cd6e29 100644 --- a/src/v0/destinations/marketo_bulk_upload/fileUpload.js +++ b/src/v0/destinations/marketo_bulk_upload/fileUpload.js @@ -3,95 +3,61 @@ const FormData = require('form-data'); const fs = require('fs'); const { getAccessToken, - ABORTABLE_CODES, - THROTTLED_CODES, - MARKETO_FILE_SIZE, getMarketoFilePath, - UPLOAD_FILE, + handleFileUploadResponse, + getFieldSchemaMap, + hydrateStatusForServer, } = require('./util'); +const { isHttpStatusSuccess } = require('../../util'); +const { MARKETO_FILE_SIZE, UPLOAD_FILE } = require('./config'); const { getHashFromArray, removeUndefinedAndNullValues, isDefinedAndNotNullAndNotEmpty, } = require('../../util'); -const { httpPOST, httpGET } = require('../../../adapters/network'); +const { handleHttpRequest } = require('../../../adapters/network'); const { - RetryableError, - AbortedError, - ThrottledError, NetworkError, ConfigurationError, + RetryableError, + TransformationError, } = require('../../util/errorTypes'); -const tags = require('../../util/tags'); -const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils'); +const { client } = require('../../../util/errorNotifier'); const stats = require('../../../util/stats'); -const fetchFieldSchema = async (config) => { - let fieldArr = []; - const fieldSchemaNames = []; - const accessToken = await getAccessToken(config); - const fieldSchemaMapping = await httpGET( - `https://${config.munchkinId}.mktorest.com/rest/v1/leads/describe2.json`, - { - params: { - access_token: accessToken, - }, - }, - { - destType: 'marketo_bulk_upload', - feature: 'transformation', - }, - ); - if ( - fieldSchemaMapping && - fieldSchemaMapping.success && - fieldSchemaMapping.response.data && - fieldSchemaMapping.response.data.result.length > 0 && - fieldSchemaMapping.response.data.result[0] - ) { - fieldArr = - fieldSchemaMapping.response.data.result && - Array.isArray(fieldSchemaMapping.response.data.result) - ? fieldSchemaMapping.response.data.result[0].fields - : []; - fieldArr.forEach((field) => { - fieldSchemaNames.push(field.name); - }); - } else if (fieldSchemaMapping.response.error) { - const status = fieldSchemaMapping?.response?.status || 400; - throw new NetworkError( - `${fieldSchemaMapping.response.error}`, - status, - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), - }, - fieldSchemaMapping, - ); - } else { - throw new AbortedError('Failed to fetch Marketo Field Schema', 400, fieldSchemaMapping); +const fetchFieldSchemaNames = async (config, accessToken) => { + const fieldSchemaMapping = await getFieldSchemaMap(accessToken, config.munchkinId); + if (Object.keys(fieldSchemaMapping).length > 0) { + const fieldSchemaNames = Object.keys(fieldSchemaMapping); + return { fieldSchemaNames }; } - return { fieldSchemaNames, accessToken }; + throw new RetryableError('Failed to fetch Marketo Field Schema', 500, fieldSchemaMapping); }; const getHeaderFields = (config, fieldSchemaNames) => { const { columnFieldsMapping } = config; columnFieldsMapping.forEach((colField) => { - if (fieldSchemaNames) { - if (!fieldSchemaNames.includes(colField.to)) { - throw new ConfigurationError( - `The field ${colField.to} is not present in Marketo Field Schema. Aborting`, - ); - } - } else { - throw new ConfigurationError('Marketo Field Schema is Empty. Aborting'); + if (!fieldSchemaNames.includes(colField.to)) { + throw new ConfigurationError( + `The field ${colField.to} is not present in Marketo Field Schema. Aborting`, + ); } }); const columnField = getHashFromArray(columnFieldsMapping, 'to', 'from', false); return Object.keys(columnField); }; - -const getFileData = async (inputEvents, config, fieldSchemaNames) => { +/** + * Processes input data to create a CSV file and returns the file data along with successful and unsuccessful job IDs. + * The file name is made unique with combination of UUID and current timestamp to avoid any overrides. It also has a + * maximum size limit of 10MB . The events that could be accomodated inside the file is marked as successful and the + * rest are marked as unsuccessful. Also the file is deleted when reading is complete. + * @param {Array} inputEvents - An array of input events. + * @param {Object} config - destination config + * @param {Array} headerArr - An array of header fields. + * @returns {Object} - An object containing the file stream, successful job IDs, and unsuccessful job IDs. + */ +const getFileData = async (inputEvents, config, headerArr) => { const input = inputEvents; const messageArr = []; let startTime; @@ -107,8 +73,6 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { messageArr.push(data); }); - const headerArr = getHeaderFields(config, fieldSchemaNames); - if (isDefinedAndNotNullAndNotEmpty(config.deDuplicationField)) { // dedup starts // Time Complexity = O(n2) @@ -120,7 +84,7 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { // user@email [4,7,9] // user2@email [2,3] // user3@email [1] - input.map((element, index) => { + input.forEach((element, index) => { const indexAr = dedupMap.get(element.message[config.deDuplicationField]) || []; indexAr.push(index); dedupMap.set(element.message[config.deDuplicationField], indexAr); @@ -145,19 +109,16 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { // dedup ends } - if (Object.keys(headerArr).length === 0) { - throw new ConfigurationError('Header fields not present'); - } const csv = []; csv.push(headerArr.toString()); endTime = Date.now(); requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_create_header_time', requestTime); + stats.histogram('marketo_bulk_upload_create_header_time', requestTime); const unsuccessfulJobs = []; const successfulJobs = []; const MARKETO_FILE_PATH = getMarketoFilePath(); startTime = Date.now(); - messageArr.map((row) => { + messageArr.forEach((row) => { const csvSize = JSON.stringify(csv); // stringify and remove all "stringification" extra data const response = headerArr .map((fieldName) => JSON.stringify(Object.values(row)[0][fieldName], '')) @@ -168,215 +129,140 @@ const getFileData = async (inputEvents, config, fieldSchemaNames) => { } else { unsuccessfulJobs.push(Object.keys(row)[0]); } - return response; }); endTime = Date.now(); requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_create_csvloop_time', requestTime); + stats.histogram('marketo_bulk_upload_create_csvloop_time', requestTime); const fileSize = Buffer.from(csv.join('\n')).length; if (csv.length > 1) { startTime = Date.now(); fs.writeFileSync(MARKETO_FILE_PATH, csv.join('\n')); - const readStream = fs.createReadStream(MARKETO_FILE_PATH); + const readStream = fs.readFileSync(MARKETO_FILE_PATH); fs.unlinkSync(MARKETO_FILE_PATH); endTime = Date.now(); requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_create_file_time', requestTime); - stats.gauge('marketo_bulk_upload_upload_file_size', fileSize); + stats.histogram('marketo_bulk_upload_create_file_time', requestTime); + stats.histogram('marketo_bulk_upload_upload_file_size', fileSize); return { readStream, successfulJobs, unsuccessfulJobs }; } return { successfulJobs, unsuccessfulJobs }; }; -const getImportID = async (input, config, fieldSchemaNames, accessToken) => { - const { readStream, successfulJobs, unsuccessfulJobs } = await getFileData( - input, - config, - fieldSchemaNames, - ); - const FILE_UPLOAD_ERR_MSG = 'Could not upload file'; +const getImportID = async (input, config, accessToken, csvHeader) => { + let readStream; + let successfulJobs; + let unsuccessfulJobs; try { - const formReq = new FormData(); - const { munchkinId, deDuplicationField } = config; - // create file for multipart form - if (readStream) { - formReq.append('format', 'csv'); - formReq.append('file', readStream, 'marketo_bulk_upload.csv'); - formReq.append('access_token', accessToken); - // Upload data received from server as files to marketo - // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#import_file - const requestOptions = { - headers: { - ...formReq.getHeaders(), - }, - }; - if (isDefinedAndNotNullAndNotEmpty(deDuplicationField)) { - requestOptions.params = { - lookupField: deDuplicationField, - }; - } - const startTime = Date.now(); - const resp = await httpPOST( - `https://${munchkinId}.mktorest.com/bulk/v1/leads.json`, - formReq, - requestOptions, - { - destType: 'marketo_bulk_upload', - feature: 'transformation', - }, - ); - const endTime = Date.now(); - const requestTime = endTime - startTime; - stats.gauge('marketo_bulk_upload_upload_file_succJobs', successfulJobs.length); - stats.gauge('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length); - if (resp.success) { - /** - * - { - "requestId": "d01f#15d672f8560", - "result": [ - { - "batchId": 3404, - "importId": "3404", - "status": "Queued" - } - ], - "success": true - } - */ - if ( - resp.response && - resp.response.data.success && - resp.response.data.result.length > 0 && - resp.response.data.result[0] && - resp.response.data.result[0].importId - ) { - const { importId } = await resp.response.data.result[0]; - stats.gauge('marketo_bulk_upload_upload_file_time', requestTime); - - stats.increment(UPLOAD_FILE, { - status: 200, - state: 'Success', - }); - return { importId, successfulJobs, unsuccessfulJobs }; - } - if (resp.response && resp.response.data) { - if ( - resp.response.data.errors[0] && - resp.response.data.errors[0].message === - 'There are 10 imports currently being processed. Please try again later' - ) { - stats.increment(UPLOAD_FILE, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - resp.response.data.errors[0].message || FILE_UPLOAD_ERR_MSG, - 500, - { successfulJobs, unsuccessfulJobs }, - ); - } - if ( - resp.response.data.errors[0] && - ((resp.response.data.errors[0].code >= 1000 && - resp.response.data.errors[0].code <= 1077) || - ABORTABLE_CODES.indexOf(resp.response.data.errors[0].code)) - ) { - if (resp.response.data.errors[0].message === 'Empty file') { - stats.increment(UPLOAD_FILE, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - resp.response.data.errors[0].message || FILE_UPLOAD_ERR_MSG, - 500, - { successfulJobs, unsuccessfulJobs }, - ); - } - - stats.increment(UPLOAD_FILE, { - status: 400, - state: 'Abortable', - }); - throw new AbortedError( - resp.response.data.errors[0].message || FILE_UPLOAD_ERR_MSG, - 400, - { successfulJobs, unsuccessfulJobs }, - ); - } else if (THROTTLED_CODES.indexOf(resp.response.data.errors[0].code)) { - stats.increment(UPLOAD_FILE, { - status: 500, - state: 'Retryable', - }); - throw new ThrottledError(resp.response.response.statusText || FILE_UPLOAD_ERR_MSG, { - successfulJobs, - unsuccessfulJobs, - }); - } - stats.increment(UPLOAD_FILE, { - status: 500, - state: 'Retryable', - }); - throw new RetryableError(resp.response.response.statusText || FILE_UPLOAD_ERR_MSG, 500, { - successfulJobs, - unsuccessfulJobs, - }); - } - } - } - return { successfulJobs, unsuccessfulJobs }; + ({ readStream, successfulJobs, unsuccessfulJobs } = await getFileData( + input, + config, + csvHeader, + )); } catch (err) { - // TODO check the tags - stats.increment(UPLOAD_FILE, { - status: err.response?.status || 400, - errorMessage: err.message || FILE_UPLOAD_ERR_MSG, + client.notify(err, `Marketo File Upload: Error while creating file: ${err.message}`, { + config, + csvHeader, }); - const status = err.response?.status || 400; - throw new NetworkError( - err.message || FILE_UPLOAD_ERR_MSG, - status, + throw new TransformationError( + `Marketo File Upload: Error while creating file: ${err.message}`, + 500, + ); + } + + const formReq = new FormData(); + const { munchkinId, deDuplicationField } = config; + // create file for multipart form + if (readStream) { + formReq.append('format', 'csv'); + formReq.append('file', readStream, 'marketo_bulk_upload.csv'); + formReq.append('access_token', accessToken); + // Upload data received from server as files to marketo + // DOC: https://developers.marketo.com/rest-api/bulk-import/bulk-lead-import/#import_file + const requestOptions = { + headers: { + ...formReq.getHeaders(), + }, + }; + if (isDefinedAndNotNullAndNotEmpty(deDuplicationField)) { + requestOptions.params = { + lookupField: deDuplicationField, + }; + } + const startTime = Date.now(); + const { processedResponse: resp } = await handleHttpRequest( + 'post', + `https://${munchkinId}.mktorest.com/bulk/v1/leads.json`, + formReq, + requestOptions, { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), + destType: 'marketo_bulk_upload', + feature: 'transformation', }, - { successfulJobs, unsuccessfulJobs }, ); + const endTime = Date.now(); + const requestTime = endTime - startTime; + stats.counter('marketo_bulk_upload_upload_file_succJobs', successfulJobs.length); + stats.counter('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length); + if (!isHttpStatusSuccess(resp.status)) { + throw new NetworkError( + 'Unable to upload file', + hydrateStatusForServer(resp.status, 'During fetching poll status'), + ); + } + return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime, config); } + return { importId: null, successfulJobs, unsuccessfulJobs }; }; +/** + * + * @param {*} input + * @param {*} config + * @returns returns the final response of fileUpload.js + */ const responseHandler = async (input, config) => { + const accessToken = await getAccessToken(config); /** { "importId" : , "pollURL" : , } */ - const { fieldSchemaNames, accessToken } = await fetchFieldSchema(config); + const { fieldSchemaNames } = await fetchFieldSchemaNames(config, accessToken); + const headerForCsv = getHeaderFields(config, fieldSchemaNames); + if (Object.keys(headerForCsv).length === 0) { + throw new ConfigurationError( + 'Faulty configuration. Please map your traits to Marketo column fields', + ); + } const { importId, successfulJobs, unsuccessfulJobs } = await getImportID( input, config, - fieldSchemaNames, accessToken, + headerForCsv, ); + + // if upload is successful if (importId) { + const csvHeader = headerForCsv.toString(); + const metadata = { successfulJobs, unsuccessfulJobs, csvHeader }; const response = { statusCode: 200, importId, - pollURL: '/pollStatus', + metadata, }; - const csvHeader = getHeaderFields(config, fieldSchemaNames).toString(); - response.metadata = { successfulJobs, unsuccessfulJobs, csvHeader }; return response; } - + // if importId is returned null stats.increment(UPLOAD_FILE, { status: 500, state: 'Retryable', }); - throw new RetryableError('No import id received', 500, { - successfulJobs, - unsuccessfulJobs, - }); + return { + statusCode: 500, + FailedReason: '[Marketo File upload]: No import id received', + }; }; const processFileData = async (event) => { const { input, config } = event; diff --git a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js new file mode 100644 index 0000000000..777301b6c3 --- /dev/null +++ b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js @@ -0,0 +1,230 @@ +const { + handleCommonErrorResponse, + handlePollResponse, + handleFileUploadResponse, +} = require('./util'); + +const { AbortedError, RetryableError } = require('../../util/errorTypes'); + +describe('handleCommonErrorResponse', () => { + test('should throw AbortedError for abortable error codes', () => { + const resp = { + response: { + errors: [{ code: 1003, message: 'Aborted' }], + }, + }; + expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + AbortedError, + ); + }); + + test('should throw ThrottledError for throttled error codes', () => { + const resp = { + response: { + errors: [{ code: 615, message: 'Throttled' }], + }, + }; + expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + RetryableError, + ); + }); + + test('should throw RetryableError for other error codes', () => { + const resp = { + response: { + errors: [{ code: 2000, message: 'Retryable' }], + }, + }; + expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + RetryableError, + ); + }); + + test('should throw RetryableError by default', () => { + const resp = { + response: { + errors: [], + }, + }; + expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + RetryableError, + ); + }); +}); + +describe('handlePollResponse', () => { + // Tests that the function returns the response object if the polling operation was successful + it('should return the response object when the polling operation was successful', () => { + const pollStatus = { + response: { + success: true, + result: [ + { + batchId: '123', + status: 'Complete', + numOfLeadsProcessed: 2, + numOfRowsFailed: 1, + numOfRowsWithWarning: 0, + message: 'Import completed with errors, 2 records imported (2 members), 1 failed', + }, + ], + }, + }; + + const result = handlePollResponse(pollStatus); + + expect(result).toEqual(pollStatus.response); + }); + + // Tests that the function throws an AbortedError if the response contains an abortable error code + it('should throw an AbortedError when the response contains an abortable error code', () => { + const pollStatus = { + response: { + errors: [ + { + code: 1003, + message: 'Empty file', + }, + ], + }, + }; + + expect(() => handlePollResponse(pollStatus)).toThrow(AbortedError); + }); + + // Tests that the function throws a ThrottledError if the response contains a throttled error code + it('should throw a ThrottledError when the response contains a throttled error code', () => { + const pollStatus = { + response: { + errors: [ + { + code: 615, + message: 'Exceeded concurrent usage limit', + }, + ], + }, + }; + + expect(() => handlePollResponse(pollStatus)).toThrow(RetryableError); + }); + + // Tests that the function throws a RetryableError if the response contains an error code that is not abortable or throttled + it('should throw a RetryableError when the response contains an error code that is not abortable or throttled', () => { + const pollStatus = { + response: { + errors: [ + { + code: 601, + message: 'Unauthorized', + }, + ], + }, + }; + + expect(() => handlePollResponse(pollStatus)).toThrow(RetryableError); + }); + + // Tests that the function returns null if the polling operation was not successful + it('should return null when the polling operation was not successful', () => { + const pollStatus = { + response: { + success: false, + }, + }; + + const result = handlePollResponse(pollStatus); + + expect(result).toBeNull(); + }); +}); + +describe('handleFileUploadResponse', () => { + // Tests that the function returns an object with importId, successfulJobs, and unsuccessfulJobs when the response indicates a successful upload. + it('should return an object with importId, successfulJobs, and unsuccessfulJobs when the response indicates a successful upload', () => { + const resp = { + response: { + success: true, + result: [ + { + importId: '3404', + status: 'Queued', + }, + ], + }, + }; + const successfulJobs = []; + const unsuccessfulJobs = []; + const requestTime = 100; + + const result = handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); + + expect(result).toEqual({ + importId: '3404', + successfulJobs: [], + unsuccessfulJobs: [], + }); + }); + + // Tests that the function throws a RetryableError when the response indicates an empty file. + it('should throw a RetryableError when the response indicates an empty file', () => { + const resp = { + response: { + errors: [ + { + code: '1003', + message: 'Empty File', + }, + ], + }, + }; + const successfulJobs = []; + const unsuccessfulJobs = []; + const requestTime = 100; + + expect(() => { + handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); + }).toThrow(RetryableError); + }); + + // Tests that the function throws a RetryableError when the response indicates more than 10 concurrent uses. + it('should throw a RetryableError when the response indicates more than 10 concurrent uses', () => { + const resp = { + response: { + errors: [ + { + code: '615', + message: 'Concurrent Use Limit Exceeded', + }, + ], + }, + }; + const successfulJobs = []; + const unsuccessfulJobs = []; + const requestTime = 100; + + expect(() => { + handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); + }).toThrow(RetryableError); + }); + + // Tests that the function throws a RetryableError when the response contains an error code between 1000 and 1077. + it('should throw a Aborted when the response contains an error code between 1000 and 1077', () => { + const resp = { + response: { + errors: [ + { + code: 1001, + message: 'Some Error', + }, + ], + }, + }; + const successfulJobs = []; + const unsuccessfulJobs = []; + const requestTime = 100; + + expect(() => { + handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); + }).toThrow(AbortedError); + }); +}); diff --git a/src/v0/destinations/marketo_bulk_upload/poll.js b/src/v0/destinations/marketo_bulk_upload/poll.js index 7978f2d876..5e37fd7c0e 100644 --- a/src/v0/destinations/marketo_bulk_upload/poll.js +++ b/src/v0/destinations/marketo_bulk_upload/poll.js @@ -1,9 +1,10 @@ -const { removeUndefinedValues } = require('../../util'); -const { getAccessToken, ABORTABLE_CODES, THROTTLED_CODES, POLL_ACTIVITY } = require('./util'); -const { httpGET } = require('../../../adapters/network'); +const { removeUndefinedValues, isHttpStatusSuccess } = require('../../util'); +const { getAccessToken, handlePollResponse, hydrateStatusForServer } = require('./util'); +const { handleHttpRequest } = require('../../../adapters/network'); const stats = require('../../../util/stats'); -const { AbortedError, ThrottledError, RetryableError } = require('../../util/errorTypes'); +const { NetworkError } = require('../../util/errorTypes'); const { JSON_MIME_TYPE } = require('../../util/constant'); +const { POLL_ACTIVITY } = require('./config'); const getPollStatus = async (event) => { const accessToken = await getAccessToken(event.config); @@ -18,106 +19,41 @@ const getPollStatus = async (event) => { }, }; const pollUrl = `https://${munchkinId}.mktorest.com/bulk/v1/leads/batch/${event.importId}.json`; - const startTime = Date.now(); - const pollStatus = await httpGET(pollUrl, requestOptions, { - destType: 'marketo_bulk_upload', - feature: 'transformation', - }); - const endTime = Date.now(); - const requestTime = endTime - startTime; - const POLL_STATUS_ERR_MSG = 'Could not poll status'; - if (pollStatus.success) { - if (pollStatus.response && pollStatus.response.data.success) { - stats.increment(POLL_ACTIVITY, { - requestTime, - status: 200, - state: 'Success', - }); - return pollStatus.response; - } - // DOC: https://developers.marketo.com/rest-api/error-codes/ - if (pollStatus.response && pollStatus.response.data) { - // Abortable jobs - // Errors from polling come as - /** - * { - "requestId": "e42b#14272d07d78", - "success": false, - "errors": [ - { - "code": "601", - "message": "Unauthorized" - } - ] -} - */ - if ( - pollStatus.response.data.errors[0] && - ((pollStatus.response.data.errors[0].code >= 1000 && - pollStatus.response.data.errors[0].code <= 1077) || - ABORTABLE_CODES.includes(pollStatus.response.data.errors[0].code)) - ) { - stats.increment(POLL_ACTIVITY, { - requestTime, - status: 400, - state: 'Abortable', - }); - throw new AbortedError( - pollStatus.response.data.errors[0].message || POLL_STATUS_ERR_MSG, - 400, - pollStatus, - ); - } else if (THROTTLED_CODES.includes(pollStatus.response.data.errors[0].code)) { - stats.increment(POLL_ACTIVITY, { - requestTime, - status: 500, - state: 'Retryable', - }); - throw new ThrottledError( - pollStatus.response.data.errors[0].message || POLL_STATUS_ERR_MSG, - pollStatus, - ); - } - stats.increment(POLL_ACTIVITY, { - requestTime, - status: 500, - state: 'Retryable', - }); - throw new RetryableError( - pollStatus.response.response.statusText || 'Error during polling status', - 500, - pollStatus, - ); - } + const { processedResponse: pollStatus } = await handleHttpRequest( + 'get', + pollUrl, + requestOptions, + { + destType: 'marketo_bulk_upload', + feature: 'transformation', + }, + ); + if (!isHttpStatusSuccess(pollStatus.status)) { + stats.counter(POLL_ACTIVITY, 1, { + status: pollStatus.status, + state: 'Retryable', + }); + throw new NetworkError( + 'Could not poll status', + hydrateStatusForServer(pollStatus.status, 'During fetching poll status'), + ); } - stats.increment(POLL_ACTIVITY, { - requestTime, - status: 400, - state: 'Abortable', - }); - throw new AbortedError(POLL_STATUS_ERR_MSG, 400, pollStatus); + return handlePollResponse(pollStatus, event.config); }; const responseHandler = async (event) => { const pollResp = await getPollStatus(event); - let pollSuccess; - let success; - let statusCode; - let hasFailed; - let failedJobsURL; - let hasWarnings; - let warningJobsURL; - let errorResponse; // Server expects : /** * * { - "success": true, + "Complete": true, "statusCode": 200, "hasFailed": true, - "failedJobsURL": "", // transformer URL - "hasWarnings": false, - "warningJobsURL": "", // transformer URL + "InProgress": false, + "FailedJobURLs": "", // transformer URL + "HasWarning": false, + "WarningJobURLs": "", // transformer URL } // Succesful Upload { "success": false, @@ -126,41 +62,57 @@ const responseHandler = async (event) => { } // Failed Upload { "success": false, + "Inprogress": true, + statusCode: 500, } // Importing or Queue */ - if (pollResp && pollResp.data) { - pollSuccess = pollResp.data.success; - if (pollSuccess) { - const { status, numOfRowsFailed, numOfRowsWithWarning } = pollResp.data.result[0]; - if (status === 'Complete') { - success = true; - statusCode = 200; - hasFailed = numOfRowsFailed > 0; - failedJobsURL = '/getFailedJobs'; - warningJobsURL = '/getWarningJobs'; - hasWarnings = numOfRowsWithWarning > 0; - } else if (status === 'Importing' || status === 'Queued') { - success = false; - } - } else { - success = false; - statusCode = 400; - errorResponse = pollResp.data.errors - ? pollResp.data.errors[0].message - : 'Error in importing jobs'; + if (pollResp) { + // As marketo lead import API or bulk API does not support record level error response we are considering + // file level errors only. + // ref: https://nation.marketo.com/t5/ideas/support-error-code-in-record-level-in-lead-bulk-api/idi-p/262191 + const { status, numOfRowsFailed, numOfRowsWithWarning, message } = pollResp.result[0]; + if (status === 'Complete') { + const response = { + Complete: true, + statusCode: 200, + InProgress: false, + hasFailed: numOfRowsFailed > 0, + FailedJobURLs: numOfRowsFailed > 0 ? '/getFailedJobs' : undefined, + HasWarning: numOfRowsWithWarning > 0, + WarningJobURLs: numOfRowsWithWarning > 0 ? '/getWarningJobs' : undefined, + }; + return removeUndefinedValues(response); + } + if (status === 'Importing' || status === 'Queued') { + return { + Complete: false, + statusCode: 500, + hasFailed: false, + InProgress: true, + HasWarning: false, + }; + } + if (status === 'Failed') { + return { + Complete: false, + statusCode: 500, + hasFailed: false, + InProgress: false, + HasWarning: false, + Error: message || 'Marketo Poll Status Failed', + }; } } - const response = { - success, - statusCode, - hasFailed, - failedJobsURL, - hasWarnings, - warningJobsURL, - errorResponse, + // when pollResp is null + return { + Complete: false, + statusCode: 500, + hasFailed: false, + InProgress: false, + HasWarning: false, + Error: 'No poll response received from Marketo', }; - return removeUndefinedValues(response); }; const processPolling = async (event) => { diff --git a/src/v0/destinations/marketo_bulk_upload/util.js b/src/v0/destinations/marketo_bulk_upload/util.js index 38bda6c3a4..0b4eb1e48a 100644 --- a/src/v0/destinations/marketo_bulk_upload/util.js +++ b/src/v0/destinations/marketo_bulk_upload/util.js @@ -1,98 +1,444 @@ -const { httpGET } = require('../../../adapters/network'); +const { handleHttpRequest } = require('../../../adapters/network'); const { - ThrottledError, AbortedError, RetryableError, NetworkError, + TransformationError, } = require('../../util/errorTypes'); const tags = require('../../util/tags'); +const { isHttpStatusSuccess, generateUUID } = require('../../util'); const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils'); +const stats = require('../../../util/stats'); +const { + ABORTABLE_CODES, + THROTTLED_CODES, + POLL_ACTIVITY, + UPLOAD_FILE, + FETCH_ACCESS_TOKEN, + POLL_STATUS_ERR_MSG, + FILE_UPLOAD_ERR_MSG, + ACCESS_TOKEN_FETCH_ERR_MSG, +} = require('./config'); +const Cache = require('../../util/cache'); +const logger = require('../../../logger'); -const ABORTABLE_CODES = ['ENOTFOUND', 'ECONNREFUSED', 603, 605, 609, 610]; -const RETRYABLE_CODES = ['EADDRINUSE', 'ECONNRESET', 'ETIMEDOUT', 713, 601, 602, 604, 611]; -const THROTTLED_CODES = [502, 606, 607, 608, 615]; +const { AUTH_CACHE_TTL } = require('../../util/constant'); -const MARKETO_FILE_SIZE = 10485760; -const MARKETO_FILE_PATH = `${__dirname}/uploadFile/marketo_bulkupload.csv`; +const authCache = new Cache(AUTH_CACHE_TTL); -const POLL_ACTIVITY = 'marketo_bulk_upload_polling'; -const UPLOAD_FILE = 'marketo_bulk_upload_upload_file'; -const JOB_STATUS_ACTIVITY = 'marketo_bulk_upload_get_job_status'; +const getMarketoFilePath = () => + `${__dirname}/uploadFile/${Date.now()}_marketo_bulk_upload_${generateUUID()}.csv`; -const getMarketoFilePath = () => MARKETO_FILE_PATH; -// Fetch access token from client id and client secret -// DOC: https://developers.marketo.com/rest-api/authentication/ -const getAccessToken = async (config) => { +// Server only aborts when status code is 400 +const hydrateStatusForServer = (statusCode, context) => { + const status = Number(statusCode); + if (Number.isNaN(status)) { + throw new TransformationError(`${context}: Couldn't parse status code ${statusCode}`); + } + if (status >= 400 && status <= 499) { + return 400; + } + return status; +}; + +const getAccessTokenCacheKey = (config = {}) => { + const { munchkinId, clientId, clientSecret } = config; + return `${munchkinId}-${clientId}-${clientSecret}`; +}; + +/** + * Handles common error responses returned from API calls. + * Checks the error code and throws the appropriate error object based on the code. + * + * @param {object} resp - The response object containing the error information. + * @param {string} OpErrorMessage - The error message to be used if the error code is not recognized. + * @param {string} OpActivity - The activity name for tracking purposes. + * @throws {AbortedError} - If the error code is abortable. + * @throws {ThrottledError} - If the error code is within the range of throttled codes. + * @throws {RetryableError} - If the error code is neither abortable nor throttled. + * + * @example + * const resp = { + * response: { + * errors: [ + * { + * code: "1003", + * message: "Empty File" + * } + * ] + * } + * }; + * + * try { + * handleCommonErrorResponse(resp, "Error message", "Activity"); + * } catch (error) { + * console.log(error); + * } + */ +const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity, config) => { + // checking for invalid/expired token errors and evicting cache in that case + // rudderJobMetadata contains some destination info which is being used to evict the cache + if ( + authCache && + apiCallResult.response?.errors && + apiCallResult.response?.errors?.length > 0 && + apiCallResult.response?.errors.some( + (errorObj) => errorObj.code === '601' || errorObj.code === '602', + ) + ) { + // Special handling for 601 and 602 error codes for access token + authCache.del(getAccessTokenCacheKey(config)); + if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '601')) { + throw new AbortedError( + `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + ); + } + if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '602')) { + throw new RetryableError( + `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + ); + } + } + if ( + apiCallResult.response?.errors?.length > 0 && + apiCallResult.response?.errors[0] && + ((apiCallResult.response?.errors[0]?.code >= 1000 && + apiCallResult.response?.errors[0]?.code <= 1077) || + ABORTABLE_CODES.includes(apiCallResult.response?.errors[0]?.code)) + ) { + // for empty file the code is 1003 and that should be retried + stats.increment(OpActivity, { + status: 400, + state: 'Abortable', + }); + throw new AbortedError(apiCallResult.response?.errors[0]?.message || OpErrorMessage, 400); + } else if (THROTTLED_CODES.includes(apiCallResult.response?.errors[0]?.code)) { + // for more than 10 concurrent uses the code is 615 and that should be retried + stats.increment(OpActivity, { + status: 429, + state: 'Retryable', + }); + throw new RetryableError( + `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + 500, + ); + } + // by default every thing will be retried + stats.increment(OpActivity, { + status: 500, + state: 'Retryable', + }); + throw new RetryableError( + `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + 500, + ); +}; + +const getAccessTokenURL = (config) => { const { clientId, clientSecret, munchkinId } = config; const url = `https://${munchkinId}.mktorest.com/identity/oauth/token?client_id=${clientId}&client_secret=${clientSecret}&grant_type=client_credentials`; - const resp = await httpGET( - url, - {}, - { + return url; +}; + +// Fetch access token from client id and client secret +// DOC: https://developers.marketo.com/rest-api/authentication/ +const getAccessToken = async (config) => + authCache.get(getAccessTokenCacheKey(config), async () => { + const url = getAccessTokenURL(config); + const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, { destType: 'marketo_bulk_upload', feature: 'transformation', - }, - ); - const ACCESS_TOKEN_FETCH_ERR_MSG = 'Error during fetching access token'; - if (resp.success) { - if (resp.response && resp.response.data && resp.response.data.access_token) { - return resp.response.data.access_token; + }); + + // sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400} + if (!isHttpStatusSuccess(accessTokenResponse.status)) { + throw new NetworkError( + 'Could not retrieve authorisation token', + hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN), + { + [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status), + }, + accessTokenResponse, + ); + } + if (accessTokenResponse.response?.success === false) { + handleCommonErrorResponse( + accessTokenResponse, + ACCESS_TOKEN_FETCH_ERR_MSG, + FETCH_ACCESS_TOKEN, + config, + ); + } + + // when access token is present + if (accessTokenResponse.response.access_token) { + /* This scenario will handle the case when we get the foloowing response + status: 200 + respnse: {"access_token":"","token_type":"bearer","expires_in":0,"scope":"dummy@scope.com"} + wherein "expires_in":0 denotes that we should refresh the accessToken but its not expired yet. + */ + if (accessTokenResponse.response?.expires_in === 0) { + throw new RetryableError( + `Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`, + 500, + ); + } + return accessTokenResponse.response.access_token; + } + throw new AbortedError('Could not retrieve authorisation token', 400); + }); + +/** + * Handles the response of a polling operation. + * Checks for any errors in the response and calls the `handleCommonErrorResponse` function to handle them. + * If the response is successful, increments the stats and returns the response. + * Otherwise, returns null. + * + * @param {object} pollStatus - The response object from the polling operation. + * @returns {object|null} - The response object if the polling operation was successful, otherwise null. + */ +const handlePollResponse = (pollStatus, config) => { + // DOC: https://developers.marketo.com/rest-api/error-codes/ + if (pollStatus.response.errors) { + /* Sample error response for poll is: + + { + "requestId": "e42b#14272d07d78", + "success": false, + "errors": [ + { + "code": "601", + "message": "Unauthorized" + } + ] + } + */ + handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY, config); + } + + /* + Sample Successful Poll response structure: + { + "requestId":"8136#146daebc2ed", + "success":true, + "result":[ + { + "batchId":, + "status":"Complete", + "numOfLeadsProcessed":2, + "numOfRowsFailed":1, + "numOfRowsWithWarning":0, + "message":"Import completed with errors, 2 records imported (2 members), 1 failed" + } + ] + } + */ + if (pollStatus.response?.success) { + stats.counter(POLL_ACTIVITY, 1, { + status: 200, + state: 'Success', + }); + + if (pollStatus.response?.result?.length > 0) { + return pollStatus.response; } - const status = resp?.response?.status || 400; + } + + return null; +}; + +const handleFetchJobStatusResponse = (resp, type) => { + const marketoResponse = resp.response; + const marketoReposnseStatus = resp.status; + + if (!isHttpStatusSuccess(marketoReposnseStatus)) { + logger.info('[Network Error]:Failed during fetching job status', { marketoResponse, type }); throw new NetworkError( - 'Could not retrieve authorisation token', - status, - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), - }, - resp, + 'Unable to fetch job status', + hydrateStatusForServer(marketoReposnseStatus, 'During fetching job status'), ); } - if (resp.response) { - // handle for abortable codes - if ( - ABORTABLE_CODES.includes(resp.response.code) || - (resp.response.code >= 400 && resp.response.code <= 499) - ) { - throw new AbortedError(resp.response.code, 400, resp); - } // handle for retryable codes - else if (RETRYABLE_CODES.includes(resp.response.code)) { - throw new RetryableError(resp.response.code, 500, resp); - } // handle for abortable codes - else if (resp.response.response) { - if (ABORTABLE_CODES.includes(resp.response.response.status)) { - throw new AbortedError( - resp.response.response.statusText || ACCESS_TOKEN_FETCH_ERR_MSG, - 400, - resp, - ); - } // handle for throttled codes - else if (THROTTLED_CODES.includes(resp.response.response.status)) { - throw new ThrottledError( - resp.response.response.statusText || ACCESS_TOKEN_FETCH_ERR_MSG, - resp, - ); - } - // Assuming none we should retry the remaining errors + + if (marketoResponse?.success === false) { + logger.info('[Application Error]Failed during fetching job status', { marketoResponse, type }); + throw new RetryableError('Failure during fetching job status', 500, resp); + } + + /* + successful response : + { + response: 'city, email,Import Failure ReasonChennai,s…a,Value for lookup field 'email' not found', + status: 200 + } + + */ + + return marketoResponse; +}; + +/** + * Handles the response received after a file upload request. + * Checks for errors in the response and throws appropriate error objects based on the error codes. + * If the response indicates a successful upload, extracts the importId and returns it along with other job details. + * + * @param {object} resp - The response object received after a file upload request. + * @param {array} successfulJobs - An array to store details of successful jobs. + * @param {array} unsuccessfulJobs - An array to store details of unsuccessful jobs. + * @param {number} requestTime - The time taken for the request in milliseconds. + * @returns {object} - An object containing the importId, successfulJobs, and unsuccessfulJobs. + */ +const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime, config) => { + /* + For unsuccessful response + { + "requestId": "e42b#14272d07d78", + "success": false, + "errors": [ + { + "code": "1003", + "message": "Empty File" + } + ] + } + */ + if (resp.response?.errors) { + if (resp.response?.errors[0]?.code === '1003') { + stats.increment(UPLOAD_FILE, { + status: 500, + state: 'Retryable', + }); throw new RetryableError( - resp.response.response.statusText || ACCESS_TOKEN_FETCH_ERR_MSG, + `[${FILE_UPLOAD_ERR_MSG}]:Error Message ${resp.response.errors[0]?.message}`, 500, - resp, ); + } else { + handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE, config); + } + } + + /** + * SuccessFul Upload Response : + { + "requestId": "d01f#15d672f8560", + "result": [ + { + "batchId": 3404, + "importId": "3404", + "status": "Queued" + } + ], + "success": true } - throw new NetworkError('Could not retrieve authorization token'); + */ + if ( + resp.response?.success && + resp.response?.result?.length > 0 && + resp.response?.result[0]?.importId + ) { + const { importId } = resp.response.result[0]; + stats.histogram('marketo_bulk_upload_upload_file_time', requestTime); + + stats.increment(UPLOAD_FILE, { + status: 200, + state: 'Success', + }); + return { importId, successfulJobs, unsuccessfulJobs }; + } + // if neither successful, nor the error message is appropriate sending importId as default null + return { importId: null, successfulJobs, unsuccessfulJobs }; +}; + +/** + * Retrieves the field schema mapping for a given access token and munchkin ID from the Marketo API. + * + * @param {string} accessToken - The access token used to authenticate the API request. + * @param {string} munchkinId - The munchkin ID of the Marketo instance. + * @returns {object} - The field schema mapping retrieved from the Marketo API. + */ +const getFieldSchemaMap = async (accessToken, munchkinId) => { + let fieldArr = []; + const fieldMap = {}; // map to store field name and data type + // ref: https://developers.marketo.com/rest-api/endpoint-reference/endpoint-index/#:~:text=Describe%20Lead2,leads/describe2.json + const { processedResponse: fieldSchemaMapping } = await handleHttpRequest( + 'get', + `https://${munchkinId}.mktorest.com/rest/v1/leads/describe2.json`, + { + params: { + access_token: accessToken, + }, + }, + { + destType: 'marketo_bulk_upload', + feature: 'transformation', + }, + ); + + if (fieldSchemaMapping.response.errors) { + handleCommonErrorResponse( + fieldSchemaMapping, + 'Error while fetching Marketo Field Schema', + 'FieldSchemaMapping', + ); + } + if ( + fieldSchemaMapping.response?.success && + fieldSchemaMapping.response?.result.length > 0 && + fieldSchemaMapping.response?.result[0] + ) { + fieldArr = + fieldSchemaMapping.response.result && Array.isArray(fieldSchemaMapping.response.result) + ? fieldSchemaMapping.response.result[0]?.fields + : []; + + fieldArr.forEach((field) => { + fieldMap[field?.name] = field?.dataType; + }); + } else { + throw new RetryableError('Failed to fetch Marketo Field Schema', 500, fieldSchemaMapping); } - throw new NetworkError('Could not retrieve authorization token'); + return fieldMap; +}; + +/** + * Compares the data types of the fields in an event message with the expected data types defined in the field schema mapping. + * Identifies any mismatched fields and returns them as a map of job IDs and the corresponding invalid fields. + * + * @param {object} event - An object containing an `input` array of events. Each event has a `message` object with field-value pairs and a `metadata` object with a `job_id` property. + * @param {object} fieldSchemaMapping - An object containing the field schema mapping, which includes the expected data types for each field. + * @returns {object} - An object containing the job IDs as keys and the corresponding invalid fields as values. + */ +const checkEventStatusViaSchemaMatching = (event, fieldMap) => { + const mismatchedFields = {}; + const events = event.input; + events.forEach((event) => { + const { message, metadata } = event; + const { job_id } = metadata; + + Object.entries(message).forEach(([paramName, paramValue]) => { + let expectedDataType = fieldMap[paramName]; + const actualDataType = typeof paramValue; + + // If expectedDataType is not one of the primitive data types, treat it as a string + if (!['string', 'number', 'boolean', 'undefined'].includes(expectedDataType)) { + expectedDataType = 'string'; + } + + if (!mismatchedFields[job_id] && actualDataType !== expectedDataType) { + mismatchedFields[job_id] = `invalid ${paramName}`; + } + }); + }); + return mismatchedFields; }; module.exports = { + checkEventStatusViaSchemaMatching, + handlePollResponse, + handleFetchJobStatusResponse, + handleFileUploadResponse, + handleCommonErrorResponse, + hydrateStatusForServer, getAccessToken, - ABORTABLE_CODES, - RETRYABLE_CODES, - THROTTLED_CODES, - MARKETO_FILE_SIZE, getMarketoFilePath, - POLL_ACTIVITY, - UPLOAD_FILE, - JOB_STATUS_ACTIVITY, + getFieldSchemaMap, }; diff --git a/src/v0/destinations/salesforce/transform.js b/src/v0/destinations/salesforce/transform.js index c09a3b33df..95e41ccd98 100644 --- a/src/v0/destinations/salesforce/transform.js +++ b/src/v0/destinations/salesforce/transform.js @@ -26,7 +26,6 @@ const { const { getAccessToken, salesforceResponseHandler } = require('./utils'); const { handleHttpRequest } = require('../../../adapters/network'); const { InstrumentationError, NetworkInstrumentationError } = require('../../util/errorTypes'); -const logger = require('../../../logger'); const { JSON_MIME_TYPE } = require('../../util/constant'); // Basic response builder @@ -125,7 +124,7 @@ async function getSaleforceIdForRecord( ); } const searchRecord = processedsfSearchResponse.response?.searchRecords?.find( - (rec) => rec[identifierType] === identifierValue, + (rec) => typeof identifierValue !== 'undefined' && rec[identifierType] === `${identifierValue}`, ); return searchRecord?.Id; diff --git a/src/v0/destinations/salesforce/utils.js b/src/v0/destinations/salesforce/utils.js index 295578c929..840c42aa35 100644 --- a/src/v0/destinations/salesforce/utils.js +++ b/src/v0/destinations/salesforce/utils.js @@ -41,6 +41,18 @@ const salesforceResponseHandler = (destResponse, sourceMessage, authKey) => { `${DESTINATION} Request Failed - due to "REQUEST_LIMIT_EXCEEDED", (Throttled) ${sourceMessage}`, destResponse, ); + } else if ( + status === 400 && + matchErrorCode('CANNOT_INSERT_UPDATE_ACTIVATE_ENTITY') && + response.message.includes('UNABLE_TO_LOCK_ROW') + ) { + // handling the error case where the record is locked by another background job + // this is a retryable error + throw new RetryableError( + `${DESTINATION} Request Failed - "Row locked due to another background running on the same object", (Retryable) ${sourceMessage}`, + 500, + destResponse, + ); } else if (status === 503 || status === 500) { // The salesforce server is unavailable to handle the request. Typically this occurs if the server is down // for maintenance or is currently overloaded. diff --git a/test/__mocks__/data/intercom/proxy_response.json b/test/__mocks__/data/intercom/proxy_response.json new file mode 100644 index 0000000000..3c5a3d9638 --- /dev/null +++ b/test/__mocks__/data/intercom/proxy_response.json @@ -0,0 +1,15 @@ +{ + "https://api.intercom.io/users/test1": { + "data": { + "type": "error.list", + "request_id": "000on04msi4jpk7d3u60", + "errors": [ + { + "code": "Request Timeout", + "message": "The server would not wait any longer for the client" + } + ] + }, + "status": 408 + } +} diff --git a/test/__mocks__/data/salesforce/proxy_response.json b/test/__mocks__/data/salesforce/proxy_response.json index f24e898825..2ce60e4ec9 100644 --- a/test/__mocks__/data/salesforce/proxy_response.json +++ b/test/__mocks__/data/salesforce/proxy_response.json @@ -69,7 +69,7 @@ "status": 503 } }, - "https://rudderstack.my.salesforce.com/services/data/v50.0/parameterizedSearch/?q=external_id&sobject=object_name&in=External_ID__c&object_name.fields=id,External_ID__c": { + "https://rudderstack.my.salesforce.com/services/data/v50.0/parameterizedSearch/?q=123&sobject=object_name&in=External_ID__c&object_name.fields=id,External_ID__c": { "response": { "searchRecords": [ { diff --git a/test/__mocks__/network.js b/test/__mocks__/network.js index acfe898512..752dd48ca2 100644 --- a/test/__mocks__/network.js +++ b/test/__mocks__/network.js @@ -23,7 +23,8 @@ const urlDirectoryMap = { "api.clevertap.com": "clevertap", "marketo_acct_id_success.mktorest.com": "marketo_static_list", "api.criteo.com": "criteo_audience", - "business-api.tiktok.com": "tiktok_ads" + "business-api.tiktok.com": "tiktok_ads", + "api.intercom.io": "intercom" }; function getData(arg) { diff --git a/test/__tests__/data/intercom_proxy_input.json b/test/__tests__/data/intercom_proxy_input.json new file mode 100644 index 0000000000..0074202ceb --- /dev/null +++ b/test/__tests__/data/intercom_proxy_input.json @@ -0,0 +1,44 @@ +[ + { + "version": "1", + "type": "REST", + "method": "POST", + "endpoint": "https://api.intercom.io/users/test1", + "headers": { + "Content-Type": "application/json", + "Authorization": "Bearer intercomApiKey", + "Accept": "application/json", + "Intercom-Version": "1.4" + }, + "params": {}, + "body": { + "JSON": { + "email": "test_1@test.com", + "phone": "9876543210", + "name": "Test Name", + "signed_up_at": 1601493060, + "last_seen_user_agent": "unknown", + "update_last_request_at": true, + "user_id": "test_user_id_1", + "custom_attributes": { + "anonymousId": "58b21c2d-f8d5-4410-a2d0-b268a26b7e33", + "key1": "value1", + "address.city": "Kolkata", + "address.state": "West Bengal", + "originalArray[0].nested_field": "nested value", + "originalArray[0].tags[0]": "tag_1", + "originalArray[0].tags[1]": "tag_2", + "originalArray[0].tags[2]": "tag_3", + "originalArray[1].nested_field": "nested value", + "originalArray[1].tags[0]": "tag_1", + "originalArray[2].nested_field": "nested value" + } + }, + "XML": {}, + "JSON_ARRAY": {}, + "FORM": {} + }, + "files": {}, + "userId": "58b21c2d-f8d5-4410-a2d0-b268a26b7e33" + } +] diff --git a/test/__tests__/data/intercom_proxy_output.json b/test/__tests__/data/intercom_proxy_output.json new file mode 100644 index 0000000000..5b6f9ef915 --- /dev/null +++ b/test/__tests__/data/intercom_proxy_output.json @@ -0,0 +1,31 @@ +[ + { + "output": { + "status": 500, + "message": "[Intercom Response Handler] Request failed for destination intercom with status: 408", + "destinationResponse": { + "response": { + "type": "error.list", + "request_id": "000on04msi4jpk7d3u60", + "errors": [ + { + "code": "Request Timeout", + "message": "The server would not wait any longer for the client" + } + ] + }, + "status": 408 + }, + "statTags": { + "destType": "INTERCOM", + "errorCategory": "network", + "destinationId": "Non-determininable", + "workspaceId": "Non-determininable", + "errorType": "retryable", + "feature": "dataDelivery", + "implementation": "native", + "module": "destination" + } + } + } +] diff --git a/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json b/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json index 2ba4fadd72..0ea94284ae 100644 --- a/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json +++ b/test/__tests__/data/marketo_bulk_upload_fileUpload_output.json @@ -2,7 +2,6 @@ { "statusCode": 200, "importId": "2977", - "pollURL": "/pollStatus", "metadata": { "successfulJobs": ["17"], "unsuccessfulJobs": [], @@ -12,7 +11,6 @@ { "statusCode": 200, "importId": "2977", - "pollURL": "/pollStatus", "metadata": { "successfulJobs": ["17"], "unsuccessfulJobs": [], @@ -22,7 +20,6 @@ { "statusCode": 200, "importId": "2977", - "pollURL": "/pollStatus", "metadata": { "successfulJobs": ["17"], "unsuccessfulJobs": [], @@ -37,7 +34,6 @@ { "statusCode": 200, "importId": "2977", - "pollURL": "/pollStatus", "metadata": { "successfulJobs": ["17"], "unsuccessfulJobs": [], @@ -46,22 +42,22 @@ }, { "statusCode": 400, - "error": "Could not upload file", + "error": "[Could not upload file]Error message: undefined", "metadata": null }, { "statusCode": 400, - "error": "There are 10 imports currently being processed. Please try again later", + "error": "[Could not upload file]Error message: There are 10 imports currently being processed. Please try again later", "metadata": null }, { "statusCode": 400, - "error": "Empty file", + "error": "[Could not upload file]Error message: Empty file", "metadata": null }, { "statusCode": 400, - "error": "Any other error", + "error": "[Could not upload file]Error message: Any other error", "metadata": null } ] diff --git a/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json b/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json index 7c0d6c2d77..eac43ee82c 100644 --- a/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json +++ b/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json @@ -8,7 +8,7 @@ }, { "statusCode": 400, - "error": "Could not fetch warning job status" + "error": "Unable to fetch job status" } ] }, @@ -21,7 +21,7 @@ }, { "statusCode": 400, - "error": "Could not fetch failure job status" + "error": "Unable to fetch job status" } ] } diff --git a/test/__tests__/data/marketo_bulk_upload_poll_output.json b/test/__tests__/data/marketo_bulk_upload_poll_output.json index c10744bfe7..92e312072e 100644 --- a/test/__tests__/data/marketo_bulk_upload_poll_output.json +++ b/test/__tests__/data/marketo_bulk_upload_poll_output.json @@ -1,11 +1,10 @@ [ { - "success": true, + "Complete": true, "statusCode": 200, "hasFailed": false, - "failedJobsURL": "/getFailedJobs", - "hasWarnings": false, - "warningJobsURL": "/getWarningJobs" + "InProgress": false, + "HasWarning": false }, { "statusCode": 400, @@ -13,6 +12,6 @@ }, { "statusCode": 400, - "error": "Any 500 error" + "error": "[Could not poll status]Error message: Any 500 error" } ] diff --git a/test/__tests__/data/optimizely_fullstack.json b/test/__tests__/data/optimizely_fullstack.json index 8ab14925dc..1331d38033 100644 --- a/test/__tests__/data/optimizely_fullstack.json +++ b/test/__tests__/data/optimizely_fullstack.json @@ -296,7 +296,7 @@ } }, "output": { - "error": "Data File Lookup Failed" + "error": "Data File Lookup Failed due to {\"code\":\"document_not_found\",\"message\":\"document_not_found\"}" } }, { diff --git a/test/__tests__/data/salesforce_proxy_input.json b/test/__tests__/data/salesforce_proxy_input.json index f04c3afe4e..f7b8257510 100644 --- a/test/__tests__/data/salesforce_proxy_input.json +++ b/test/__tests__/data/salesforce_proxy_input.json @@ -233,7 +233,7 @@ { "type": "REST", "method": "POST", - "endpoint": "https://rudderstack.my.salesforce.com/services/data/v50.0/parameterizedSearch/?q=external_id&sobject=object_name&in=External_ID__c&object_name.fields=id,External_ID__c", + "endpoint": "https://rudderstack.my.salesforce.com/services/data/v50.0/parameterizedSearch/?q=123&sobject=object_name&in=External_ID__c&object_name.fields=id,External_ID__c", "headers": { "Content-Type": "application/json", "Authorization": "Bearer token" @@ -241,7 +241,7 @@ "body": { "JSON": { "Planning_Categories__c": "pc", - "External_ID__c": "external_id" + "External_ID__c": 123 }, "JSON_ARRAY": {}, "XML": {}, diff --git a/test/__tests__/proxy.test.ts b/test/__tests__/proxy.test.ts index 2eda94f8b0..9f13c198fc 100644 --- a/test/__tests__/proxy.test.ts +++ b/test/__tests__/proxy.test.ts @@ -48,6 +48,7 @@ const destinations = [ 'marketo_static_list', 'criteo_audience', 'tiktok_ads', + 'intercom' ]; // start of generic tests