diff --git a/src/v0/destinations/redis/transform.js b/src/v0/destinations/redis/transform.js index 23c73f0ba4..08009074d7 100644 --- a/src/v0/destinations/redis/transform.js +++ b/src/v0/destinations/redis/transform.js @@ -4,6 +4,7 @@ const flatten = require('flat'); const { InstrumentationError } = require('@rudderstack/integrations-lib'); const { isEmpty, isObject } = require('../../util'); const { EventType } = require('../../../constants'); +const { handleRecordEventsForRedis } = require('./transformV2'); // processValues: // 1. removes keys with empty values or still an object(empty) after flattening @@ -35,7 +36,10 @@ const transformSubEventTypeProfiles = (message, workspaceId, destinationId) => { // form the hash const hash = `${workspaceId}:${destinationId}:${message.context.sources.profiles_entity}:${message.context.sources.profiles_id_type}:${message.userId}`; const key = `${message.context.sources.profiles_model}`; - const value = JSON.stringify(message.traits); + let value = JSON.stringify(message.traits); + if (message.type === EventType.RECORD) { + value = JSON.stringify(message.fields); + } return { message: { hash, @@ -48,6 +52,11 @@ const transformSubEventTypeProfiles = (message, workspaceId, destinationId) => { const process = (event) => { const { message, destination, metadata } = event; + // seperate record events + if (message.type === EventType.RECORD) { + return handleRecordEventsForRedis(message, destination, metadata); + } + const messageType = message && message.type && message.type.toLowerCase(); if (messageType !== EventType.IDENTIFY) { @@ -102,4 +111,7 @@ const process = (event) => { return result; }; -exports.process = process; +module.exports = { + process, + transformSubEventTypeProfiles, +}; diff --git a/src/v0/destinations/redis/transformV2.js b/src/v0/destinations/redis/transformV2.js new file mode 100644 index 0000000000..0e073e18bc --- /dev/null +++ b/src/v0/destinations/redis/transformV2.js @@ -0,0 +1,45 @@ +const { transformSubEventTypeProfiles } = require('./transform'); + +// { +// "type": "record", +// "action": "delete", /* insert, delete */ +// "channel": "sources", +// "context": { +// "sources": { +// "job_id": "2QZYNmlpjxJIoxywM1m2obqFyi7", +// "version": "v1.32.0", +// "job_run_id": "ck0c6g0u5hotr621tqu0", +// "task_run_id": "ck0c6g0u5hotr621tqug" +// "profiles_model": "some-model", +// "profiles_entity": "some-entity" +// "profiles_id_type": "some-id-type" +// }, +// "externalId": [ +// { +// "type": "FB_CUSTOM_AUDIENCE-21304823048", +// "identifierType": "EMAIL" +// } +// ], +// }, +// "recordId": "a111", +// "messageId": "260f9a8d-91a7-46b0-9199-6da961dd6109", +// "fields": { +// MODEL_ID: '1691755780', +// VALID_AT: '2023-08-11T11:32:44.963062Z', +// USER_MAIN_ID: 'rid5530313526204a95efe71d98cd17d5a1', +// CHURN_SCORE_7_DAYS: 0.027986, +// PERCENTILE_CHURN_SCORE_7_DAYS: 0, +// }, +// } + +function handleRecordEventsForRedis(message, destination, metadata) { + // fields -> traits + // metadata -> metadata + // context.sources.profiles_<$$$> -> context.sources.profiles_<$$$> + const { workspaceId } = metadata; + const destinationId = destination.ID; + + return transformSubEventTypeProfiles(message, workspaceId, destinationId); +} + +module.exports = { handleRecordEventsForRedis };