diff --git a/src/controllers/source.ts b/src/controllers/source.ts index efb6dc746e..b1e7e8fe12 100644 --- a/src/controllers/source.ts +++ b/src/controllers/source.ts @@ -16,10 +16,15 @@ export default class SourceController { const { version, source }: { version: string; source: string } = ctx.params; const integrationService = ServiceSelector.getNativeSourceService(); try { - const resplist = await integrationService.sourceTransformRoutine( - events, + const { implementationVersion, input } = ControllerUtility.adaptInputToVersion( source, version, + events, + ); + const resplist = await integrationService.sourceTransformRoutine( + input, + source, + implementationVersion, requestMetadata, ); ctx.body = resplist; diff --git a/src/controllers/util/index.test.ts b/src/controllers/util/index.test.ts new file mode 100644 index 0000000000..0bc5f174b0 --- /dev/null +++ b/src/controllers/util/index.test.ts @@ -0,0 +1,149 @@ +import ControllerUtility from './index'; + +describe('adaptInputToVersion', () => { + it('should return the input unchanged when the implementation version is not found', () => { + const sourceType = 'NA_SOURCE'; + const requestVersion = 'v0'; + const input = [ + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + ]; + const expected = { + implementationVersion: undefined, + input: [ + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + it('should return the input unchanged when the implementation version and request version are the same i.e. v0', () => { + const sourceType = 'pipedream'; + const requestVersion = 'v0'; + const input = [ + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + ]; + const expected = { + implementationVersion: 'v0', + input: [ + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + it('should return the input unchanged when the implementation version and request version are the same i.e. v1', () => { + const sourceType = 'webhook'; + const requestVersion = 'v1'; + const input = [ + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v1', + input: [ + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + it('should convert input from v0 to v1 when the request version is v0 and the implementation version is v1', () => { + const sourceType = 'webhook'; + const requestVersion = 'v0'; + const input = [ + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + ]; + const expected = { + implementationVersion: 'v1', + input: [ + { event: { key1: 'val1', key2: 'val2' }, source: undefined }, + { event: { key1: 'val1', key2: 'val2' }, source: undefined }, + { event: { key1: 'val1', key2: 'val2' }, source: undefined }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should convert input from v1 to v0 format when the request version is v1 and the implementation version is v0', () => { + const sourceType = 'pipedream'; + const requestVersion = 'v1'; + const input = [ + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v0', + input: [ + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + { key1: 'val1', key2: 'val2' }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + // Should return an empty array when the input is an empty array + it('should return an empty array when the input is an empty array', () => { + const sourceType = 'pipedream'; + const requestVersion = 'v1'; + const input = []; + const expected = { implementationVersion: 'v0', input: [] }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); +}); diff --git a/src/controllers/util/index.ts b/src/controllers/util/index.ts index e2071968d7..c52eb6899e 100644 --- a/src/controllers/util/index.ts +++ b/src/controllers/util/index.ts @@ -1,3 +1,5 @@ +import fs = require('fs'); +import path = require('path'); import { Context } from 'koa'; import isEmpty from 'lodash/isEmpty'; import get from 'get-value'; @@ -7,12 +9,15 @@ import { ProcessorTransformationRequest, RouterTransformationRequestData, RudderMessage, + SourceInput, } from '../../types'; import { getValueFromMessage } from '../../v0/util'; import genericFieldMap from '../../v0/util/data/GenericFieldMapping.json'; import { EventType, MappedToDestinationKey } from '../../constants'; export default class ControllerUtility { + private static sourceVersionMap: Map = new Map(); + public static timestampValsMap: Record = { [EventType.IDENTIFY]: [ `context.${RETL_TIMESTAMP}`, @@ -23,6 +28,49 @@ export default class ControllerUtility { [EventType.TRACK]: [`properties.${RETL_TIMESTAMP}`, ...genericFieldMap.timestamp], }; + private static getSourceVersionsMap(): Map { + if (this.sourceVersionMap?.size > 0) { + return this.sourceVersionMap; + } + const versions = ['v0', 'v1']; + versions.forEach((version) => { + const files = fs.readdirSync(path.resolve(__dirname, `../../${version}/sources`), { + withFileTypes: true, + }); + const sources = files.filter((file) => file.isDirectory()).map((folder) => folder.name); + sources.forEach((source) => { + this.sourceVersionMap.set(source, version); + }); + }); + return this.sourceVersionMap; + } + + private static convertSourceInputv1Tov0(sourceEvents: SourceInput[]): NonNullable[] { + return sourceEvents.map((sourceEvent) => sourceEvent.event); + } + + private static convertSourceInputv0Tov1(sourceEvents: unknown[]): SourceInput[] { + return sourceEvents.map( + (sourceEvent) => ({ event: sourceEvent, source: undefined } as SourceInput), + ); + } + + public static adaptInputToVersion( + sourceType: string, + requestVersion: string, + input: NonNullable[], + ): { implementationVersion: string; input: NonNullable[] } { + const sourceToVersionMap = this.getSourceVersionsMap(); + const implementationVersion = sourceToVersionMap.get(sourceType); + let updatedInput: NonNullable[] = input; + if (requestVersion === 'v0' && implementationVersion === 'v1') { + updatedInput = this.convertSourceInputv0Tov1(input); + } else if (requestVersion === 'v1' && implementationVersion === 'v0') { + updatedInput = this.convertSourceInputv1Tov0(input as SourceInput[]); + } + return { implementationVersion, input: updatedInput }; + } + private static getCompatibleStatusCode(status: number): number { return getCompatibleStatusCode(status); } diff --git a/src/features.json b/src/features.json index 9793f667e3..d3ea3e98a6 100644 --- a/src/features.json +++ b/src/features.json @@ -61,5 +61,6 @@ "ORTTO": true, "ONE_SIGNAL": true, "TIKTOK_AUDIENCE": true - } + }, + "supportSourceTransformV1": true } diff --git a/src/types/index.ts b/src/types/index.ts index 79efaecb40..9292fe2cc2 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -224,7 +224,34 @@ type ComparatorInput = { requestMetadata: object; feature: string; }; +type SourceDefinition = { + ID: string; + Name: string; + Category: string; + Type: string; +}; + +type Source = { + ID: string; + OriginalID: string; + Name: string; + SourceDefinition: SourceDefinition; + Config: object; + Enabled: boolean; + WorkspaceID: string; + WriteKey: string; + Transformations?: UserTransformationInput[]; + RevisionID?: string; + Destinations?: Destination[]; + Transient: boolean; + EventSchemasEnabled: boolean; + DgSourceTrackingPlanConfig: object; +}; +type SourceInput = { + event: NonNullable[]; + source?: Source; +}; export { Metadata, MessageIdMetadataMap, @@ -246,4 +273,6 @@ export { UserDeletionResponse, Destination, ComparatorInput, + SourceInput, + Source, }; diff --git a/src/v0/sources/webhook/transform.js b/src/v1/sources/webhook/transform.js similarity index 81% rename from src/v0/sources/webhook/transform.js rename to src/v1/sources/webhook/transform.js index 3a756ef63b..fc13424b8a 100644 --- a/src/v0/sources/webhook/transform.js +++ b/src/v1/sources/webhook/transform.js @@ -1,4 +1,4 @@ -const { removeUndefinedAndNullValues, generateUUID } = require('../../util'); +const { removeUndefinedAndNullValues, generateUUID } = require('../../../v0/util'); function processEvent(event) { const payload = { @@ -10,7 +10,8 @@ function processEvent(event) { return payload; } -function process(event) { +function process(inputEvent) { + const { event } = inputEvent; const response = processEvent(event); return removeUndefinedAndNullValues(response); } diff --git a/test/apitests/data_scenarios/source/failure.json b/test/apitests/data_scenarios/source/v0/failure.json similarity index 100% rename from test/apitests/data_scenarios/source/failure.json rename to test/apitests/data_scenarios/source/v0/failure.json diff --git a/test/apitests/data_scenarios/source/response_to_caller.json b/test/apitests/data_scenarios/source/v0/response_to_caller.json similarity index 100% rename from test/apitests/data_scenarios/source/response_to_caller.json rename to test/apitests/data_scenarios/source/v0/response_to_caller.json diff --git a/test/apitests/data_scenarios/source/successful.json b/test/apitests/data_scenarios/source/v0/successful.json similarity index 100% rename from test/apitests/data_scenarios/source/successful.json rename to test/apitests/data_scenarios/source/v0/successful.json diff --git a/test/apitests/data_scenarios/source/v1/failure.json b/test/apitests/data_scenarios/source/v1/failure.json new file mode 100644 index 0000000000..9bf77f9b53 --- /dev/null +++ b/test/apitests/data_scenarios/source/v1/failure.json @@ -0,0 +1,27 @@ +{ + "input": [ + { + "event": { + "anonymousId": "63767499ca6fb1b7c988d5bb", + "artist": "Gautam", + "genre": "Jazz", + "song": "Take Five" + }, + "source": { + "id": "source_id", + "config": { + "configField1": "configVal1" + } + } + } + ], + "output": [ + { + "statusCode": 500, + "error": "Cannot find module '../undefined/sources/NA_SOURCE/transform' from 'src/services/misc.ts'", + "statTags": { + "errorCategory": "transformation" + } + } + ] +} diff --git a/test/apitests/data_scenarios/source/v1/pipedream.json b/test/apitests/data_scenarios/source/v1/pipedream.json new file mode 100644 index 0000000000..4219f3f6b1 --- /dev/null +++ b/test/apitests/data_scenarios/source/v1/pipedream.json @@ -0,0 +1,49 @@ +{ + "input": [ + { + "event": { + "anonymousId": "63767499ca6fb1b7c988d5bb", + "artist": "Gautam", + "genre": "Jazz", + "song": "Take Five" + }, + "source": { + "id": "source_id", + "config": { + "configField1": "configVal1" + } + } + } + ], + "output": [ + { + "output": { + "batch": [ + { + "event": "pipedream_source_event", + "anonymousId": "63767499ca6fb1b7c988d5bb", + "context": { + "integration": { + "name": "PIPEDREAM" + }, + "library": { + "name": "unknown", + "version": "unknown" + } + }, + "integrations": { + "PIPEDREAM": false + }, + "type": "track", + "properties": { + "anonymousId": "63767499ca6fb1b7c988d5bb", + "artist": "Gautam", + "genre": "Jazz", + "song": "Take Five" + } + } + ] + } + } + ] +} \ No newline at end of file diff --git a/test/apitests/data_scenarios/source/v1/successful.json b/test/apitests/data_scenarios/source/v1/successful.json new file mode 100644 index 0000000000..c42d723800 --- /dev/null +++ b/test/apitests/data_scenarios/source/v1/successful.json @@ -0,0 +1,38 @@ +{ + "input": [ + { + "event": { + "event": "Fulfillments Update", + "data": { + "fulfillment_id": "1234567890", + "status": "pending" + } + }, + "source": { + "id": "source_id", + "config": { + "configField1": "configVal1" + } + } + } + ], + "output": [ + { + "output": { + "batch": [ + { + "type": "track", + "event": "webhook_source_event", + "properties": { + "event": "Fulfillments Update", + "data": { + "fulfillment_id": "1234567890", + "status": "pending" + } + } + } + ] + } + } + ] +} diff --git a/test/apitests/service.api.test.ts b/test/apitests/service.api.test.ts index dfe7e10dd6..ee534d7b37 100644 --- a/test/apitests/service.api.test.ts +++ b/test/apitests/service.api.test.ts @@ -177,7 +177,7 @@ describe('Destination api tests', () => { describe('Source api tests', () => { test('(shopify) successful source transform', async () => { - const data = getDataFromPath('./data_scenarios/source/successful.json'); + const data = getDataFromPath('./data_scenarios/source/v0/successful.json'); const response = await request(server) .post('/v0/sources/shopify') .set('Accept', 'application/json') @@ -189,7 +189,7 @@ describe('Source api tests', () => { }); test('(shopify) failure source transform (shopify)', async () => { - const data = getDataFromPath('./data_scenarios/source/failure.json'); + const data = getDataFromPath('./data_scenarios/source/v0/failure.json'); const response = await request(server) .post('/v0/sources/shopify') .set('Accept', 'application/json') @@ -199,7 +199,7 @@ describe('Source api tests', () => { }); test('(shopify) success source transform (monday)', async () => { - const data = getDataFromPath('./data_scenarios/source/response_to_caller.json'); + const data = getDataFromPath('./data_scenarios/source/v0/response_to_caller.json'); const response = await request(server) .post('/v0/sources/monday') .set('Accept', 'application/json') @@ -207,6 +207,38 @@ describe('Source api tests', () => { expect(response.status).toEqual(200); expect(JSON.parse(response.text)).toEqual(data.output); }); + + test('(webhook) successful source transform for source present in v1 and server providing v0 endpoint', async () => { + const data = getDataFromPath('./data_scenarios/source/v1/successful.json'); + const response = await request(server) + .post('/v1/sources/webhook') + .set('Accept', 'application/json') + .send(data.input); + const parsedResp = JSON.parse(response.text); + delete parsedResp[0].output.batch[0].anonymousId; + expect(response.status).toEqual(200); + expect(parsedResp).toEqual(data.output); + }); + + test('(NA_SOURCE) failure source transform ', async () => { + const data = getDataFromPath('./data_scenarios/source/v1/failure.json'); + const response = await request(server) + .post('/v0/sources/NA_SOURCE') + .set('Accept', 'application/json') + .send(data.input); + expect(response.status).toEqual(200); + expect(JSON.parse(response.text)).toEqual(data.output); + }); + + test('(pipedream) success source transform for source present in v0 and server providing v1 endpoint', async () => { + const data = getDataFromPath('./data_scenarios/source/v1/pipedream.json'); + const response = await request(server) + .post('/v1/sources/pipedream') + .set('Accept', 'application/json') + .send(data.input); + expect(response.status).toEqual(200); + expect(JSON.parse(response.text)).toEqual(data.output); + }); }); describe('CDK V1 api tests', () => {