From 31269571c51500b65971c8b61fa22c529181f5b5 Mon Sep 17 00:00:00 2001 From: Vinay Teki Date: Thu, 17 Oct 2024 18:25:20 +0530 Subject: [PATCH] feat: sources v2 spec support along with adapters --- .python-version | 1 + src/controllers/__tests__/source.test.ts | 111 +++++++++++- src/controllers/util/index.test.ts | 163 ++++++++++++++++-- src/controllers/util/index.ts | 62 ++++++- src/features.json | 3 +- src/interfaces/SourceService.ts | 8 +- src/middlewares/routeActivation.ts | 15 ++ .../__tests__/nativeIntegration.test.ts | 20 ++- src/services/source/nativeIntegration.ts | 18 +- src/types/index.ts | 22 +++ test/apitests/service.api.test.ts | 7 + 11 files changed, 393 insertions(+), 37 deletions(-) create mode 100644 .python-version diff --git a/.python-version b/.python-version new file mode 100644 index 0000000000..1effb00340 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +2.7 diff --git a/src/controllers/__tests__/source.test.ts b/src/controllers/__tests__/source.test.ts index 565f39d559..72bee83282 100644 --- a/src/controllers/__tests__/source.test.ts +++ b/src/controllers/__tests__/source.test.ts @@ -6,6 +6,7 @@ import { applicationRoutes } from '../../routes'; import { NativeIntegrationSourceService } from '../../services/source/nativeIntegration'; import { ServiceSelector } from '../../helpers/serviceSelector'; import { ControllerUtility } from '../util/index'; +import { SourceInputConversionResult } from '../../types'; let server: any; const OLD_ENV = process.env; @@ -38,6 +39,19 @@ const getData = () => { return [{ event: { a: 'b1' } }, { event: { a: 'b2' } }]; }; +const getV2Data = () => { + return [ + { request: { body: '{"a": "b"}' }, source: { id: 1 } }, + { request: { body: '{"a": "b"}' }, source: { id: 1 } }, + ]; +}; + +const getConvertedData = () => { + return getData().map((eventInstance) => { + return { output: eventInstance } as SourceInputConversionResult; + }); +}; + describe('Source controller tests', () => { describe('V0 Source transform tests', () => { test('successful source transform', async () => { @@ -49,7 +63,7 @@ describe('Source controller tests', () => { mockSourceService.sourceTransformRoutine = jest .fn() .mockImplementation((i, s, v, requestMetadata) => { - expect(i).toEqual(getData()); + expect(i).toEqual(getConvertedData()); expect(s).toEqual(sourceType); expect(v).toEqual(version); return testOutput; @@ -66,7 +80,7 @@ describe('Source controller tests', () => { expect(s).toEqual(sourceType); expect(v).toEqual(version); expect(e).toEqual(getData()); - return { implementationVersion: version, input: e }; + return { implementationVersion: version, input: getConvertedData() }; }); const response = await request(server) @@ -139,7 +153,7 @@ describe('Source controller tests', () => { mockSourceService.sourceTransformRoutine = jest .fn() .mockImplementation((i, s, v, requestMetadata) => { - expect(i).toEqual(getData()); + expect(i).toEqual(getConvertedData()); expect(s).toEqual(sourceType); expect(v).toEqual(version); return testOutput; @@ -156,7 +170,7 @@ describe('Source controller tests', () => { expect(s).toEqual(sourceType); expect(v).toEqual(version); expect(e).toEqual(getData()); - return { implementationVersion: version, input: e }; + return { implementationVersion: version, input: getConvertedData() }; }); const response = await request(server) @@ -217,4 +231,93 @@ describe('Source controller tests', () => { expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1); }); }); + + describe('V2 Source transform tests', () => { + test('successful source transform', async () => { + const sourceType = '__rudder_test__'; + const version = 'v2'; + const testOutput = [{ event: { a: 'b' }, source: { id: 'id' } }]; + + const mockSourceService = new NativeIntegrationSourceService(); + mockSourceService.sourceTransformRoutine = jest + .fn() + .mockImplementation((i, s, v, requestMetadata) => { + expect(i).toEqual(getConvertedData()); + expect(s).toEqual(sourceType); + expect(v).toEqual(version); + return testOutput; + }); + const getNativeSourceServiceSpy = jest + .spyOn(ServiceSelector, 'getNativeSourceService') + .mockImplementation(() => { + return mockSourceService; + }); + + const adaptInputToVersionSpy = jest + .spyOn(ControllerUtility, 'adaptInputToVersion') + .mockImplementation((s, v, e) => { + expect(s).toEqual(sourceType); + expect(v).toEqual(version); + expect(e).toEqual(getV2Data()); + return { implementationVersion: version, input: getConvertedData() }; + }); + + const response = await request(server) + .post('/v2/sources/__rudder_test__') + .set('Accept', 'application/json') + .send(getV2Data()); + + expect(response.status).toEqual(200); + expect(response.body).toEqual(testOutput); + + expect(response.header['apiversion']).toEqual('2'); + + expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1); + expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1); + expect(mockSourceService.sourceTransformRoutine).toHaveBeenCalledTimes(1); + }); + + test('failing source transform', async () => { + const sourceType = '__rudder_test__'; + const version = 'v2'; + const mockSourceService = new NativeIntegrationSourceService(); + const getNativeSourceServiceSpy = jest + .spyOn(ServiceSelector, 'getNativeSourceService') + .mockImplementation(() => { + return mockSourceService; + }); + + const adaptInputToVersionSpy = jest + .spyOn(ControllerUtility, 'adaptInputToVersion') + .mockImplementation((s, v, e) => { + expect(s).toEqual(sourceType); + expect(v).toEqual(version); + expect(e).toEqual(getV2Data()); + throw new Error('test error'); + }); + + const response = await request(server) + .post('/v2/sources/__rudder_test__') + .set('Accept', 'application/json') + .send(getV2Data()); + + const expectedResp = [ + { + error: 'test error', + statTags: { + errorCategory: 'transformation', + }, + statusCode: 500, + }, + ]; + + expect(response.status).toEqual(200); + expect(response.body).toEqual(expectedResp); + + expect(response.header['apiversion']).toEqual('2'); + + expect(getNativeSourceServiceSpy).toHaveBeenCalledTimes(1); + expect(adaptInputToVersionSpy).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/src/controllers/util/index.test.ts b/src/controllers/util/index.test.ts index 6065920846..6ab2336b71 100644 --- a/src/controllers/util/index.test.ts +++ b/src/controllers/util/index.test.ts @@ -19,9 +19,9 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: undefined, input: [ - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, ], }; @@ -40,9 +40,9 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: 'v0', input: [ - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, ], }; @@ -71,16 +71,22 @@ describe('adaptInputToVersion', () => { implementationVersion: 'v1', input: [ { - event: { key1: 'val1', key2: 'val2' }, - source: { id: 'source_id', config: { configField1: 'configVal1' } }, + output: { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, }, { - event: { key1: 'val1', key2: 'val2' }, - source: { id: 'source_id', config: { configField1: 'configVal1' } }, + output: { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, }, { - event: { key1: 'val1', key2: 'val2' }, - source: { id: 'source_id', config: { configField1: 'configVal1' } }, + output: { + event: { key1: 'val1', key2: 'val2' }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, }, ], }; @@ -100,9 +106,9 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: 'v1', input: [ - { event: { key1: 'val1', key2: 'val2' }, source: undefined }, - { event: { key1: 'val1', key2: 'val2' }, source: undefined }, - { event: { key1: 'val1', key2: 'val2' }, source: undefined }, + { output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } }, + { output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } }, + { output: { event: { key1: 'val1', key2: 'val2' }, source: undefined } }, ], }; @@ -131,9 +137,130 @@ describe('adaptInputToVersion', () => { const expected = { implementationVersion: 'v0', input: [ - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, - { key1: 'val1', key2: 'val2' }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + { output: { key1: 'val1', key2: 'val2' } }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should convert input from v2 to v0 format when the request version is v2 and the implementation version is v0', () => { + const sourceType = 'pipedream'; + const requestVersion = 'v2'; + + const input = [ + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v0', + input: [ + { output: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } } }, + { output: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } } }, + { output: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } } }, + ], + }; + + const result = ControllerUtility.adaptInputToVersion(sourceType, requestVersion, input); + + expect(result).toEqual(expected); + }); + + it('should convert input from v2 to v1 format when the request version is v2 and the implementation version is v1', () => { + const sourceType = 'webhook'; + const requestVersion = 'v2'; + + const input = [ + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + { + request: { + method: 'POST', + url: 'http://example.com', + proto: 'HTTP/2', + headers: { headerkey: ['headervalue'] }, + body: '{"key": "value"}', + query_parameters: { paramkey: ['paramvalue'] }, + }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + ]; + const expected = { + implementationVersion: 'v1', + input: [ + { + output: { + event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + { + output: { + event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, + { + output: { + event: { key: 'value', query_parameters: { paramkey: ['paramvalue'] } }, + source: { id: 'source_id', config: { configField1: 'configVal1' } }, + }, + }, ], }; diff --git a/src/controllers/util/index.ts b/src/controllers/util/index.ts index c5bf7ab358..b562381ed6 100644 --- a/src/controllers/util/index.ts +++ b/src/controllers/util/index.ts @@ -10,6 +10,8 @@ import { RouterTransformationRequestData, RudderMessage, SourceInput, + SourceInputConversionResult, + SourceInputV2, } from '../../types'; import { getValueFromMessage } from '../../v0/util'; import genericFieldMap from '../../v0/util/data/GenericFieldMapping.json'; @@ -45,28 +47,72 @@ export class ControllerUtility { return this.sourceVersionMap; } - private static convertSourceInputv1Tov0(sourceEvents: SourceInput[]): NonNullable[] { - return sourceEvents.map((sourceEvent) => sourceEvent.event); + private static convertSourceInputv1Tov0( + sourceEvents: SourceInput[], + ): SourceInputConversionResult>[] { + return sourceEvents.map((sourceEvent) => ({ + output: sourceEvent.event as NonNullable, + })); } - private static convertSourceInputv0Tov1(sourceEvents: unknown[]): SourceInput[] { - return sourceEvents.map( - (sourceEvent) => ({ event: sourceEvent, source: undefined }) as SourceInput, - ); + private static convertSourceInputv0Tov1( + sourceEvents: unknown[], + ): SourceInputConversionResult[] { + return sourceEvents.map((sourceEvent) => ({ + output: { event: sourceEvent, source: undefined } as SourceInput, + })); + } + + private static convertSourceInputv2Tov0( + sourceEvents: SourceInputV2[], + ): SourceInputConversionResult>[] { + return sourceEvents.map((sourceEvent) => { + try { + const v0Event = JSON.parse(sourceEvent.request.body); + v0Event.query_parameters = sourceEvent.request.query_parameters; + return { output: v0Event }; + } catch (err) { + const conversionError = + err instanceof Error ? err : new Error('error converting v2 to v0 spec'); + return { output: {} as NonNullable, conversionError }; + } + }); + } + + private static convertSourceInputv2Tov1( + sourceEvents: SourceInputV2[], + ): SourceInputConversionResult[] { + return sourceEvents.map((sourceEvent) => { + try { + const v1Event = { event: JSON.parse(sourceEvent.request.body), source: sourceEvent.source }; + v1Event.event.query_parameters = sourceEvent.request.query_parameters; + return { output: v1Event }; + } catch (err) { + const conversionError = + err instanceof Error ? err : new Error('error converting v2 to v1 spec'); + return { output: {} as SourceInput, conversionError }; + } + }); } public static adaptInputToVersion( sourceType: string, requestVersion: string, input: NonNullable[], - ): { implementationVersion: string; input: NonNullable[] } { + ): { implementationVersion: string; input: SourceInputConversionResult>[] } { const sourceToVersionMap = this.getSourceVersionsMap(); const implementationVersion = sourceToVersionMap.get(sourceType); - let updatedInput: NonNullable[] = input; + let updatedInput: SourceInputConversionResult>[] = input.map((event) => ({ + output: event, + })); if (requestVersion === 'v0' && implementationVersion === 'v1') { updatedInput = this.convertSourceInputv0Tov1(input); } else if (requestVersion === 'v1' && implementationVersion === 'v0') { updatedInput = this.convertSourceInputv1Tov0(input as SourceInput[]); + } else if (requestVersion === 'v2' && implementationVersion === 'v0') { + updatedInput = this.convertSourceInputv2Tov0(input as SourceInputV2[]); + } else if (requestVersion === 'v2' && implementationVersion === 'v1') { + updatedInput = this.convertSourceInputv2Tov1(input as SourceInputV2[]); } return { implementationVersion, input: updatedInput }; } diff --git a/src/features.json b/src/features.json index 63862eefed..fc164aa250 100644 --- a/src/features.json +++ b/src/features.json @@ -101,5 +101,6 @@ "EMARSYS" ], "supportSourceTransformV1": true, - "supportTransformerProxyV1": true + "supportTransformerProxyV1": true, + "upgradedToSourceTransformV2": true } diff --git a/src/interfaces/SourceService.ts b/src/interfaces/SourceService.ts index c7de8cfe8b..32a7125e7a 100644 --- a/src/interfaces/SourceService.ts +++ b/src/interfaces/SourceService.ts @@ -1,10 +1,14 @@ -import { MetaTransferObject, SourceTransformationResponse } from '../types/index'; +import { + MetaTransferObject, + SourceInputConversionResult, + SourceTransformationResponse, +} from '../types/index'; export interface SourceService { getTags(): MetaTransferObject; sourceTransformRoutine( - sourceEvents: NonNullable[], + sourceEvents: SourceInputConversionResult>[], sourceType: string, version: string, requestMetadata: NonNullable, diff --git a/src/middlewares/routeActivation.ts b/src/middlewares/routeActivation.ts index ffb1e15e80..126749b083 100644 --- a/src/middlewares/routeActivation.ts +++ b/src/middlewares/routeActivation.ts @@ -106,4 +106,19 @@ export class RouteActivationMiddleware { RouteActivationMiddleware.shouldActivateRoute(destination, deliveryFilterList), ); } + + // This middleware will be used by source endpoint when we completely deprecate v0, v1 versions. + public static isVersionAllowed(ctx: Context, next: Next) { + const { version } = ctx.params; + if (version === 'v0' || version === 'v1') { + ctx.status = 500; + ctx.body = + '/v0, /v1 versioned endpoints are deprecated. Use /v2 version endpoint. This is probably caused because of source transformation call from an outdated rudder-server version. Please upgrade rudder-server to a minimum of 1.xx.xx version.'; + } else if (version === 'v2') { + next(); + } else { + ctx.status = 404; + ctx.body = 'Path not found. Verify the version of your api call.'; + } + } } diff --git a/src/services/source/__tests__/nativeIntegration.test.ts b/src/services/source/__tests__/nativeIntegration.test.ts index 2ef8129cdc..51bb37f5f1 100644 --- a/src/services/source/__tests__/nativeIntegration.test.ts +++ b/src/services/source/__tests__/nativeIntegration.test.ts @@ -44,7 +44,15 @@ describe('NativeIntegration Source Service', () => { }); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const adapterConvertedEvents = events.map((eventInstance) => { + return { output: eventInstance }; + }); + const resp = await service.sourceTransformRoutine( + adapterConvertedEvents, + sourceType, + version, + requestMetadata, + ); expect(resp).toEqual(tresponse); @@ -81,7 +89,15 @@ describe('NativeIntegration Source Service', () => { jest.spyOn(stats, 'increment').mockImplementation(() => {}); const service = new NativeIntegrationSourceService(); - const resp = await service.sourceTransformRoutine(events, sourceType, version, requestMetadata); + const adapterConvertedEvents = events.map((eventInstance) => { + return { output: eventInstance }; + }); + const resp = await service.sourceTransformRoutine( + adapterConvertedEvents, + sourceType, + version, + requestMetadata, + ); expect(resp).toEqual(tresponse); diff --git a/src/services/source/nativeIntegration.ts b/src/services/source/nativeIntegration.ts index 5c89de7b92..58a6a19649 100644 --- a/src/services/source/nativeIntegration.ts +++ b/src/services/source/nativeIntegration.ts @@ -4,6 +4,7 @@ import { ErrorDetailer, MetaTransferObject, RudderMessage, + SourceInputConversionResult, SourceTransformationEvent, SourceTransformationResponse, } from '../../types/index'; @@ -28,7 +29,7 @@ export class NativeIntegrationSourceService implements SourceService { } public async sourceTransformRoutine( - sourceEvents: NonNullable[], + sourceEvents: SourceInputConversionResult>[], sourceType: string, version: string, // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -39,7 +40,20 @@ export class NativeIntegrationSourceService implements SourceService { const respList: SourceTransformationResponse[] = await Promise.all( sourceEvents.map(async (sourceEvent) => { try { - const newSourceEvent = sourceEvent; + if (sourceEvent.conversionError) { + stats.increment('source_transform_errors', { + source: sourceType, + version, + }); + logger.debug(`Error during source Transform: ${sourceEvent.conversionError}`, { + ...logger.getLogMetadata(metaTO.errorDetails), + }); + return SourcePostTransformationService.handleFailureEventsSource( + sourceEvent.conversionError, + metaTO, + ); + } + const newSourceEvent = sourceEvent.output; const { headers } = newSourceEvent; delete newSourceEvent.headers; const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse = diff --git a/src/types/index.ts b/src/types/index.ts index 45ec7445c3..0bc2cbc33b 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -355,6 +355,26 @@ type SourceInput = { event: NonNullable[]; source?: Source; }; + +type SourceRequestV2 = { + method: string; + url: string; + proto: string; + body: string; + headers: NonNullable; + query_parameters: NonNullable; +}; + +type SourceInputV2 = { + request: SourceRequestV2; + source?: Source; +}; + +type SourceInputConversionResult = { + output: T; + conversionError?: Error; +}; + export { ComparatorInput, DeliveryJobState, @@ -382,7 +402,9 @@ export { UserDeletionRequest, UserDeletionResponse, SourceInput, + SourceInputV2, Source, + SourceInputConversionResult, UserTransformationLibrary, UserTransformationResponse, UserTransformationServiceResponse, diff --git a/test/apitests/service.api.test.ts b/test/apitests/service.api.test.ts index 30d2c568a6..251c1bc756 100644 --- a/test/apitests/service.api.test.ts +++ b/test/apitests/service.api.test.ts @@ -80,6 +80,13 @@ describe('features tests', () => { const supportTransformerProxyV1 = JSON.parse(response.text).supportTransformerProxyV1; expect(typeof supportTransformerProxyV1).toBe('boolean'); }); + + test('features upgradedToSourceTransformV2 to be boolean', async () => { + const response = await request(server).get('/features'); + expect(response.status).toEqual(200); + const upgradedToSourceTransformV2 = JSON.parse(response.text).upgradedToSourceTransformV2; + expect(typeof upgradedToSourceTransformV2).toBe('boolean'); + }); }); describe('Api tests with a mock source/destination', () => {