diff --git a/src/v0/destinations/marketo_bulk_upload/fileUpload.js b/src/v0/destinations/marketo_bulk_upload/fileUpload.js index 4c1679cbfc..9c42fdc98d 100644 --- a/src/v0/destinations/marketo_bulk_upload/fileUpload.js +++ b/src/v0/destinations/marketo_bulk_upload/fileUpload.js @@ -206,11 +206,11 @@ const getImportID = async (input, config, accessToken, csvHeader) => { stats.counter('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length); if (!isHttpStatusSuccess(resp.status)) { throw new NetworkError( - `Unable to upload file due to error : ${resp.response}`, + `Unable to upload file due to error : ${JSON.stringify(resp.response)}`, hydrateStatusForServer(resp.status, 'During uploading file'), ); } - return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime, config); + return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime); } return { importId: null, successfulJobs, unsuccessfulJobs }; }; @@ -242,7 +242,6 @@ const responseHandler = async (input, config) => { accessToken, headerForCsv, ); - // if upload is successful if (importId) { const csvHeader = headerForCsv.toString(); diff --git a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js index 78ac7c9e48..769fa4006d 100644 --- a/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js +++ b/src/v0/destinations/marketo_bulk_upload/marketo_bulk_upload.util.test.js @@ -2,9 +2,49 @@ const { handleCommonErrorResponse, handlePollResponse, handleFileUploadResponse, + getAccessToken, } = require('./util'); -const { AbortedError, RetryableError } = require('@rudderstack/integrations-lib'); +const { + AbortedError, + RetryableError, + NetworkError, + TransformationError, +} = require('@rudderstack/integrations-lib'); +const util = require('./util.js'); +const networkAdapter = require('../../../adapters/network'); +const { handleHttpRequest } = networkAdapter; + +// Mock the handleHttpRequest function +jest.mock('../../../adapters/network'); + +const successfulResponse = { + status: 200, + response: { + access_token: '', + token_type: 'bearer', + expires_in: 3600, + scope: 'dummy@scope.com', + success: true, + }, +}; + +const unsuccessfulResponse = { + status: 400, + response: '[ENOTFOUND] :: DNS lookup failed', +}; + +const emptyResponse = { + response: '', +}; + +const invalidClientErrorResponse = { + status: 401, + response: { + error: 'invalid_client', + error_description: 'Bad client credentials', + }, +}; describe('handleCommonErrorResponse', () => { test('should throw AbortedError for abortable error codes', () => { @@ -13,7 +53,7 @@ describe('handleCommonErrorResponse', () => { errors: [{ code: 1003, message: 'Aborted' }], }, }; - expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( AbortedError, ); }); @@ -24,7 +64,7 @@ describe('handleCommonErrorResponse', () => { errors: [{ code: 615, message: 'Throttled' }], }, }; - expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( RetryableError, ); }); @@ -35,7 +75,7 @@ describe('handleCommonErrorResponse', () => { errors: [{ code: 2000, message: 'Retryable' }], }, }; - expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( RetryableError, ); }); @@ -46,7 +86,7 @@ describe('handleCommonErrorResponse', () => { errors: [], }, }; - expect(() => handleCommonErrorResponse(resp, 'OpErrorMessage', 'OpActivity')).toThrow( + expect(() => handleCommonErrorResponse(resp, 'opErrorMessage', 'opActivity')).toThrow( RetryableError, ); }); @@ -228,3 +268,88 @@ describe('handleFileUploadResponse', () => { }).toThrow(AbortedError); }); }); + +describe('getAccessToken', () => { + beforeEach(() => { + handleHttpRequest.mockClear(); + }); + + it('should retrieve and return access token on successful response', async () => { + const url = + 'https://dummyMunchkinId.mktorest.com/identity/oauth/token?client_id=dummyClientId&client_secret=dummyClientSecret&grant_type=client_credentials'; + + handleHttpRequest.mockResolvedValueOnce({ + processedResponse: successfulResponse, + }); + + const config = { + clientId: 'dummyClientId', + clientSecret: 'dummyClientSecret', + munchkinId: 'dummyMunchkinId', + }; + + const result = await getAccessToken(config); + expect(result).toBe(''); + expect(handleHttpRequest).toHaveBeenCalledTimes(1); + // Ensure your mock response structure is consistent with the actual behavior + expect(handleHttpRequest).toHaveBeenCalledWith('get', url, { + destType: 'marketo_bulk_upload', + feature: 'transformation', + }); + }); + + it('should throw a NetworkError on unsuccessful HTTP status', async () => { + handleHttpRequest.mockResolvedValueOnce({ + processedResponse: unsuccessfulResponse, + }); + + const config = { + clientId: 'dummyClientId', + clientSecret: 'dummyClientSecret', + munchkinId: 'dummyMunchkinId', + }; + + await expect(getAccessToken(config)).rejects.toThrow(NetworkError); + }); + + it('should throw a RetryableError when expires_in is 0', async () => { + handleHttpRequest.mockResolvedValueOnce({ + processedResponse: { + ...successfulResponse, + response: { ...successfulResponse.response, expires_in: 0 }, + }, + }); + + const config = { + clientId: 'dummyClientId', + clientSecret: 'dummyClientSecret', + munchkinId: 'dummyMunchkinId', + }; + + await expect(getAccessToken(config)).rejects.toThrow(RetryableError); + }); + + it('should throw an AbortedError on unsuccessful response', async () => { + handleHttpRequest.mockResolvedValueOnce({ processedResponse: invalidClientErrorResponse }); + + const config = { + clientId: 'invalidClientID', + clientSecret: 'dummyClientSecret', + munchkinId: 'dummyMunchkinId', + }; + + await expect(getAccessToken(config)).rejects.toThrow(NetworkError); + }); + + it('should throw transformation error response', async () => { + handleHttpRequest.mockResolvedValueOnce({ processedResponse: emptyResponse }); + + const config = { + clientId: 'dummyClientId', + clientSecret: 'dummyClientSecret', + munchkinId: 'dummyMunchkinId', + }; + + await expect(getAccessToken(config)).rejects.toThrow(TransformationError); + }); +}); diff --git a/src/v0/destinations/marketo_bulk_upload/poll.js b/src/v0/destinations/marketo_bulk_upload/poll.js index 97211c4763..db7a634774 100644 --- a/src/v0/destinations/marketo_bulk_upload/poll.js +++ b/src/v0/destinations/marketo_bulk_upload/poll.js @@ -34,11 +34,11 @@ const getPollStatus = async (event) => { state: 'Retryable', }); throw new NetworkError( - `Could not poll status: due to error ${pollStatus.response}`, + `Could not poll status: due to error ${JSON.stringify(pollStatus.response)}`, hydrateStatusForServer(pollStatus.status, 'During fetching poll status'), ); } - return handlePollResponse(pollStatus, event.config); + return handlePollResponse(pollStatus); }; const responseHandler = async (event) => { diff --git a/src/v0/destinations/marketo_bulk_upload/util.js b/src/v0/destinations/marketo_bulk_upload/util.js index 9661b0e4cb..8b46212b87 100644 --- a/src/v0/destinations/marketo_bulk_upload/util.js +++ b/src/v0/destinations/marketo_bulk_upload/util.js @@ -19,13 +19,8 @@ const { FILE_UPLOAD_ERR_MSG, ACCESS_TOKEN_FETCH_ERR_MSG, } = require('./config'); -const Cache = require('../../util/cache'); const logger = require('../../../logger'); -const { AUTH_CACHE_TTL } = require('../../util/constant'); - -const authCache = new Cache(AUTH_CACHE_TTL); - const getMarketoFilePath = () => `${__dirname}/uploadFile/${Date.now()}_marketo_bulk_upload_${generateUUID()}.csv`; @@ -41,18 +36,13 @@ const hydrateStatusForServer = (statusCode, context) => { return status; }; -const getAccessTokenCacheKey = (config = {}) => { - const { munchkinId, clientId, clientSecret } = config; - return `${munchkinId}-${clientId}-${clientSecret}`; -}; - /** * Handles common error responses returned from API calls. * Checks the error code and throws the appropriate error object based on the code. * * @param {object} resp - The response object containing the error information. - * @param {string} OpErrorMessage - The error message to be used if the error code is not recognized. - * @param {string} OpActivity - The activity name for tracking purposes. + * @param {string} opErrorMessage - The error message to be used if the error code is not recognized. + * @param {string} opActivity - The activity name for tracking purposes. * @throws {AbortedError} - If the error code is abortable. * @throws {ThrottledError} - If the error code is within the range of throttled codes. * @throws {RetryableError} - If the error code is neither abortable nor throttled. @@ -75,29 +65,19 @@ const getAccessTokenCacheKey = (config = {}) => { * console.log(error); * } */ -const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity, config) => { +const handleCommonErrorResponse = (apiCallResult, opErrorMessage, opActivity) => { // checking for invalid/expired token errors and evicting cache in that case // rudderJobMetadata contains some destination info which is being used to evict the cache if ( - authCache && apiCallResult.response?.errors && apiCallResult.response?.errors?.length > 0 && apiCallResult.response?.errors.some( (errorObj) => errorObj.code === '601' || errorObj.code === '602', ) ) { - // Special handling for 601 and 602 error codes for access token - authCache.del(getAccessTokenCacheKey(config)); - if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '601')) { - throw new AbortedError( - `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, - ); - } - if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '602')) { - throw new RetryableError( - `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, - ); - } + throw new RetryableError( + `[${opErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + ); } if ( apiCallResult.response?.errors?.length > 0 && @@ -107,29 +87,29 @@ const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity, co ABORTABLE_CODES.includes(apiCallResult.response?.errors[0]?.code)) ) { // for empty file the code is 1003 and that should be retried - stats.increment(OpActivity, { + stats.increment(opActivity, { status: 400, state: 'Abortable', }); - throw new AbortedError(apiCallResult.response?.errors[0]?.message || OpErrorMessage, 400); + throw new AbortedError(apiCallResult.response?.errors[0]?.message || opErrorMessage, 400); } else if (THROTTLED_CODES.includes(apiCallResult.response?.errors[0]?.code)) { // for more than 10 concurrent uses the code is 615 and that should be retried - stats.increment(OpActivity, { + stats.increment(opActivity, { status: 429, state: 'Retryable', }); throw new RetryableError( - `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + `[${opErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, 500, ); } // by default every thing will be retried - stats.increment(OpActivity, { + stats.increment(opActivity, { status: 500, state: 'Retryable', }); throw new RetryableError( - `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, + `[${opErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`, 500, ); }; @@ -142,54 +122,48 @@ const getAccessTokenURL = (config) => { // Fetch access token from client id and client secret // DOC: https://developers.marketo.com/rest-api/authentication/ -const getAccessToken = async (config) => - authCache.get(getAccessTokenCacheKey(config), async () => { - const url = getAccessTokenURL(config); - const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, { - destType: 'marketo_bulk_upload', - feature: 'transformation', - }); +const getAccessToken = async (config) => { + const url = getAccessTokenURL(config); + const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, { + destType: 'marketo_bulk_upload', + feature: 'transformation', + }); - // sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400} - if (!isHttpStatusSuccess(accessTokenResponse.status)) { - throw new NetworkError( - `Could not retrieve authorisation token due to error ${accessTokenResponse}`, - hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN), - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status), - }, - accessTokenResponse, - ); - } - if (accessTokenResponse.response?.success === false) { - handleCommonErrorResponse( - accessTokenResponse, - ACCESS_TOKEN_FETCH_ERR_MSG, - FETCH_ACCESS_TOKEN, - config, - ); - } + // sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400} + if (!isHttpStatusSuccess(accessTokenResponse.status)) { + throw new NetworkError( + `Could not retrieve authorisation token due to error ${JSON.stringify(accessTokenResponse)}`, + hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN), + { + [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status), + }, + accessTokenResponse, + ); + } + if (accessTokenResponse.response?.success === false) { + handleCommonErrorResponse(accessTokenResponse, ACCESS_TOKEN_FETCH_ERR_MSG, FETCH_ACCESS_TOKEN); + } - // when access token is present - if (accessTokenResponse.response.access_token) { - /* This scenario will handle the case when we get the following response + // when access token is present + if (accessTokenResponse.response.access_token) { + /* This scenario will handle the case when we get the following response status: 200 respnse: {"access_token":"","token_type":"bearer","expires_in":0,"scope":"dummy@scope.com"} wherein "expires_in":0 denotes that we should refresh the accessToken but its not expired yet. */ - if (accessTokenResponse.response?.expires_in === 0) { - throw new RetryableError( - `Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`, - 500, - ); - } - return accessTokenResponse.response.access_token; + if (accessTokenResponse.response?.expires_in === 0) { + throw new RetryableError( + `Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`, + 500, + ); } - throw new AbortedError( - `Could not retrieve authorisation token due to error ${accessTokenResponse}`, - 400, - ); - }); + return accessTokenResponse.response.access_token; + } + throw new RetryableError( + `Could not retrieve authorisation token due to error ${JSON.stringify(accessTokenResponse)}`, + 500, + ); +}; /** * Handles the response of a polling operation. @@ -200,7 +174,7 @@ const getAccessToken = async (config) => * @param {object} pollStatus - The response object from the polling operation. * @returns {object|null} - The response object if the polling operation was successful, otherwise null. */ -const handlePollResponse = (pollStatus, config) => { +const handlePollResponse = (pollStatus) => { // DOC: https://developers.marketo.com/rest-api/error-codes/ if (pollStatus.response.errors) { /* Sample error response for poll is: @@ -216,7 +190,7 @@ const handlePollResponse = (pollStatus, config) => { ] } */ - handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY, config); + handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY); } /* @@ -257,7 +231,7 @@ const handleFetchJobStatusResponse = (resp, type) => { if (!isHttpStatusSuccess(marketoReposnseStatus)) { logger.info('[Network Error]:Failed during fetching job status', { marketoResponse, type }); throw new NetworkError( - `Unable to fetch job status: due to error ${marketoResponse}`, + `Unable to fetch job status: due to error ${JSON.stringify(marketoResponse)}`, hydrateStatusForServer(marketoReposnseStatus, 'During fetching job status'), ); } @@ -294,7 +268,7 @@ const handleFetchJobStatusResponse = (resp, type) => { * @param {number} requestTime - The time taken for the request in milliseconds. * @returns {object} - An object containing the importId, successfulJobs, and unsuccessfulJobs. */ -const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime, config) => { +const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime) => { /* For unsuccessful response { @@ -319,7 +293,7 @@ const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, reques 500, ); } else { - handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE, config); + handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE); } } @@ -402,7 +376,7 @@ const getFieldSchemaMap = async (accessToken, munchkinId) => { }); } else { throw new RetryableError( - `Failed to fetch Marketo Field Schema due to error ${fieldSchemaMapping}`, + `Failed to fetch Marketo Field Schema due to error ${JSON.stringify(fieldSchemaMapping)}`, 500, fieldSchemaMapping, ); diff --git a/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json b/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json index 320ed050c5..60628f6b3f 100644 --- a/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json +++ b/test/__tests__/data/marketo_bulk_upload_jobStatus_output.json @@ -8,7 +8,7 @@ }, { "statusCode": 400, - "error": "Unable to fetch job status: due to error " + "error": "Unable to fetch job status: due to error \"\"" } ] }, @@ -21,7 +21,7 @@ }, { "statusCode": 400, - "error": "Unable to fetch job status: due to error " + "error": "Unable to fetch job status: due to error \"\"" } ] }