Skip to content

Commit

Permalink
feat: sources v2 spec support along with adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
vinayteki95 committed Oct 17, 2024
1 parent d730daf commit 3126957
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 37 deletions.
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.7
111 changes: 107 additions & 4 deletions src/controllers/__tests__/source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { applicationRoutes } from '../../routes';
import { NativeIntegrationSourceService } from '../../services/source/nativeIntegration';
import { ServiceSelector } from '../../helpers/serviceSelector';
import { ControllerUtility } from '../util/index';
import { SourceInputConversionResult } from '../../types';

let server: any;
const OLD_ENV = process.env;
Expand Down Expand Up @@ -38,6 +39,19 @@ const getData = () => {
return [{ event: { a: 'b1' } }, { event: { a: 'b2' } }];
};

const getV2Data = () => {
return [
{ request: { body: '{"a": "b"}' }, source: { id: 1 } },
{ request: { body: '{"a": "b"}' }, source: { id: 1 } },
];
};

const getConvertedData = () => {
return getData().map((eventInstance) => {
return { output: eventInstance } as SourceInputConversionResult<typeof eventInstance>;
});
};

describe('Source controller tests', () => {
describe('V0 Source transform tests', () => {
test('successful source transform', async () => {
Expand All @@ -49,7 +63,7 @@ describe('Source controller tests', () => {
mockSourceService.sourceTransformRoutine = jest
.fn()
.mockImplementation((i, s, v, requestMetadata) => {
expect(i).toEqual(getData());
expect(i).toEqual(getConvertedData());
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
return testOutput;
Expand All @@ -66,7 +80,7 @@ describe('Source controller tests', () => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getData());
return { implementationVersion: version, input: e };
return { implementationVersion: version, input: getConvertedData() };
});

const response = await request(server)
Expand Down Expand Up @@ -139,7 +153,7 @@ describe('Source controller tests', () => {
mockSourceService.sourceTransformRoutine = jest
.fn()
.mockImplementation((i, s, v, requestMetadata) => {
expect(i).toEqual(getData());
expect(i).toEqual(getConvertedData());
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
return testOutput;
Expand All @@ -156,7 +170,7 @@ describe('Source controller tests', () => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getData());
return { implementationVersion: version, input: e };
return { implementationVersion: version, input: getConvertedData() };
});

const response = await request(server)
Expand Down Expand Up @@ -217,4 +231,93 @@ describe('Source controller tests', () => {
expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1);
});
});

describe('V2 Source transform tests', () => {
test('successful source transform', async () => {
const sourceType = '__rudder_test__';
const version = 'v2';
const testOutput = [{ event: { a: 'b' }, source: { id: 'id' } }];

const mockSourceService = new NativeIntegrationSourceService();
mockSourceService.sourceTransformRoutine = jest
.fn()
.mockImplementation((i, s, v, requestMetadata) => {
expect(i).toEqual(getConvertedData());
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
return testOutput;
});
const getNativeSourceServiceSpy = jest
.spyOn(ServiceSelector, 'getNativeSourceService')
.mockImplementation(() => {
return mockSourceService;
});

const adaptInputToVersionSpy = jest
.spyOn(ControllerUtility, 'adaptInputToVersion')
.mockImplementation((s, v, e) => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getV2Data());
return { implementationVersion: version, input: getConvertedData() };
});

const response = await request(server)
.post('/v2/sources/__rudder_test__')
.set('Accept', 'application/json')
.send(getV2Data());

expect(response.status).toEqual(200);
expect(response.body).toEqual(testOutput);

expect(response.header['apiversion']).toEqual('2');

expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1);
expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1);
expect(mockSourceService.sourceTransformRoutine).toHaveBeenCalledTimes(1);
});

test('failing source transform', async () => {
const sourceType = '__rudder_test__';
const version = 'v2';
const mockSourceService = new NativeIntegrationSourceService();
const getNativeSourceServiceSpy = jest
.spyOn(ServiceSelector, 'getNativeSourceService')
.mockImplementation(() => {
return mockSourceService;
});

const adaptInputToVersionSpy = jest
.spyOn(ControllerUtility, 'adaptInputToVersion')
.mockImplementation((s, v, e) => {
expect(s).toEqual(sourceType);
expect(v).toEqual(version);
expect(e).toEqual(getV2Data());
throw new Error('test error');
});

const response = await request(server)
.post('/v2/sources/__rudder_test__')
.set('Accept', 'application/json')
.send(getV2Data());

const expectedResp = [
{
error: 'test error',
statTags: {
errorCategory: 'transformation',
},
statusCode: 500,
},
];

expect(response.status).toEqual(200);
expect(response.body).toEqual(expectedResp);

expect(response.header['apiversion']).toEqual('2');

expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1);
expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1);
});
});
});
163 changes: 145 additions & 18 deletions src/controllers/util/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ describe('adaptInputToVersion', () => {
const expected = {
implementationVersion: undefined,
input: [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ output: { key1: 'val1', key2: 'val2' } },
{ output: { key1: 'val1', key2: 'val2' } },
{ output: { key1: 'val1', key2: 'val2' } },
],
};

Expand All @@ -40,9 +40,9 @@ describe('adaptInputToVersion', () => {
const expected = {
implementationVersion: 'v0',
input: [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ output: { key1: 'val1', key2: 'val2' } },
{ output: { key1: 'val1', key2: 'val2' } },
{ output: { key1: 'val1', key2: 'val2' } },
],
};

Expand Down Expand Up @@ -71,16 +71,22 @@ describe('adaptInputToVersion', () => {
implementationVersion: 'v1',
input: [
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
output: {
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
output: {
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
},
{
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
output: {
event: { key1: 'val1', key2: 'val2' },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
},
],
};
Expand All @@ -100,9 +106,9 @@ describe('adaptInputToVersion', () => {
const expected = {
implementationVersion: 'v1',
input: [
{ event: { key1: 'val1', key2: 'val2' }, source: undefined },
{ event: { key1: 'val1', key2: 'val2' }, source: undefined },
{ event: { key1: 'val1', key2: 'val2' }, source: undefined },
{ output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } },
{ output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } },
{ output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } },
],
};

Expand Down Expand Up @@ -131,9 +137,130 @@ describe('adaptInputToVersion', () => {
const expected = {
implementationVersion: 'v0',
input: [
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ key1: 'val1', key2: 'val2' },
{ output: { key1: 'val1', key2: 'val2' } },
{ output: { key1: 'val1', key2: 'val2' } },
{ output: { key1: 'val1', key2: 'val2' } },
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});

it('should convert input from v2 to v0 format when the request version is v2 and the implementation version is v0', () => {
const sourceType = 'pipedream';
const requestVersion = 'v2';

const input = [
{
request: {
method: 'POST',
url: 'http://example.com',
proto: 'HTTP/2',
headers: { headerkey: ['headervalue'] },
body: '{"key": "value"}',
query_parameters: { paramkey: ['paramvalue'] },
},
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
request: {
method: 'POST',
url: 'http://example.com',
proto: 'HTTP/2',
headers: { headerkey: ['headervalue'] },
body: '{"key": "value"}',
query_parameters: { paramkey: ['paramvalue'] },
},
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
request: {
method: 'POST',
url: 'http://example.com',
proto: 'HTTP/2',
headers: { headerkey: ['headervalue'] },
body: '{"key": "value"}',
query_parameters: { paramkey: ['paramvalue'] },
},
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
];
const expected = {
implementationVersion: 'v0',
input: [
{ output: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } } },
{ output: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } } },
{ output: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } } },
],
};

const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input);

expect(result).toEqual(expected);
});

it('should convert input from v2 to v1 format when the request version is v2 and the implementation version is v1', () => {
const sourceType = 'webhook';
const requestVersion = 'v2';

const input = [
{
request: {
method: 'POST',
url: 'http://example.com',
proto: 'HTTP/2',
headers: { headerkey: ['headervalue'] },
body: '{"key": "value"}',
query_parameters: { paramkey: ['paramvalue'] },
},
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
request: {
method: 'POST',
url: 'http://example.com',
proto: 'HTTP/2',
headers: { headerkey: ['headervalue'] },
body: '{"key": "value"}',
query_parameters: { paramkey: ['paramvalue'] },
},
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
{
request: {
method: 'POST',
url: 'http://example.com',
proto: 'HTTP/2',
headers: { headerkey: ['headervalue'] },
body: '{"key": "value"}',
query_parameters: { paramkey: ['paramvalue'] },
},
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
];
const expected = {
implementationVersion: 'v1',
input: [
{
output: {
event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
},
{
output: {
event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
},
{
output: {
event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } },
source: { id: 'source_id', config: { configField1: 'configVal1' } },
},
},
],
};

Expand Down
Loading

0 comments on commit 3126957

Please sign in to comment.