Skip to content

Commit

Permalink
feat: introduce v1/source/sourceType endpoint (#2722)
Browse files Browse the repository at this point in the history
* source routing handle

* migrate webhook to v1

* using folder structure to fetch varsionMap dynamically

* small fix

* update feature.json for v1 support boolean

* temp commit

* chore: comments addressed

* chore: add test cases

* chore: use map to store source version mapping

* chore: resolve conflicts

* chore: fix test cases and adressed comments

* chore: fix test cases and adressed comments

* chore: removed unnecessary types for source

* chore: add test case for version v1 for webhook source means no alters in payload
  • Loading branch information
anantjain45823 authored Nov 10, 2023
1 parent 8eb1e1d commit 0996e81
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 8 deletions.
9 changes: 7 additions & 2 deletions src/controllers/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ export default class SourceController {
const { version, source }: { version: string; source: string } = ctx.params;
const integrationService = ServiceSelector.getNativeSourceService();
try {
const resplist = await integrationService.sourceTransformRoutine(
events,
const { implementationVersion, input } = ControllerUtility.adaptInputToVersion(
source,
version,
events,
);
const resplist = await integrationService.sourceTransformRoutine(
input,
source,
implementationVersion,
requestMetadata,
);
ctx.body = resplist;
Expand Down
149 changes: 149 additions & 0 deletions src/controllers/util/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import ControllerUtility from './index';

describe('adaptInputToVersion', () => {
it('should return the input unchanged when the implementation version is not found', () => {
const sourceType = 'NA_SOURCE';
const requestVersion = 'v0';
const input = [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
];
const expected = {
implementationVersion: undefined,
input: [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});
it('should return the input unchanged when the implementation version and request version are the same i.e. v0', () => {
const sourceType = 'pipedream';
const requestVersion = 'v0';
const input = [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
];
const expected = {
implementationVersion: 'v0',
input: [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});
it('should return the input unchanged when the implementation version and request version are the same i.e. v1', () => {
const sourceType = 'webhook';
const requestVersion = 'v1';
const input = [
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
];
const expected = {
implementationVersion: 'v1',
input: [
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});
it('should convert input from v0 to v1 when the request version is v0 and the implementation version is v1', () => {
const sourceType = 'webhook';
const requestVersion = 'v0';
const input = [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
];
const expected = {
implementationVersion: 'v1',
input: [
{ event: { key1: 'val1', key2: 'val2' }, source: undefined },
{ event: { key1: 'val1', key2: 'val2' }, source: undefined },
{ event: { key1: 'val1', key2: 'val2' }, source: undefined },
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});

it('should convert input from v1 to v0 format when the request version is v1 and the implementation version is v0', () => {
const sourceType = 'pipedream';
const requestVersion = 'v1';
const input = [
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
];
const expected = {
implementationVersion: 'v0',
input: [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});

// Should return an empty array when the input is an empty array
it('should return an empty array when the input is an empty array', () => {
const sourceType = 'pipedream';
const requestVersion = 'v1';
const input = [];
const expected = { implementationVersion: 'v0', input: [] };

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});
});
48 changes: 48 additions & 0 deletions src/controllers/util/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import fs = require('fs');
import path = require('path');
import { Context } from 'koa';
import isEmpty from 'lodash/isEmpty';
import get from 'get-value';
Expand All @@ -7,12 +9,15 @@ import {
ProcessorTransformationRequest,
RouterTransformationRequestData,
RudderMessage,
SourceInput,
} from '../../types';
import { getValueFromMessage } from '../../v0/util';
import genericFieldMap from '../../v0/util/data/GenericFieldMapping.json';
import { EventType, MappedToDestinationKey } from '../../constants';

export default class ControllerUtility {
private static sourceVersionMap: Map<string, string> = new Map();

public static timestampValsMap: Record<string, string[]> = {
[EventType.IDENTIFY]: [
`context.${RETL_TIMESTAMP}`,
Expand All @@ -23,6 +28,49 @@ export default class ControllerUtility {
[EventType.TRACK]: [`properties.${RETL_TIMESTAMP}`, ...genericFieldMap.timestamp],
};

private static getSourceVersionsMap(): Map<string, any> {
if (this.sourceVersionMap?.size > 0) {
return this.sourceVersionMap;
}
const versions = ['v0', 'v1'];
versions.forEach((version) => {
const files = fs.readdirSync(path.resolve(__dirname, `../../${version}/sources`), {
withFileTypes: true,
});
const sources = files.filter((file) => file.isDirectory()).map((folder) => folder.name);
sources.forEach((source) => {
this.sourceVersionMap.set(source, version);
});
});
return this.sourceVersionMap;
}

private static convertSourceInputv1Tov0(sourceEvents: SourceInput[]): NonNullable<unknown>[] {
return sourceEvents.map((sourceEvent) => sourceEvent.event);
}

private static convertSourceInputv0Tov1(sourceEvents: unknown[]): SourceInput[] {
return sourceEvents.map(
(sourceEvent) => ({ event: sourceEvent, source: undefined } as SourceInput),
);
}

public static adaptInputToVersion(
sourceType: string,
requestVersion: string,
input: NonNullable<unknown>[],
): { implementationVersion: string; input: NonNullable<unknown>[] } {
const sourceToVersionMap = this.getSourceVersionsMap();
const implementationVersion = sourceToVersionMap.get(sourceType);
let updatedInput: NonNullable<unknown>[] = input;
if (requestVersion === 'v0' && implementationVersion === 'v1') {
updatedInput = this.convertSourceInputv0Tov1(input);
} else if (requestVersion === 'v1' && implementationVersion === 'v0') {
updatedInput = this.convertSourceInputv1Tov0(input as SourceInput[]);
}
return { implementationVersion, input: updatedInput };
}

private static getCompatibleStatusCode(status: number): number {
return getCompatibleStatusCode(status);
}
Expand Down
3 changes: 2 additions & 1 deletion src/features.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@
"ORTTO": true,
"ONE_SIGNAL": true,
"TIKTOK_AUDIENCE": true
}
},
"supportSourceTransformV1": true
}
29 changes: 29 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,34 @@ type ComparatorInput = {
requestMetadata: object;
feature: string;
};
type SourceDefinition = {
ID: string;
Name: string;
Category: string;
Type: string;
};

type Source = {
ID: string;
OriginalID: string;
Name: string;
SourceDefinition: SourceDefinition;
Config: object;
Enabled: boolean;
WorkspaceID: string;
WriteKey: string;
Transformations?: UserTransformationInput[];
RevisionID?: string;
Destinations?: Destination[];
Transient: boolean;
EventSchemasEnabled: boolean;
DgSourceTrackingPlanConfig: object;
};

type SourceInput = {
event: NonNullable<unknown>[];
source?: Source;
};
export {
Metadata,
MessageIdMetadataMap,
Expand All @@ -246,4 +273,6 @@ export {
UserDeletionResponse,
Destination,
ComparatorInput,
SourceInput,
Source,
};
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { removeUndefinedAndNullValues, generateUUID } = require('../../util');
const { removeUndefinedAndNullValues, generateUUID } = require('../../../v0/util');

function processEvent(event) {
const payload = {
Expand All @@ -10,7 +10,8 @@ function processEvent(event) {
return payload;
}

function process(event) {
function process(inputEvent) {
const { event } = inputEvent;
const response = processEvent(event);
return removeUndefinedAndNullValues(response);
}
Expand Down
27 changes: 27 additions & 0 deletions test/apitests/data_scenarios/source/v1/failure.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"input": [
{
"event": {
"anonymousId": "63767499ca6fb1b7c988d5bb",
"artist": "Gautam",
"genre": "Jazz",
"song": "Take Five"
},
"source": {
"id": "source_id",
"config": {
"configField1": "configVal1"
}
}
}
],
"output": [
{
"statusCode": 500,
"error": "Cannot find module '../undefined/sources/NA_SOURCE/transform' from 'src/services/misc.ts'",
"statTags": {
"errorCategory": "transformation"
}
}
]
}
49 changes: 49 additions & 0 deletions test/apitests/data_scenarios/source/v1/pipedream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"input": [
{
"event": {
"anonymousId": "63767499ca6fb1b7c988d5bb",
"artist": "Gautam",
"genre": "Jazz",
"song": "Take Five"
},
"source": {
"id": "source_id",
"config": {
"configField1": "configVal1"
}
}
}
],
"output": [
{
"output": {
"batch": [
{
"event": "pipedream_source_event",
"anonymousId": "63767499ca6fb1b7c988d5bb",
"context": {
"integration": {
"name": "PIPEDREAM"
},
"library": {
"name": "unknown",
"version": "unknown"
}
},
"integrations": {
"PIPEDREAM": false
},
"type": "track",
"properties": {
"anonymousId": "63767499ca6fb1b7c988d5bb",
"artist": "Gautam",
"genre": "Jazz",
"song": "Take Five"
}
}
]
}
}
]
}
Loading

0 comments on commit 0996e81

Please sign in to comment.