Skip to content

Commit

Permalink
feat(hs): chunking data based on batch limit
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir-4116 committed Dec 13, 2023
1 parent eb28f4a commit 1adef48
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 63 deletions.
3 changes: 3 additions & 0 deletions src/v0/destinations/hs/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const API_VERSION = {
v3: 'newApi',
};

const MAX_CONTACTS_PER_REQUEST = 100;

const ConfigCategory = {
COMMON: {
name: 'HSCommonConfig',
Expand Down Expand Up @@ -109,5 +111,6 @@ module.exports = {
SEARCH_LIMIT_VALUE,
RETL_SOURCE,
RETL_CREATE_ASSOCIATION_OPERATION,
MAX_CONTACTS_PER_REQUEST,
DESTINATION: 'HS',
};
134 changes: 71 additions & 63 deletions src/v0/destinations/hs/util.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const lodash = require('lodash');
const get = require('get-value');
const {
NetworkInstrumentationError,
Expand Down Expand Up @@ -25,6 +26,7 @@ const {
SEARCH_LIMIT_VALUE,
hsCommonConfigJson,
DESTINATION,
MAX_CONTACTS_PER_REQUEST,
} = require('./config');

const tags = require('../../util/tags');
Expand Down Expand Up @@ -491,84 +493,90 @@ const getExistingData = async (inputs, destination) => {
});

values = Array.from(new Set(values));
const requestData = {
filterGroups: [
{
filters: [
{
propertyName: identifierType,
values,
operator: 'IN',
},
],
},
],
properties: [identifierType],
limit: SEARCH_LIMIT_VALUE,
after: 0,
};
const valuesChunk = lodash.chunk(values, MAX_CONTACTS_PER_REQUEST);

const requestOptions = {
headers: {
'Content-Type': JSON_MIME_TYPE,
Authorization: `Bearer ${Config.accessToken}`,
},
};
let checkAfter = 1; // variable to keep checking if we have more results

/* eslint-disable no-await-in-loop */

/* *
* This is needed for processing paginated response when searching hubspot.
* we can't avoid await in loop as response to the request contains the pagination details
* */

while (checkAfter) {
const endpoint = IDENTIFY_CRM_SEARCH_ALL_OBJECTS.replace(':objectType', objectType);
const endpointPath = `objects/:objectType/search`;

const url =
Config.authorizationType === 'newPrivateAppApi'
? endpoint
: `${endpoint}?hapikey=${Config.apiKey}`;
searchResponse =
Config.authorizationType === 'newPrivateAppApi'
? await httpPOST(url, requestData, requestOptions, {
// eslint-disable-next-line no-restricted-syntax
for(const chunk of valuesChunk){
const requestData = {
filterGroups: [
{
filters: [
{
propertyName: identifierType,
values: chunk,
operator: 'IN',
},
],
},
],
properties: [identifierType],
limit: SEARCH_LIMIT_VALUE,
after: 0,
};

const requestOptions = {
headers: {
'Content-Type': JSON_MIME_TYPE,
Authorization: `Bearer ${Config.accessToken}`,
},
};
let checkAfter = 1; // variable to keep checking if we have more results

/* eslint-disable no-await-in-loop */

/* *
* This is needed for processing paginated response when searching hubspot.
* we can't avoid await in loop as response to the request contains the pagination details
* */

while (checkAfter) {
const endpoint = IDENTIFY_CRM_SEARCH_ALL_OBJECTS.replace(':objectType', objectType);
const endpointPath = `objects/:objectType/search`;

const url =
Config.authorizationType === 'newPrivateAppApi'
? endpoint
: `${endpoint}?hapikey=${Config.apiKey}`;
searchResponse =
Config.authorizationType === 'newPrivateAppApi'
? await httpPOST(url, requestData, requestOptions, {
destType: 'hs',
feature: 'transformation',
endpointPath,
})
: await httpPOST(url, requestData, {
: await httpPOST(url, requestData, {
destType: 'hs',
feature: 'transformation',
endpointPath,
});
searchResponse = processAxiosResponse(searchResponse);
searchResponse = processAxiosResponse(searchResponse);

if (searchResponse.status !== 200) {
throw new NetworkError(
`rETL - Error during searching object record. ${searchResponse.response?.message}`,
searchResponse.status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(searchResponse.status),
},
searchResponse,
);
}
if (searchResponse.status !== 200) {
throw new NetworkError(
`rETL - Error during searching object record. ${searchResponse.response?.message}`,
searchResponse.status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(searchResponse.status),
},
searchResponse,
);
}

const after = searchResponse.response?.paging?.next?.after || 0;
const after = searchResponse.response?.paging?.next?.after || 0;

requestData.after = after; // assigning to the new value of after
checkAfter = after; // assigning to the new value if no after we assign it to 0 and no more calls will take place
requestData.after = after; // assigning to the new value of after
checkAfter = after; // assigning to the new value if no after we assign it to 0 and no more calls will take place

const results = searchResponse.response?.results;
if (results) {
updateHubspotIds = results.map((result) => {
const propertyValue = result.properties[identifierType];
return { id: result.id, property: propertyValue };
});
const results = searchResponse.response?.results;
if (results) {
updateHubspotIds = results.map((result) => {
const propertyValue = result.properties[identifierType];
return { id: result.id, property: propertyValue };
});
}
}
}

return updateHubspotIds;
};

Expand Down

0 comments on commit 1adef48

Please sign in to comment.