Skip to content

Commit

Permalink
fix: removing cache and adding debug logs
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Jan 24, 2024
1 parent de4e321 commit 0b732e8
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 72 deletions.
1 change: 1 addition & 0 deletions src/v0/destinations/marketo_bulk_upload/fetchJobStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const responseHandler = async (event, type) => {

const { config } = event;
const accessToken = await getAccessToken(config);
console.log('access token in fetching job status', accessToken);

/**
* {
Expand Down
7 changes: 4 additions & 3 deletions src/v0/destinations/marketo_bulk_upload/fileUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ const getImportID = async (input, config, accessToken, csvHeader) => {
stats.counter('marketo_bulk_upload_upload_file_unsuccJobs', unsuccessfulJobs.length);
if (!isHttpStatusSuccess(resp.status)) {
throw new NetworkError(
`Unable to upload file due to error : ${resp.response}`,
`Unable to upload file due to error : ${JSON.stringify(resp.response)}`,
hydrateStatusForServer(resp.status, 'During uploading file'),
);
}
return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime, config);
return handleFileUploadResponse(resp, successfulJobs, unsuccessfulJobs, requestTime);
}
return { importId: null, successfulJobs, unsuccessfulJobs };
};
Expand All @@ -223,6 +223,7 @@ const getImportID = async (input, config, accessToken, csvHeader) => {
*/
const responseHandler = async (input, config) => {
const accessToken = await getAccessToken(config);
console.log('access token while uploading', accessToken);
/**
{
"importId" : <some-id>,
Expand All @@ -242,7 +243,7 @@ const responseHandler = async (input, config) => {
accessToken,
headerForCsv,
);

console.log('import ID', importId);
// if upload is successful
if (importId) {
const csvHeader = headerForCsv.toString();
Expand Down
5 changes: 3 additions & 2 deletions src/v0/destinations/marketo_bulk_upload/poll.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const { POLL_ACTIVITY } = require('./config');

const getPollStatus = async (event) => {
const accessToken = await getAccessToken(event.config);
console.log('accesstoken while polling', accessToken);
const { munchkinId } = event.config;

// To see the status of the import job polling is done
Expand All @@ -34,11 +35,11 @@ const getPollStatus = async (event) => {
state: 'Retryable',
});
throw new NetworkError(
`Could not poll status: due to error ${pollStatus.response}`,
`Could not poll status: due to error ${JSON.stringify(pollStatus.response)}`,
hydrateStatusForServer(pollStatus.status, 'During fetching poll status'),
);
}
return handlePollResponse(pollStatus, event.config);
return handlePollResponse(pollStatus);
};

const responseHandler = async (event) => {
Expand Down
178 changes: 111 additions & 67 deletions src/v0/destinations/marketo_bulk_upload/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ const {
FILE_UPLOAD_ERR_MSG,
ACCESS_TOKEN_FETCH_ERR_MSG,
} = require('./config');
const Cache = require('../../util/cache');
// const Cache = require('../../util/cache');
const logger = require('../../../logger');

const { AUTH_CACHE_TTL } = require('../../util/constant');
// const { AUTH_CACHE_TTL } = require('../../util/constant');

const authCache = new Cache(AUTH_CACHE_TTL);
// const authCache = new Cache(AUTH_CACHE_TTL);

const getMarketoFilePath = () =>
`${__dirname}/uploadFile/${Date.now()}_marketo_bulk_upload_${generateUUID()}.csv`;
Expand All @@ -41,10 +41,10 @@ const hydrateStatusForServer = (statusCode, context) => {
return status;
};

const getAccessTokenCacheKey = (config = {}) => {
const { munchkinId, clientId, clientSecret } = config;
return `${munchkinId}-${clientId}-${clientSecret}`;
};
// const getAccessTokenCacheKey = (config = {}) => {
// const { munchkinId, clientId, clientSecret } = config;
// return `${munchkinId}-${clientId}-${clientSecret}`;
// };

/**
* Handles common error responses returned from API calls.
Expand Down Expand Up @@ -75,29 +75,29 @@ const getAccessTokenCacheKey = (config = {}) => {
* console.log(error);
* }
*/
const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity, config) => {
const handleCommonErrorResponse = (apiCallResult, OpErrorMessage, OpActivity) => {
// checking for invalid/expired token errors and evicting cache in that case
// rudderJobMetadata contains some destination info which is being used to evict the cache
if (
authCache &&
// authCache &&
apiCallResult.response?.errors &&
apiCallResult.response?.errors?.length > 0 &&
apiCallResult.response?.errors.some(
(errorObj) => errorObj.code === '601' || errorObj.code === '602',
)
) {
// Special handling for 601 and 602 error codes for access token
authCache.del(getAccessTokenCacheKey(config));
if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '601')) {
throw new AbortedError(
`[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
);
}
if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '602')) {
throw new RetryableError(
`[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
);
}
// authCache.del(getAccessTokenCacheKey(config));
// if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '601')) {
// throw new AbortedError(
// `[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
// );
// }
// if (apiCallResult.response?.errors.some((errorObj) => errorObj.code === '602')) {
throw new RetryableError(
`[${OpErrorMessage}]Error message: ${apiCallResult.response?.errors[0]?.message}`,
);
// }
}
if (
apiCallResult.response?.errors?.length > 0 &&
Expand Down Expand Up @@ -142,54 +142,97 @@ const getAccessTokenURL = (config) => {

// Fetch access token from client id and client secret
// DOC: https://developers.marketo.com/rest-api/authentication/
const getAccessToken = async (config) =>
authCache.get(getAccessTokenCacheKey(config), async () => {
const url = getAccessTokenURL(config);
const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, {
destType: 'marketo_bulk_upload',
feature: 'transformation',
});
// const getAccessToken = async (config) =>
// authCache.get(getAccessTokenCacheKey(config), async () => {
// const url = getAccessTokenURL(config);
// const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, {
// destType: 'marketo_bulk_upload',
// feature: 'transformation',
// });

// // sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400}
// if (!isHttpStatusSuccess(accessTokenResponse.status)) {
// throw new NetworkError(
// `Could not retrieve authorisation token due to error ${accessTokenResponse}`,
// hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN),
// {
// [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status),
// },
// accessTokenResponse,
// );
// }
// if (accessTokenResponse.response?.success === false) {
// handleCommonErrorResponse(
// accessTokenResponse,
// ACCESS_TOKEN_FETCH_ERR_MSG,
// FETCH_ACCESS_TOKEN,
// config,
// );
// }

// // when access token is present
// if (accessTokenResponse.response.access_token) {
// /* This scenario will handle the case when we get the following response
// status: 200
// respnse: {"access_token":"<dummy-access-token>","token_type":"bearer","expires_in":0,"scope":"[email protected]"}
// wherein "expires_in":0 denotes that we should refresh the accessToken but its not expired yet.
// */
// if (accessTokenResponse.response?.expires_in === 0) {
// throw new RetryableError(
// `Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`,
// 500,
// );
// }
// return accessTokenResponse.response.access_token;
// }
// throw new AbortedError(
// `Could not retrieve authorisation token due to error ${accessTokenResponse}`,
// 400,
// );
// });

const getAccessToken = async (config) => {
const url = getAccessTokenURL(config);
const { processedResponse: accessTokenResponse } = await handleHttpRequest('get', url, {
destType: 'marketo_bulk_upload',
feature: 'transformation',
});

// sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400}
if (!isHttpStatusSuccess(accessTokenResponse.status)) {
throw new NetworkError(
`Could not retrieve authorisation token due to error ${accessTokenResponse}`,
hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN),
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status),
},
accessTokenResponse,
);
}
if (accessTokenResponse.response?.success === false) {
handleCommonErrorResponse(
accessTokenResponse,
ACCESS_TOKEN_FETCH_ERR_MSG,
FETCH_ACCESS_TOKEN,
config,
);
}
// sample response : {response: '[ENOTFOUND] :: DNS lookup failed', status: 400}
if (!isHttpStatusSuccess(accessTokenResponse.status)) {
throw new NetworkError(
`Could not retrieve authorisation token due to error ${accessTokenResponse}`,
hydrateStatusForServer(accessTokenResponse.status, FETCH_ACCESS_TOKEN),
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(accessTokenResponse.status),
},
accessTokenResponse,
);
}
if (accessTokenResponse.response?.success === false) {
handleCommonErrorResponse(accessTokenResponse, ACCESS_TOKEN_FETCH_ERR_MSG, FETCH_ACCESS_TOKEN);
}

// when access token is present
if (accessTokenResponse.response.access_token) {
/* This scenario will handle the case when we get the following response
// when access token is present
if (accessTokenResponse.response.access_token) {
/* This scenario will handle the case when we get the following response
status: 200
respnse: {"access_token":"<dummy-access-token>","token_type":"bearer","expires_in":0,"scope":"[email protected]"}
wherein "expires_in":0 denotes that we should refresh the accessToken but its not expired yet.
*/
if (accessTokenResponse.response?.expires_in === 0) {
throw new RetryableError(
`Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`,
500,
);
}
return accessTokenResponse.response.access_token;
if (accessTokenResponse.response?.expires_in === 0) {
throw new RetryableError(
`Request Failed for marketo_bulk_upload, Access Token Expired (Retryable).`,
500,
);
}
throw new AbortedError(
`Could not retrieve authorisation token due to error ${accessTokenResponse}`,
400,
);
});
return accessTokenResponse.response.access_token;
}
throw new AbortedError(
`Could not retrieve authorisation token due to error ${JSON.stringify(accessTokenResponse)}`,
400,
);
};

/**
* Handles the response of a polling operation.
Expand All @@ -200,7 +243,7 @@ const getAccessToken = async (config) =>
* @param {object} pollStatus - The response object from the polling operation.
* @returns {object|null} - The response object if the polling operation was successful, otherwise null.
*/
const handlePollResponse = (pollStatus, config) => {
const handlePollResponse = (pollStatus) => {
// DOC: https://developers.marketo.com/rest-api/error-codes/
if (pollStatus.response.errors) {
/* Sample error response for poll is:
Expand All @@ -216,7 +259,7 @@ const handlePollResponse = (pollStatus, config) => {
]
}
*/
handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY, config);
handleCommonErrorResponse(pollStatus, POLL_STATUS_ERR_MSG, POLL_ACTIVITY);
}

/*
Expand All @@ -243,6 +286,7 @@ const handlePollResponse = (pollStatus, config) => {
});

if (pollStatus.response?.result?.length > 0) {
console.log('poll status is ', JSON.stringify(pollStatus));
return pollStatus.response;
}
}
Expand All @@ -257,7 +301,7 @@ const handleFetchJobStatusResponse = (resp, type) => {
if (!isHttpStatusSuccess(marketoReposnseStatus)) {
logger.info('[Network Error]:Failed during fetching job status', { marketoResponse, type });
throw new NetworkError(
`Unable to fetch job status: due to error ${marketoResponse}`,
`Unable to fetch job status: due to error ${JSON.stringify(marketoResponse)}`,
hydrateStatusForServer(marketoReposnseStatus, 'During fetching job status'),
);
}
Expand Down Expand Up @@ -294,7 +338,7 @@ const handleFetchJobStatusResponse = (resp, type) => {
* @param {number} requestTime - The time taken for the request in milliseconds.
* @returns {object} - An object containing the importId, successfulJobs, and unsuccessfulJobs.
*/
const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime, config) => {
const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, requestTime) => {
/*
For unsuccessful response
{
Expand All @@ -319,7 +363,7 @@ const handleFileUploadResponse = (resp, successfulJobs, unsuccessfulJobs, reques
500,
);
} else {
handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE, config);
handleCommonErrorResponse(resp, FILE_UPLOAD_ERR_MSG, UPLOAD_FILE);
}
}

Expand Down Expand Up @@ -402,7 +446,7 @@ const getFieldSchemaMap = async (accessToken, munchkinId) => {
});
} else {
throw new RetryableError(
`Failed to fetch Marketo Field Schema due to error ${fieldSchemaMapping}`,
`Failed to fetch Marketo Field Schema due to error ${JSON.stringify(fieldSchemaMapping)}`,
500,
fieldSchemaMapping,
);
Expand Down

0 comments on commit 0b732e8

Please sign in to comment.