Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deleting access token cache for marketo bulk upload destination #3029

Merged
merged 10 commits into from
Jan 25, 2024
5 changes: 2 additions & 3 deletions src/v0/destinations/marketo_bulk_upload/fileUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ const getImportID = async (input, config, accessToken, csvHeader) => {
stats.counter('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length);
if (!isHttpStatusSuccess(resp.status)) {
throw new NetworkError(
`Unable to upload file due to error : ${resp.response}`,
`Unable to upload file due to error : ${JSON.stringify(resp.response)}`,
hydrateStatusForServer(resp.status, 'During uploading file'),
);
}
return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime, config);
return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime);
}
return { importId: null, successfulJobs, unsuccessfulJobs };
};
Expand Down Expand Up @@ -242,7 +242,6 @@ const responseHandler = async (input, config) => {
accessToken,
headerForCsv,
);

// if upload is successful
if (importId) {
const csvHeader = headerForCsv.toString();
Expand Down
4 changes: 2 additions & 2 deletions src/v0/destinations/marketo_bulk_upload/poll.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ const getPollStatus = async (event) => {
state: 'Retryable',
});
throw new NetworkError(
`Could not poll status: due to error ${pollStatus.response}`,
`Could not poll status: due to error ${JSON.stringify(pollStatus.response)}`,
hydrateStatusForServer(pollStatus.status, 'During fetching poll status'),
);
}
return handlePollResponse(pollStatus, event.config);
return handlePollResponse(pollStatus);
};

const responseHandler = async (event) => {
Expand Down
120 changes: 50 additions & 70 deletions src/v0/destinations/marketo_bulk_upload/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
FILE_UPLOAD_ERR_MSG,
ACCESS_TOKEN_FETCH_ERR_MSG,
} = require('./config');
const Cache = require('../../util/cache');
const logger = require('../../../logger');

const { AUTH_CACHE_TTL } = require('../../util/constant');

const authCache = new Cache(AUTH_CACHE_TTL);

const getMarketoFilePath = () =>
`${__dirname}/uploadFile/${Date.now()}_marketo_bulk_upload_${generateUUID()}.csv`;

Expand All @@ -41,10 +36,10 @@
return status;
};

const getAccessTokenCacheKey = (config = {}) => {
const { munchkinId, clientId, clientSecret } = config;
return `${munchkinId}-${clientId}-${clientSecret}`;
};
// const getAccessTokenCacheKey = (config = {}) => {
// const { munchkinId, clientId, clientSecret } = config;
// return `${munchkinId}-${clientId}-${clientSecret}`;
// };

/**
* Handles common error responses returned from API calls.
Expand Down Expand Up @@ -75,29 +70,20 @@
* console.log(error);
* }
*/
const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity, config) => {
const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity) => {
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
// checking for invalid/expired token errors and evicting cache in that case
// rudderJobMetadata contains some destination info which is being used to evict the cache
if (
authCache &&
// authCache &&
apiCallResult.response?.errors &&
apiCallResult.response?.errors?.length > 0 &&
apiCallResult.response?.errors.some(
(errorObj) => errorObj.code === '601' || errorObj.code === '602',
)
) {
// Special handling for 601 and 602 error codes for access token
authCache.del(getAccessTokenCacheKey(config));
if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '601')) {
throw new AbortedError(
`[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
);
}
if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '602')) {
throw new RetryableError(
`[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
);
}
throw new RetryableError(

Check warning on line 84 in src/v0/destinations/marketo_bulk_upload/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/marketo_bulk_upload/util.js#L84

Added line #L84 was not covered by tests
`[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
);
}
if (
apiCallResult.response?.errors?.length > 0 &&
Expand Down Expand Up @@ -142,54 +128,48 @@

// Fetch access token from client id and client secret
// DOC: https://developers.marketo.com/rest-api/authentication/
const getAccessToken = async (config) =>
authCache.get(getAccessTokenCacheKey(config), async () => {
const url = getAccessTokenURL(config);
const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, {
destType: 'marketo_bulk_upload',
feature: 'transformation',
});
const getAccessToken = async (config) => {
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
const url = getAccessTokenURL(config);
const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, {
destType: 'marketo_bulk_upload',
feature: 'transformation',
});

// sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400}
if (!isHttpStatusSuccess(accessTokenResponse.status)) {
throw new NetworkError(
`Could not retrieve authorisation token due to error ${accessTokenResponse}`,
hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN),
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status),
},
accessTokenResponse,
);
}
if (accessTokenResponse.response?.success === false) {
handleCommonErrorResponse(
accessTokenResponse,
ACCESS_TOKEN_FETCH_ERR_MSG,
FETCH_ACCESS_TOKEN,
config,
);
}
// sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400}
if (!isHttpStatusSuccess(accessTokenResponse.status)) {
throw new NetworkError(

Check warning on line 140 in src/v0/destinations/marketo_bulk_upload/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/marketo_bulk_upload/util.js#L140

Added line #L140 was not covered by tests
`Could not retrieve authorisation token due to error ${accessTokenResponse}`,
hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN),
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status),
},
accessTokenResponse,
);
}
if (accessTokenResponse.response?.success === false) {
handleCommonErrorResponse(accessTokenResponse, ACCESS_TOKEN_FETCH_ERR_MSG, FETCH_ACCESS_TOKEN);

Check warning on line 150 in src/v0/destinations/marketo_bulk_upload/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/marketo_bulk_upload/util.js#L150

Added line #L150 was not covered by tests
}

// when access token is present
if (accessTokenResponse.response.access_token) {
/* This scenario will handle the case when we get the following response
// when access token is present
if (accessTokenResponse.response.access_token) {
/* This scenario will handle the case when we get the following response
status: 200
respnse: {"access_token":"<dummy-access-token>","token_type":"bearer","expires_in":0,"scope":"[email protected]"}
wherein "expires_in":0 denotes that we should refresh the accessToken but its not expired yet.
*/
if (accessTokenResponse.response?.expires_in === 0) {
throw new RetryableError(
`Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`,
500,
);
}
return accessTokenResponse.response.access_token;
if (accessTokenResponse.response?.expires_in === 0) {
throw new RetryableError(

Check warning on line 161 in src/v0/destinations/marketo_bulk_upload/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/marketo_bulk_upload/util.js#L161

Added line #L161 was not covered by tests
`Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`,
500,
);
}
throw new AbortedError(
`Could not retrieve authorisation token due to error ${accessTokenResponse}`,
400,
);
});
return accessTokenResponse.response.access_token;
}
throw new AbortedError(

Check warning on line 168 in src/v0/destinations/marketo_bulk_upload/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/marketo_bulk_upload/util.js#L168

Added line #L168 was not covered by tests
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
`Could not retrieve authorisation token due to error ${JSON.stringify(accessTokenResponse)}`,
400,
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
);
};

/**
* Handles the response of a polling operation.
Expand All @@ -200,7 +180,7 @@
* @param {object} pollStatus - The response object from the polling operation.
* @returns {object|null} - The response object if the polling operation was successful, otherwise null.
*/
const handlePollResponse = (pollStatus, config) => {
const handlePollResponse = (pollStatus) => {
// DOC: https://developers.marketo.com/rest-api/error-codes/
if (pollStatus.response.errors) {
/* Sample error response for poll is:
Expand All @@ -216,7 +196,7 @@
]
}
*/
handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY, config);
handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY);
}

/*
Expand Down Expand Up @@ -257,7 +237,7 @@
if (!isHttpStatusSuccess(marketoReposnseStatus)) {
logger.info('[Network Error]:Failed during fetching job status', { marketoResponse, type });
throw new NetworkError(
`Unable to fetch job status: due to error ${marketoResponse}`,
`Unable to fetch job status: due to error ${JSON.stringify(marketoResponse)}`,
hydrateStatusForServer(marketoReposnseStatus, 'During fetching job status'),
);
}
Expand Down Expand Up @@ -294,7 +274,7 @@
* @param {number} requestTime - The time taken for the request in milliseconds.
* @returns {object} - An object containing the importId, successfulJobs, and unsuccessfulJobs.
*/
const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime, config) => {
const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime) => {
/*
For unsuccessful response
{
Expand All @@ -319,7 +299,7 @@
500,
);
} else {
handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE, config);
handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE);
}
}

Expand Down Expand Up @@ -402,7 +382,7 @@
});
} else {
throw new RetryableError(
`Failed to fetch Marketo Field Schema due to error ${fieldSchemaMapping}`,
`Failed to fetch Marketo Field Schema due to error ${JSON.stringify(fieldSchemaMapping)}`,
500,
fieldSchemaMapping,
);
Expand Down
4 changes: 2 additions & 2 deletions test/__tests__/data/marketo_bulk_upload_jobStatus_output.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},
{
"statusCode": 400,
"error": "Unable to fetch job status: due to error "
"error": "Unable to fetch job status: due to error \"\""
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
}
]
},
Expand All @@ -21,7 +21,7 @@
},
{
"statusCode": 400,
"error": "Unable to fetch job status: due to error "
"error": "Unable to fetch job status: due to error \"\""
}
]
}
Expand Down
Loading