From 57eafca84f7734b1c936da59f63fbb8cb7ac8fd8 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Wed, 19 Jun 2024 11:16:14 +0530 Subject: [PATCH] chore: metadata propagation in active_campaign Signed-off-by: Sai Sankeerth --- .../destinations/active_campaign/transform.js | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/v0/destinations/active_campaign/transform.js b/src/v0/destinations/active_campaign/transform.js index 3978f868b1..f21bb1a70d 100644 --- a/src/v0/destinations/active_campaign/transform.js +++ b/src/v0/destinations/active_campaign/transform.js @@ -49,7 +49,7 @@ const responseBuilderSimple = (payload, category, destination) => { throw new TransformationError('Payload could not be constructed'); }; -const syncContact = async (contactPayload, category, destination) => { +const syncContact = async (contactPayload, category, { destination, metadata }) => { const { endPoint } = category; const endpoint = `${destination.Config.apiUrl}${endPoint}`; const requestData = { @@ -60,6 +60,7 @@ const syncContact = async (contactPayload, category, destination) => { }; const res = await httpPOST(endpoint, requestData, requestOptions, { destType: 'active_campaign', + metadata, feature: 'transformation', endpointPath: endPoint, requestMethod: 'POST', @@ -75,7 +76,7 @@ const syncContact = async (contactPayload, category, destination) => { return createdContact.id; }; -const customTagProcessor = async (message, category, destination, contactId) => { +const customTagProcessor = async ({ message, destination, metadata }, category, contactId) => { const tagsToBeCreated = []; const tagIds = []; let res; @@ -102,6 +103,7 @@ const customTagProcessor = async (message, category, destination, contactId) => destType: 'active_campaign', feature: 'transformation', tagEndPoint, + metadata, }); if (res.success === false) { errorHandler(res, 'Failed to fetch already created tags'); @@ -133,6 +135,7 @@ const customTagProcessor = async (message, category, destination, contactId) => endpointPath: `/api/3/tags`, requestMethod: 'GET', module: 'router', + metadata, }); promises.push(resp); } @@ -174,6 +177,7 @@ const customTagProcessor = async (message, category, destination, contactId) => res = await httpPOST(endpoint, requestData, requestOptions, { destType: 'active_campaign', feature: 'transformation', + metadata, }); if (res.success === false) { errorHandler(res, 'Failed to create new tag'); @@ -202,6 +206,7 @@ const customTagProcessor = async (message, category, destination, contactId) => res = httpPOST(endpoint, requestData, requestOptions, { destType: 'active_campaign', feature: 'transformation', + metadata, }); return res; }), @@ -212,7 +217,7 @@ const customTagProcessor = async (message, category, destination, contactId) => }); }; -const customFieldProcessor = async (message, category, destination) => { +const customFieldProcessor = async ({ message, destination, metadata }, category) => { const responseStaging = []; const { fieldEndPoint } = category; // Step - 1 @@ -235,6 +240,7 @@ const customFieldProcessor = async (message, category, destination) => { }; const res = await httpGET(endpoint, requestOptions, { destType: 'active_campaign', + metadata, feature: 'transformation', fieldEndPoint, }); @@ -258,6 +264,7 @@ const customFieldProcessor = async (message, category, destination) => { feature: 'transformation', endpointPath: `/api/3/fields`, requestMethod: 'GET', + metadata, module: 'router', }); promises.push(resp); @@ -317,7 +324,7 @@ const customFieldProcessor = async (message, category, destination) => { return fieldsArrValues; }; -const customListProcessor = async (message, category, destination, contactId) => { +const customListProcessor = async ({ message, destination, metadata }, category, contactId) => { const { mergeListWithContactUrl } = category; // Here we extract the list info from the message const listInfo = get(message?.context?.traits, 'lists') @@ -359,6 +366,7 @@ const customListProcessor = async (message, category, destination, contactId) => endpointPath: mergeListWithContactUrl, requestMethod: 'POST', module: 'router', + metadata, }); promises.push(res); } @@ -374,18 +382,18 @@ const customListProcessor = async (message, category, destination, contactId) => // This the handler func for identify type of events here before we transform the event // and return to rudder server we process the message by calling specific destination apis // for handling tag information and custom field information. -const identifyRequestHandler = async (message, category, destination) => { +const identifyRequestHandler = async ({ message, destination, metadata }, category) => { // create skeleton contact payload let contactPayload = constructPayload(message, MAPPING_CONFIG[category.name]); contactPayload = removeUndefinedAndNullValues(contactPayload); // sync to Active Campaign - const contactId = await syncContact(contactPayload, category, destination); + const contactId = await syncContact(contactPayload, category, { destination, metadata }); // create, and merge tags - await customTagProcessor(message, category, destination, contactId); + await customTagProcessor({ message, destination, metadata }, category, contactId); // add the contact to lists if applicabale - await customListProcessor(message, category, destination, contactId); + await customListProcessor({ message, destination, metadata }, category, contactId); // extract fieldValues to merge with contact - const fieldValues = await customFieldProcessor(message, category, destination); + const fieldValues = await customFieldProcessor({ message, destination, metadata }, category); contactPayload.fieldValues = fieldValues; contactPayload = removeUndefinedAndNullValues(contactPayload); const payload = { @@ -404,7 +412,7 @@ const pageRequestHandler = (message, category, destination) => { return responseBuilderSimple(payload, category, destination); }; -const screenRequestHandler = async (message, category, destination) => { +const screenRequestHandler = async ({ message, destination, metadata }, category) => { // Need to check if the event with same name already exists if not need to create // Retrieve All events from destination // https://developers.activecampaign.com/reference/list-all-event-types @@ -417,6 +425,7 @@ const screenRequestHandler = async (message, category, destination) => { destType: 'active_campaign', feature: 'transformation', endpointPath: `/api/3/eventTrackingEvents`, + metadata, requestMethod: 'GET', module: 'router', }); @@ -445,6 +454,7 @@ const screenRequestHandler = async (message, category, destination) => { }; res = await httpPOST(endpoint, requestData, requestOpt, { destType: 'active_campaign', + metadata, feature: 'transformation', }); if (res.success === false) { @@ -469,7 +479,7 @@ const screenRequestHandler = async (message, category, destination) => { return responseBuilderSimple(payload, category, destination); }; -const trackRequestHandler = async (message, category, destination) => { +const trackRequestHandler = async ({ message, destination, metadata }, category) => { // Need to check if the event with same name already exists if not need to create // Retrieve All events from destination // https://developers.activecampaign.com/reference/list-all-event-types @@ -480,6 +490,7 @@ const trackRequestHandler = async (message, category, destination) => { }, }; let res = await httpGET(endpoint, requestOptions, { + metadata, destType: 'active_campaign', feature: 'transformation', endpointPath: `/api/3/eventTrackingEvents`, @@ -511,6 +522,7 @@ const trackRequestHandler = async (message, category, destination) => { headers: getHeader(destination), }; res = await httpPOST(endpoint, requestData, requestOpt, { + metadata, destType: 'active_campaign', feature: 'transformation', }); @@ -537,7 +549,8 @@ const trackRequestHandler = async (message, category, destination) => { // The main entry point where the message is processed based on what type of event // each scenario is resolved by using specific handler function which does // subsquent processing and transformations and the response is sent to rudder-server -const processEvent = async (message, destination) => { +const processEvent = async (event) => { + const { message, destination } = event; if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } @@ -547,7 +560,7 @@ const processEvent = async (message, destination) => { switch (messageType) { case EventType.IDENTIFY: category = CONFIG_CATEGORIES.IDENTIFY; - response = await identifyRequestHandler(message, category, destination); + response = await identifyRequestHandler(event, category); break; case EventType.PAGE: category = CONFIG_CATEGORIES.PAGE; @@ -555,11 +568,11 @@ const processEvent = async (message, destination) => { break; case EventType.SCREEN: category = CONFIG_CATEGORIES.SCREEN; - response = await screenRequestHandler(message, category, destination); + response = await screenRequestHandler(event, category); break; case EventType.TRACK: category = CONFIG_CATEGORIES.TRACK; - response = await trackRequestHandler(message, category, destination); + response = await trackRequestHandler(event, category); break; default: throw new InstrumentationError('Message type not supported'); @@ -568,7 +581,7 @@ const processEvent = async (message, destination) => { }; const process = async (event) => { - const result = await processEvent(event.message, event.destination); + const result = await processEvent(event); return result; };