Skip to content

Commit

Permalink
chore: metadata propagation in active_campaign
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Sankeerth <[email protected]>
  • Loading branch information
Sai Sankeerth committed Jun 19, 2024
1 parent 40ed6b1 commit 57eafca
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions src/v0/destinations/active_campaign/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const responseBuilderSimple = (payload, category, destination) => {
throw new TransformationError('Payload could not be constructed');
};

const syncContact = async (contactPayload, category, destination) => {
const syncContact = async (contactPayload, category, { destination, metadata }) => {
const { endPoint } = category;
const endpoint = `${destination.Config.apiUrl}${endPoint}`;
const requestData = {
Expand All @@ -60,6 +60,7 @@ const syncContact = async (contactPayload, category, destination) => {
};
const res = await httpPOST(endpoint, requestData, requestOptions, {
destType: 'active_campaign',
metadata,
feature: 'transformation',
endpointPath: endPoint,
requestMethod: 'POST',
Expand All @@ -75,7 +76,7 @@ const syncContact = async (contactPayload, category, destination) => {
return createdContact.id;
};

const customTagProcessor = async (message, category, destination, contactId) => {
const customTagProcessor = async ({ message, destination, metadata }, category, contactId) => {
const tagsToBeCreated = [];
const tagIds = [];
let res;
Expand All @@ -102,6 +103,7 @@ const customTagProcessor = async (message, category, destination, contactId) =>
destType: 'active_campaign',
feature: 'transformation',
tagEndPoint,
metadata,
});
if (res.success === false) {
errorHandler(res, 'Failed to fetch already created tags');
Expand Down Expand Up @@ -133,6 +135,7 @@ const customTagProcessor = async (message, category, destination, contactId) =>
endpointPath: `/api/3/tags`,
requestMethod: 'GET',
module: 'router',
metadata,
});
promises.push(resp);
}
Expand Down Expand Up @@ -174,6 +177,7 @@ const customTagProcessor = async (message, category, destination, contactId) =>
res = await httpPOST(endpoint, requestData, requestOptions, {
destType: 'active_campaign',
feature: 'transformation',
metadata,
});
if (res.success === false) {
errorHandler(res, 'Failed to create new tag');
Expand Down Expand Up @@ -202,6 +206,7 @@ const customTagProcessor = async (message, category, destination, contactId) =>
res = httpPOST(endpoint, requestData, requestOptions, {
destType: 'active_campaign',
feature: 'transformation',
metadata,
});
return res;
}),
Expand All @@ -212,7 +217,7 @@ const customTagProcessor = async (message, category, destination, contactId) =>
});
};

const customFieldProcessor = async (message, category, destination) => {
const customFieldProcessor = async ({ message, destination, metadata }, category) => {
const responseStaging = [];
const { fieldEndPoint } = category;
// Step - 1
Expand All @@ -235,6 +240,7 @@ const customFieldProcessor = async (message, category, destination) => {
};
const res = await httpGET(endpoint, requestOptions, {
destType: 'active_campaign',
metadata,
feature: 'transformation',
fieldEndPoint,
});
Expand All @@ -258,6 +264,7 @@ const customFieldProcessor = async (message, category, destination) => {
feature: 'transformation',
endpointPath: `/api/3/fields`,
requestMethod: 'GET',
metadata,
module: 'router',
});
promises.push(resp);
Expand Down Expand Up @@ -317,7 +324,7 @@ const customFieldProcessor = async (message, category, destination) => {
return fieldsArrValues;
};

const customListProcessor = async (message, category, destination, contactId) => {
const customListProcessor = async ({ message, destination, metadata }, category, contactId) => {
const { mergeListWithContactUrl } = category;
// Here we extract the list info from the message
const listInfo = get(message?.context?.traits, 'lists')
Expand Down Expand Up @@ -359,6 +366,7 @@ const customListProcessor = async (message, category, destination, contactId) =>
endpointPath: mergeListWithContactUrl,
requestMethod: 'POST',
module: 'router',
metadata,
});
promises.push(res);
}
Expand All @@ -374,18 +382,18 @@ const customListProcessor = async (message, category, destination, contactId) =>
// This the handler func for identify type of events here before we transform the event
// and return to rudder server we process the message by calling specific destination apis
// for handling tag information and custom field information.
const identifyRequestHandler = async (message, category, destination) => {
const identifyRequestHandler = async ({ message, destination, metadata }, category) => {
// create skeleton contact payload
let contactPayload = constructPayload(message, MAPPING_CONFIG[category.name]);
contactPayload = removeUndefinedAndNullValues(contactPayload);
// sync to Active Campaign
const contactId = await syncContact(contactPayload, category, destination);
const contactId = await syncContact(contactPayload, category, { destination, metadata });
// create, and merge tags
await customTagProcessor(message, category, destination, contactId);
await customTagProcessor({ message, destination, metadata }, category, contactId);
// add the contact to lists if applicabale
await customListProcessor(message, category, destination, contactId);
await customListProcessor({ message, destination, metadata }, category, contactId);
// extract fieldValues to merge with contact
const fieldValues = await customFieldProcessor(message, category, destination);
const fieldValues = await customFieldProcessor({ message, destination, metadata }, category);
contactPayload.fieldValues = fieldValues;
contactPayload = removeUndefinedAndNullValues(contactPayload);
const payload = {
Expand All @@ -404,7 +412,7 @@ const pageRequestHandler = (message, category, destination) => {
return responseBuilderSimple(payload, category, destination);
};

const screenRequestHandler = async (message, category, destination) => {
const screenRequestHandler = async ({ message, destination, metadata }, category) => {
// Need to check if the event with same name already exists if not need to create
// Retrieve All events from destination
// https://developers.activecampaign.com/reference/list-all-event-types
Expand All @@ -417,6 +425,7 @@ const screenRequestHandler = async (message, category, destination) => {
destType: 'active_campaign',
feature: 'transformation',
endpointPath: `/api/3/eventTrackingEvents`,
metadata,
requestMethod: 'GET',
module: 'router',
});
Expand Down Expand Up @@ -445,6 +454,7 @@ const screenRequestHandler = async (message, category, destination) => {
};
res = await httpPOST(endpoint, requestData, requestOpt, {
destType: 'active_campaign',
metadata,
feature: 'transformation',
});
if (res.success === false) {
Expand All @@ -469,7 +479,7 @@ const screenRequestHandler = async (message, category, destination) => {
return responseBuilderSimple(payload, category, destination);
};

const trackRequestHandler = async (message, category, destination) => {
const trackRequestHandler = async ({ message, destination, metadata }, category) => {
// Need to check if the event with same name already exists if not need to create
// Retrieve All events from destination
// https://developers.activecampaign.com/reference/list-all-event-types
Expand All @@ -480,6 +490,7 @@ const trackRequestHandler = async (message, category, destination) => {
},
};
let res = await httpGET(endpoint, requestOptions, {
metadata,
destType: 'active_campaign',
feature: 'transformation',
endpointPath: `/api/3/eventTrackingEvents`,
Expand Down Expand Up @@ -511,6 +522,7 @@ const trackRequestHandler = async (message, category, destination) => {
headers: getHeader(destination),
};
res = await httpPOST(endpoint, requestData, requestOpt, {
metadata,
destType: 'active_campaign',
feature: 'transformation',
});
Expand All @@ -537,7 +549,8 @@ const trackRequestHandler = async (message, category, destination) => {
// The main entry point where the message is processed based on what type of event
// each scenario is resolved by using specific handler function which does
// subsquent processing and transformations and the response is sent to rudder-server
const processEvent = async (message, destination) => {
const processEvent = async (event) => {
const { message, destination } = event;
if (!message.type) {
throw new InstrumentationError('Message Type is not present. Aborting message.');
}
Expand All @@ -547,19 +560,19 @@ const processEvent = async (message, destination) => {
switch (messageType) {
case EventType.IDENTIFY:
category = CONFIG_CATEGORIES.IDENTIFY;
response = await identifyRequestHandler(message, category, destination);
response = await identifyRequestHandler(event, category);
break;
case EventType.PAGE:
category = CONFIG_CATEGORIES.PAGE;
response = pageRequestHandler(message, category, destination);
break;
case EventType.SCREEN:
category = CONFIG_CATEGORIES.SCREEN;
response = await screenRequestHandler(message, category, destination);
response = await screenRequestHandler(event, category);
break;
case EventType.TRACK:
category = CONFIG_CATEGORIES.TRACK;
response = await trackRequestHandler(message, category, destination);
response = await trackRequestHandler(event, category);
break;
default:
throw new InstrumentationError('Message type not supported');
Expand All @@ -568,7 +581,7 @@ const processEvent = async (message, destination) => {
};

const process = async (event) => {
const result = await processEvent(event.message, event.destination);
const result = await processEvent(event);
return result;
};

Expand Down

0 comments on commit 57eafca

Please sign in to comment.