Skip to content

Commit

Permalink
feat: onboard plugin integration service
Browse files Browse the repository at this point in the history
  • Loading branch information
utsabc committed Nov 16, 2023
1 parent ff80b88 commit 57729c6
Show file tree
Hide file tree
Showing 16 changed files with 3,859 additions and 634 deletions.
3,060 changes: 2,911 additions & 149 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"@koa/router": "^12.0.0",
"@ndhoule/extend": "^2.0.0",
"@pyroscope/nodejs": "^0.2.6",
"@rudderstack/integrations-lib": "^0.1.8",
"@rudderstack/integrations-lib": "^0.1.9",
"@rudderstack/integrations-store": "^0.1.1",
"@rudderstack/workflow-engine": "^0.6.9",
"ajv": "^8.12.0",
"ajv-draft-04": "^1.0.0",
Expand All @@ -80,6 +81,7 @@
"handlebars": "^4.7.7",
"http-graceful-shutdown": "^3.1.13",
"https-proxy-agent": "^5.0.1",
"i": "^0.3.7",
"ioredis": "^5.3.2",
"is": "^3.3.0",
"is-ip": "^3.1.0",
Expand All @@ -99,6 +101,7 @@
"moment-timezone": "^0.5.43",
"node-cache": "^5.1.2",
"node-fetch": "^2.6.12",
"npm": "^10.2.4",
"oauth-1.0a": "^2.2.6",
"object-hash": "^3.0.0",
"parse-static-imports": "^1.1.0",
Expand Down
4 changes: 2 additions & 2 deletions src/cdk/v2/destinations/webhook/rtWorkflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ steps:
- name: successfulEvents
template: |
$.outputs.transform#idx.output.({
"batchedRequest": .,
"batched": false,
"batchedRequest": [.],
"batched": true,
"destination": ^[idx].destination,
"metadata": ^[idx].metadata[],
"statusCode": 200
Expand Down
251 changes: 251 additions & 0 deletions src/helpers/pluginAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/naming-convention */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable no-restricted-syntax */
import {
RudderStackEvent,
RudderStackEventPayload,
Integration,
Destination,
WorkflowType,
Metadata,
get,
} from '@rudderstack/integrations-lib';
import { IntegrationsFactory } from '@rudderstack/integrations-store';
import groupBy from 'lodash/groupBy';
import {
ProcessorTransformationRequest,
RouterTransformationRequestData,
TransformedOutput,
} from '../types';
import { generateErrorObject } from '../v0/util';
import { MappedToDestinationKey } from '../constants';

// error handling

export class PluginAdapter {
private static pluginCache: Map<string, Integration> = new Map();

private static async getPlugin(
integrationName: string,
workflowType: WorkflowType,
): Promise<Integration> {
const cacheKey = `${integrationName}_${workflowType}`;
if (this.pluginCache.has(cacheKey)) {
return this.pluginCache.get(cacheKey) as Integration;
}
// TODO: default integration config need to make it dynamic by making some sort of config call or get it from config file
// const integrationConfig: IntegrationConfig = {
// name: integrationName,
// saveResponse: true,
// eventOrdering: true,
// plugins: ['preprocessor', 'multiplexer'],
// };

const integration = await IntegrationsFactory.createIntegration(integrationName, workflowType);
this.pluginCache.set(cacheKey, integration);
return integration;
}

public static async transformAtProcessor(
inputs: ProcessorTransformationRequest[],
integrationName: string,
) {
const mappedToDestination = get(inputs[0].message, MappedToDestinationKey);
const workflowType = mappedToDestination ? WorkflowType.RETL : WorkflowType.STREAM;
const integrationPlugin = await PluginAdapter.getPlugin(integrationName, workflowType);

const groupedEventsByDestinationId = groupBy(
inputs,
(ev: ProcessorTransformationRequest) => ev.destination?.ID,
);
const eventsPerDestinationId: ProcessorTransformationRequest[][] = Object.values(
groupedEventsByDestinationId,
);

const result = await Promise.all(
eventsPerDestinationId.map(async (inputs) => {
const events = inputs.map((input) => ({
event: [{ message: input.message as RudderStackEvent } as RudderStackEventPayload],
metadata: [input.metadata],
}));
const { destination } = inputs[0];
const output = await integrationPlugin.execute(events, destination);
const responseList = output.resultContext;
const errors = output.errorResults;

const errorList: { metadata: Metadata; response: any; destination: Destination }[] = [];
// handle the error scenario
if (errors.length > 0) {
const nestedErrorList = errors.map((e) => {
const errResponses = e.metadata.map((metadata) => ({

Check warning on line 81 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L80-L81

Added lines #L80 - L81 were not covered by tests
metadata,
response: generateErrorObject(e.error), // add further tags here
destination: e.destination,
}));
return errResponses;

Check warning on line 86 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L86

Added line #L86 was not covered by tests
});
errorList.push(...nestedErrorList.flat());

Check warning on line 88 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L88

Added line #L88 was not covered by tests
}

// handle the success scenario
const transformedPayloadList: {
payload: TransformedOutput;
metadata: Metadata;
destination: Destination;
}[] = [];
for (const [_, response] of responseList.entries()) {
for (const [index, payload] of response.payload.entries()) {
const transformedPayload = payload as TransformedOutput;
transformedPayloadList.push({
payload: transformedPayload,
metadata: response.metadata[index],
destination,
});
}
}
return { transformedPayloadList, errorList };
}),
);

const allSuccessList: {
payload: TransformedOutput;
metadata: Metadata;
destination: Destination;
}[] = result.flatMap((res) => res.transformedPayloadList);
const allErrorList: { metadata: Metadata; response: any; destination: Destination }[] =
result.flatMap((res) => res.errorList);

return { allSuccessList, allErrorList };
}

public static async transformAtRouter(
inputs: RouterTransformationRequestData[],
integrationName: string,
) {
const mappedToDestination = get(inputs[0].message, MappedToDestinationKey);
const workflowType = mappedToDestination ? WorkflowType.RETL : WorkflowType.STREAM;

const integrationPlugin = await PluginAdapter.getPlugin(integrationName, workflowType);
// group events by destinationId
// example: { destinationId1: [event1, event2], destinationId2: [event3, event4]}
const groupedEventsByDestinationId = groupBy(
inputs,
(ev: RouterTransformationRequestData) => ev.destination?.ID,
);
// example: [[event1, event2], [event3, event4]]
const eventsPerDestinationId: RouterTransformationRequestData[][] = Object.values(
groupedEventsByDestinationId,
);

const result = await Promise.all(
eventsPerDestinationId.map(async (inputs) => {
const input = inputs.map((input) => ({
event: [{ message: input.message as RudderStackEvent } as RudderStackEventPayload],
metadata: [input.metadata],
}));

const { destination } = inputs[0];

// calling the plugin and we can expect batched and multiplexed responses
// example: [ { payload: [event1, event2, event3], metadata: [metadata1, metadata2, metdata3] }, { payload: [event3, event4], metadata: [metadata3, metadata4] } ]

const output = await integrationPlugin.execute(input, destination);
const responseList = output.resultContext;
const errors = output.errorResults;

// handle error scenario
const errorList: { metadata: Metadata; response: any; destination: Destination }[] = [];
if (errors.length > 0) {
const nestedErrorList = errors.map((e) => {
const errResponses = e.metadata.map((metadata) => ({

Check warning on line 161 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L160-L161

Added lines #L160 - L161 were not covered by tests
metadata,
response: generateErrorObject(e.error), // add further tags here
destination,
}));
return errResponses;

Check warning on line 166 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L166

Added line #L166 was not covered by tests
});
errorList.push(...nestedErrorList.flat());

Check warning on line 168 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L168

Added line #L168 was not covered by tests
}

// handle success scenraio
// ranking the responses based on the number of unique jobIds in the metadata array
const uniqueJobRank: { uniqueJobIds: number; index: number }[] = [];
for (const [index, response] of responseList.entries()) {
const uniqueJobIds = Array.from(new Set(response.metadata.map((meta) => meta.jobId)));
uniqueJobRank.push({
uniqueJobIds: uniqueJobIds.length,
index,
});
}
uniqueJobRank.sort((a, b) => b.uniqueJobIds - a.uniqueJobIds);
// ranking ends here with uniqueJobRank containing the index of the responseList in the order of the number of unique jobIds in the metadata array
// example: [ { uniqueJobIds: 3, index: 0 }, { uniqueJobIds: 2, index: 1 } ]

const finalResponse: {
payload: TransformedOutput[];
metadata: Metadata[];
destination: Destination;
}[] = [];
// creating a map of jobId to position in the metadata array
// example: { jobId1: 1, jobId2: 1, jobId3: 0, jobId4: 2}
// motivation: prevent metadata duplication in the final response at all levels
const jobIdPositionMap: Map<number, number> = new Map();
for (const rank of uniqueJobRank) {
// iteratively checking payloads with the highest number of unique jobIds to lowest
const rankedResponse = responseList[rank.index];
let isCurrentResponseAddedToFinalPayload = false;
// iterate each metadata in the rankedResponse to check if any jobId is already present in the finalResponse
for (const meta of rankedResponse.metadata) {
// check if the jobId already has a position in final response
if (jobIdPositionMap.has(meta.jobId)) {
// if yes, then we need append the entire rankedResponse including all the payloads and metadata at same position
const position = jobIdPositionMap.get(meta.jobId) as number;
const currentOutput = rankedResponse.payload.map(
(payload) => payload as TransformedOutput,

Check warning on line 205 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L203-L205

Added lines #L203 - L205 were not covered by tests
);
finalResponse[position].payload.push(...currentOutput);

Check warning on line 207 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L207

Added line #L207 was not covered by tests
// push metdata to final response only if it is not already present
rankedResponse.metadata.forEach((meta) => {

Check warning on line 209 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L209

Added line #L209 was not covered by tests
// get all the exisitng jobIds in the metadata array at the position from the finalResponse
const jobIdsInResponse = finalResponse[position].metadata.map(
(fRmeta) => fRmeta.jobId,

Check warning on line 212 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L211-L212

Added lines #L211 - L212 were not covered by tests
);
// check if the jobId is already present in the metadata array
if (!jobIdsInResponse.includes(meta.jobId)) {
finalResponse[position].metadata.push(meta);

Check warning on line 216 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L216

Added line #L216 was not covered by tests
}
});
finalResponse[position].destination = destination;
isCurrentResponseAddedToFinalPayload = true;

Check warning on line 220 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L219-L220

Added lines #L219 - L220 were not covered by tests
// break the loop as we have already appended the entire rankedResponse to the finalResponse
break;

Check warning on line 222 in src/helpers/pluginAdapter.ts

View check run for this annotation

Codecov / codecov/patch

src/helpers/pluginAdapter.ts#L222

Added line #L222 was not covered by tests
}
}
// if the current rankedResponse is not added to the finalResponse, then we need to add it as a new entry
if (!isCurrentResponseAddedToFinalPayload) {
finalResponse.push({
payload: rankedResponse.payload.map((payload) => payload as TransformedOutput),
metadata: rankedResponse.metadata,
destination,
});
// update the jobIdPositionMap for all the jobIds in the rankedResponse
rankedResponse.metadata.forEach((meta) => {
jobIdPositionMap.set(meta.jobId, finalResponse.length - 1);
});
}
}
return { transformedPayloadList: finalResponse, errorList };
}),
);
const allSuccessList: {
payload: TransformedOutput[];
metadata: Metadata[];
destination: Destination;
}[] = result.flatMap((res) => res.transformedPayloadList);
const allErrorList: { metadata: Metadata; response: any; destination: Destination }[] =
result.flatMap((res) => res.errorList);

return { allSuccessList, allErrorList };
}
}
12 changes: 12 additions & 0 deletions src/helpers/serviceSelector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { NativeIntegrationDestinationService } from '../services/destination/nat
import { SourceService } from '../interfaces/SourceService';
import { NativeIntegrationSourceService } from '../services/source/nativeIntegration';
import { ComparatorService } from '../services/comparator';
import { PluginIntegrationService } from '../services/destination/pluginIntegration';
import { FixMe } from '../util/types';

export class ServiceSelector {
Expand All @@ -17,6 +18,7 @@ export class ServiceSelector {
[INTEGRATION_SERVICE.CDK_V1_DEST]: CDKV1DestinationService,
[INTEGRATION_SERVICE.CDK_V2_DEST]: CDKV2DestinationService,
[INTEGRATION_SERVICE.NATIVE_DEST]: NativeIntegrationDestinationService,
[INTEGRATION_SERVICE.PLUGIN_DEST]: PluginIntegrationService,
[INTEGRATION_SERVICE.NATIVE_SOURCE]: NativeIntegrationSourceService,
};

Expand All @@ -28,6 +30,10 @@ export class ServiceSelector {
return Boolean(destinationDefinitionConfig?.cdkV2Enabled);
}

private static isPluginDestination(destinationDefinitionConfig: FixMe) {
return !!destinationDefinitionConfig?.isPlugin;
}

private static isComparatorEnabled(destinationDefinitionConfig: FixMe): boolean {
return (
process.env.COMPARATOR_ENABLED === 'true' &&
Expand Down Expand Up @@ -68,6 +74,12 @@ export class ServiceSelector {
): DestinationService {
const destinationDefinitionConfig: FixMe =
events[0]?.destination?.DestinationDefinition?.Config;

if (this.isPluginDestination(destinationDefinitionConfig)) {
return this.fetchCachedService(INTEGRATION_SERVICE.PLUGIN_DEST);
}
// Legacy Services

if (this.isCdkDestination(destinationDefinitionConfig)) {
return this.fetchCachedService(INTEGRATION_SERVICE.CDK_V1_DEST);
}
Expand Down
5 changes: 3 additions & 2 deletions src/routes/utils/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ const INTEGRATION_SERVICE = {
CDK_V2_DEST: 'cdkv2_dest',
NATIVE_DEST: 'native_dest',
NATIVE_SOURCE: 'native_source',
PLUGIN_DEST: 'plugin_dest',
};
const CHANNELS= {
sources: 'sources'
const CHANNELS = {
sources: 'sources',
};

const RETL_TIMESTAMP = 'timestamp';
Expand Down
7 changes: 5 additions & 2 deletions src/services/comparator.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* eslint-disable class-methods-use-this */
import { Destination } from '@rudderstack/integrations-lib';
import { DestinationService } from '../interfaces/DestinationService';
import {
DeliveryResponse,
Destination,
ErrorDetailer,
MetaTransferObject,
ProcessorTransformationOutput,
Expand Down Expand Up @@ -60,7 +60,10 @@ export class ComparatorService implements DestinationService {
}

private getTestThreshold(destination: Destination) {
return destination.DestinationDefinition?.Config?.camparisonTestThreshold || 0;
const config = destination.DestinationDefinition?.Config as {
camparisonTestThreshold?: number;
};
return config?.camparisonTestThreshold || 0;
}

private getComparisonLogs(
Expand Down
Loading

0 comments on commit 57729c6

Please sign in to comment.