From e97e7caed20ffb007f1c543e15c15c6e89e2dfb7 Mon Sep 17 00:00:00 2001 From: shrouti1507 <60211312+shrouti1507@users.noreply.github.com> Date: Wed, 20 Sep 2023 13:03:12 +0530 Subject: [PATCH] fix: bqstream event ordering fix (#2624) * fix: bqstream event ordering fix * fix: review comments addressed * fix: importing the common util function * fix: putting both error and successful events logic back * fix: test cases addition and code clean up * fix: code clean up * fix: resolving sonar error * fix: removing network handler for bqstream * fix: refactor process router dest --- src/v0/destinations/bqstream/transform.js | 73 ++-- src/v0/destinations/bqstream/util.js | 192 +++------ src/v0/destinations/bqstream/util.test.js | 269 +++++++++++++ .../destinations/bqstream/router/data.ts | 364 ++++++++++++++---- 4 files changed, 671 insertions(+), 227 deletions(-) create mode 100644 src/v0/destinations/bqstream/util.test.js diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 4db1856535..0674f5e679 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -6,9 +6,11 @@ const { getSuccessRespEvents, checkInvalidRtTfEvents, handleRtTfSingleEventError, + groupEventsByType, } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); +const { getRearrangedEvents } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -50,7 +52,7 @@ const process = (event) => { }; }; -const batchEvents = (eventsChunk) => { +const batchEachUserSuccessEvents = (eventsChunk) => { const batchedResponseList = []; // arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...] @@ -68,7 +70,7 @@ const batchEvents = (eventsChunk) => { chunk.forEach((ev) => { // Pixel code must be added above "batch": [..] batchResponseList.push(ev.message.properties); - metadata.push(ev.metadata); + metadata.push(ev.metadata[0]); }); batchEventResponse.batchedRequest = { @@ -97,26 +99,23 @@ const batchEvents = (eventsChunk) => { return batchedResponseList; }; -const processRouterDest = (inputs) => { - const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION); - if (errorRespEvents.length > 0) { - return errorRespEvents; - } - - const eventsChunk = []; // temporary variable to divide payload into chunks - const errorRespList = []; - - inputs.forEach((event) => { +const processEachTypedEventList = ( + typedEventList, + eachTypeSuccessEventList, + eachTypeErrorEventsList, +) => { + typedEventList.forEach((event) => { try { if (event.message.statusCode) { // already transformed event - eventsChunk.push(event); + eachTypeSuccessEventList.push(event); } else { // if not transformed - let response = process(event); - response = Array.isArray(response) ? response : [response]; - response.forEach((res) => { - eventsChunk.push({ + const response = process(event); + const transformedEvents = Array.isArray(response) ? response : [response]; + + transformedEvents.forEach((res) => { + eachTypeSuccessEventList.push({ message: res, metadata: event.metadata, destination: event.destination, @@ -124,16 +123,44 @@ const processRouterDest = (inputs) => { }); } } catch (error) { - const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION); - errorRespList.push(errRespEvent); + const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); + eachTypeErrorEventsList.push(eachUserErrorEvent); } }); +}; - let batchedResponseList = []; - if (eventsChunk.length > 0) { - batchedResponseList = batchEvents(eventsChunk); +const processRouterDest = (inputs) => { + const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION); + if (errorRespEvents.length > 0) { + return errorRespEvents; } - return [...batchedResponseList, ...errorRespList]; + const finalResp = []; + + const batchedEvents = groupEventsByType(inputs); + + batchedEvents.forEach((typedEventList) => { + const eachTypeSuccessEventList = []; // list of events that are transformed successfully + const eachTypeErrorEventsList = []; // list of events that are errored out + processEachTypedEventList(typedEventList, eachTypeSuccessEventList, eachTypeErrorEventsList); + + const orderedEventsList = getRearrangedEvents( + eachTypeSuccessEventList, + eachTypeErrorEventsList, + ); + + orderedEventsList.forEach((eventList) => { + // no error event list will have more than one items in the list + if (eventList[0].error) { + finalResp.push([...eventList]); + } else { + // batch the successful events + const eachTypeBatchedResponse = batchEachUserSuccessEvents(eventList); + finalResp.push([...eachTypeBatchedResponse]); + } + }); + }); + + return finalResp.flat(); }; module.exports = { process, processRouterDest }; diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 2448d72d76..06b7403c87 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,149 +1,83 @@ /* eslint-disable no-param-reassign */ -const getValue = require('get-value'); -const { - getDynamicErrorType, - processAxiosResponse, -} = require('../../../adapters/utils/networkUtils'); -const { - REFRESH_TOKEN, - AUTH_STATUS_INACTIVE, -} = require('../../../adapters/networkhandler/authConstants'); -const { isHttpStatusSuccess } = require('../../util'); -const { proxyRequest } = require('../../../adapters/network'); -const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes'); -const tags = require('../../util/tags'); - -const DESTINATION_NAME = 'bqstream'; - -const trimBqStreamResponse = (response) => ({ - code: getValue(response, 'response.response.data.error.code'), // data.error.status which contains PERMISSION_DENIED - status: getValue(response, 'response.response.status'), - statusText: getValue(response, 'response.response.statusText'), - headers: getValue(response, 'response.response.headers'), - data: getValue(response, 'response.response.data'), // Incase of errors, this contains error data - success: getValue(response, 'suceess'), -}); +const { isDefinedAndNotNull } = require('../../util'); /** - * Obtains the Destination OAuth Error Category based on the error code obtained from destination + * Optimizes the error response by merging the metadata of the same error type and adding it to the result array. * - * - If an error code is such that the user will not be allowed inside the destination, - * such error codes fall under AUTH_STATUS_INACTIVE - * - If an error code is such that upon refresh we can get a new token which can be used to send event, - * such error codes fall under REFRESH_TOKEN category - * - If an error code doesn't fall under both categories, we can return an empty string - * @param {string} errorCategory - The error code obtained from the destination - * @returns Destination OAuth Error Category + * @param {Object} item - An object representing an error event with properties like `error`, `jobId`, and `metadata`. + * @param {Map} errorMap - A Map object to store the error events and their metadata. + * @param {Array} resultArray - An array to store the optimized error response. + * @returns {void} */ -const getDestAuthCategory = (errorCategory) => { - switch (errorCategory) { - case 'PERMISSION_DENIED': - return AUTH_STATUS_INACTIVE; - case 'UNAUTHENTICATED': - return REFRESH_TOKEN; - default: - return ''; +const optimizeErrorResponse = (item, errorMap, resultArray) => { + const currentError = item.error; + if (errorMap.has(currentError)) { + // If the error already exists in the map, merge the metadata + const existingErrDetails = errorMap.get(currentError); + existingErrDetails.metadata.push(...item.metadata); + } else { + // Otherwise, add it to the map + errorMap.set(currentError, { ...item }); + resultArray.push([errorMap.get(currentError)]); } }; -const destToRudderStatusMap = { - 403: { - rateLimitExceeded: 429, - default: 400, - }, - 400: { - tableUnavailable: 500, - default: 400, - }, - 500: { default: 500 }, - 503: { default: 500 }, - 401: { default: 500 }, - 404: { default: 400 }, - 501: { default: 400 }, +const convertMetadataToArray = (eventList) => { + const processedEvents = eventList.map((event) => ({ + ...event, + metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata], + })); + return processedEvents; }; -const getStatusAndCategory = (dresponse, status) => { - const authErrorCategory = getDestAuthCategory(dresponse.error.status); - const reason = - dresponse.error.errors && - Array.isArray(dresponse.error.errors) && - dresponse.error.errors.length > 0 && - dresponse.error.errors[0].reason; +/** + * Formats a list of error events into a composite response. + * + * @param {Array} errorEvents - A list of error events, where each event can have an `error` property and a `metadata` array. + * @returns {Array} The formatted composite response, where each element is an array containing the error details. + */ +const formatCompositeResponse = (errorEvents) => { + const resultArray = []; + const errorMap = new Map(); - const trStatus = destToRudderStatusMap[status] - ? destToRudderStatusMap[status][reason] || destToRudderStatusMap[status].default - : 500; - return { status: trStatus, authErrorCategory }; + for (const item of errorEvents) { + if (isDefinedAndNotNull(item.error)) { + optimizeErrorResponse(item, errorMap, resultArray); + } + } + return resultArray; }; /** - * This class actually handles the response for BigQuery Stream API - * It can also be used for any Google related API but an API related handling has to be done separately - * - * Here we are only trying to handle OAuth related error(s) - * Any destination specific error handling has to be done in their own way - * - * Reference doc for OAuth Errors - * 1. https://cloud.google.com/apigee/docs/api-platform/reference/policies/oauth-http-status-code-reference - * 2. https://cloud.google.com/bigquery/docs/error-messages - * - * Summary: - * Abortable -> 403, 501, 400 - * Retryable -> 5[0-9][02-9], 401(UNAUTHENTICATED) - * "Special Cases": - * status=200, resp.insertErrors.length > 0 === Failure - * 403 => AccessDenied -> AUTH_STATUS_INACTIVE, other 403 => Just abort + * Rearranges the events based on their success or error status. + * If there are no successful events, it groups error events with the same error and their metadata. + * If there are successful events, it returns the batched response of successful events. * + * @param {Array} successEventList - An array of objects representing successful events. + * Each object should have an `id` and `metadata` property. + * @param {Array} errorEventList - An array of objects representing error events. + * Each object should have an `id`, `metadata`, and `error` property. + * @returns {Array} - An array of rearranged events. */ -const processResponse = ({ dresponse, status } = {}) => { - const isSuccess = - !dresponse.error && - isHttpStatusSuccess(status) && - (!dresponse.insertErrors || (dresponse.insertErrors && dresponse.insertErrors.length === 0)); +const getRearrangedEvents = (successEventList, errorEventList) => { + // Convert 'metadata' to an array if it's not already + const processedSuccessfulEvents = convertMetadataToArray(successEventList); + const processedErrorEvents = convertMetadataToArray(errorEventList); - if (!isSuccess) { - if (dresponse.error) { - const { status: trStatus } = getStatusAndCategory(dresponse, status); - throw new NetworkError( - dresponse.error.message || `Request failed for ${DESTINATION_NAME} with status: ${status}`, - trStatus, - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(trStatus), - }, - dresponse, - ); - } else if (dresponse.insertErrors && dresponse.insertErrors.length > 0) { - const temp = trimBqStreamResponse(dresponse); - throw new AbortedError( - 'Problem during insert operation', - 400, - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(temp.status || 400), - }, - temp, - getDestAuthCategory(temp.code), - ); - } - throw new UnhandledStatusCodeError('Unhandled error type while sending to destination'); + // if there are no error events, then return the batched response + if (errorEventList.length === 0) { + return [processedSuccessfulEvents]; + } + // if there are no batched response, then return the error events + if (successEventList.length === 0) { + return formatCompositeResponse(processedErrorEvents); } -}; -const responseHandler = (respTransformPayload) => { - const { response, status } = respTransformPayload; - processResponse({ - dresponse: response, - status, - }); - return { - status, - destinationResponse: response, - message: 'Request Processed Successfully', - }; + // if there are both batched response and error events, then order them + const combinedTransformedEventList = [ + [...processedSuccessfulEvents], + ...formatCompositeResponse(processedErrorEvents), + ]; + return combinedTransformedEventList; }; -function networkHandler() { - this.responseHandler = responseHandler; - this.proxy = proxyRequest; - this.processAxiosResponse = processAxiosResponse; -} - -module.exports = { networkHandler }; +module.exports = { getRearrangedEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js new file mode 100644 index 0000000000..1e99a4f20e --- /dev/null +++ b/src/v0/destinations/bqstream/util.test.js @@ -0,0 +1,269 @@ +const { getRearrangedEvents } = require('./util'); + +describe('getRearrangedEvents', () => { + // Tests that the function returns an array of transformed events when there are no error events + it('should return an array of transformed events when all events are track and successfully transformed', () => { + const eachUserSuccessEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 3 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const eachUserErrorEventsList = []; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 3 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 5 }] }, + ], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an empty array when both input arrays are empty + it('should return an empty array when both input arrays are empty', () => { + const eachUserSuccessEventslist = []; + const eachUserErrorEventsList = []; + const expected = [[]]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an array with only error events when all events are erroneous + it('should return an array with only error events when all events are erroneous', () => { + const eachUserSuccessEventslist = []; + const eachUserErrorEventsList = [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 5, userId: 'user12345' }], + }, + ]; + const expected = [ + [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [ + { jobId: 3, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + [ + { + batched: false, + destination: {}, + error: "Invalid payload for the destination", + metadata: [ + { + jobId: 5, + userId: "user12345", + }, + ], + }, + ], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + // Tests that the function does not return an ordered array of events with both successful and erroneous events + it('case 1 : 1--> success, 2 --> fail, 3 --> success, 4 --> fail, 5 --> success', () => { + const errorEventsList = [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 2, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 3 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 3 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 5 }] } + ], + [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [ + { jobId: 2, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + ]; + const result = getRearrangedEvents(successEventslist, errorEventsList); + expect(result).toEqual(expected); + }); + + it('case 2 : 1--> success, 2 --> success, 3 --> fail, 4 --> fail, 5 --> success', () => { + const errorEventsList = [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 1 + } + ] + }, + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 2 + } + ] + }, + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 5 + } + ] + } + ], + [ + { + "batched": false, + "destination": {}, + "error": "Invalid payload for the destination", + "metadata": [ + { + "jobId": 3, + "userId": "user12345" + }, + { + "jobId": 4, + "userId": "user12345" + } + ] + } + ] + ] + const result = getRearrangedEvents(successEventslist, errorEventsList); + console.log(JSON.stringify(result)); + expect(result).toEqual(expected); + }); + + it('case 3 : 1--> fail, 2 --> success, 3 --> success, 4 --> fail', () => { + const errorEventsList = [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 1, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 3 } }, + ]; + const expected = [ + [ + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 2 + } + ] + }, + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 3 + } + ] + } + ], + [ + { + "batched": false, + "destination": {}, + "error": "Invalid payload for the destination", + "metadata": [ + { + "jobId": 1, + "userId": "user12345" + }, + { + "jobId": 4, + "userId": "user12345" + } + ] + } + ] + ] + const result = getRearrangedEvents(successEventslist, errorEventsList); + console.log(JSON.stringify(result)); + expect(result).toEqual(expected); + }); + +}); + diff --git a/test/integrations/destinations/bqstream/router/data.ts b/test/integrations/destinations/bqstream/router/data.ts index 0f6e663ad0..4fac9047cc 100644 --- a/test/integrations/destinations/bqstream/router/data.ts +++ b/test/integrations/destinations/bqstream/router/data.ts @@ -13,66 +13,141 @@ export const data = [ message: { type: 'track', event: 'insert product', - sentAt: '2021-09-08T11:10:45.466Z', userId: 'user12345', - channel: 'web', - context: { - os: { - name: '', - version: '', - }, - app: { - name: 'RudderLabs JavaScript SDK', - build: '1.0.0', - version: '1.1.18', - namespace: 'com.rudderlabs.javascript', - }, - page: { - url: 'http://127.0.0.1:5500/index.html', - path: '/index.html', - title: 'Document', - search: '', - tab_url: 'http://127.0.0.1:5500/index.html', - referrer: '$direct', - initial_referrer: '$direct', - referring_domain: '', - initial_referring_domain: '', - }, - locale: 'en-GB', - screen: { - width: 1536, - height: 960, - density: 2, - innerWidth: 1536, - innerHeight: 776, - }, - traits: {}, - library: { - name: 'RudderLabs JavaScript SDK', - version: '1.1.18', - }, - campaign: {}, - userAgent: - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', - }, - rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380', - messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95', - timestamp: '2021-11-15T14:06:42.497+05:30', + properties: { count: 10, productId: 10, productName: 'Product-10', }, - receivedAt: '2021-11-15T14:06:42.497+05:30', - request_ip: '[::1]', anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', - integrations: { - All: true, - }, - originalTimestamp: '2021-09-08T11:10:45.466Z', }, metadata: { jobId: 1, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + }, + metadata: { + jobId: 2, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'identify', + event: 'insert product', + userId: 'user12345', + traits: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 3, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 5, + userId: 'user123', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 6, + userId: 'user124', }, destination: { Config: { @@ -84,7 +159,7 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, @@ -93,19 +168,70 @@ export const data = [ message: { type: 'track', event: 'insert product', - sentAt: '2021-09-08T11:10:45.466Z', + userId: 'user12345', - channel: 'web', + }, + metadata: { + jobId: 7, + userId: 'user124', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + }, + metadata: { + jobId: 8, + userId: 'user125', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'identify', + event: 'insert product', + + userId: 'user12345', + context: { os: { - name: '', + Name: '', version: '', }, app: { - name: 'RudderLabs JavaScript SDK', + Name: 'RudderLabs JavaScript SDK', build: '1.0.0', version: '1.1.18', - namespace: 'com.rudderlabs.javascript', + Namespace: 'com.rudderlabs.javascript', }, page: { url: 'http://127.0.0.1:5500/index.html', @@ -128,31 +254,25 @@ export const data = [ }, traits: {}, library: { - name: 'RudderLabs JavaScript SDK', + Name: 'RudderLabs JavaScript SDK', version: '1.1.18', }, campaign: {}, userAgent: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', }, - rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380', - messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95', - timestamp: '2021-11-15T14:06:42.497+05:30', - properties: { + + traits: { count: 20, productId: 20, productName: 'Product-20', }, receivedAt: '2021-11-15T14:06:42.497+05:30', - request_ip: '[::1]', anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', - integrations: { - All: true, - }, - originalTimestamp: '2021-09-08T11:10:45.466Z', }, metadata: { - jobId: 2, + jobId: 9, + userId: 'user125', }, destination: { Config: { @@ -164,7 +284,7 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, @@ -181,50 +301,144 @@ export const data = [ body: { output: [ { + batched: true, batchedRequest: { datasetId: 'gc_dataset', projectId: 'gc-project-id', properties: [ { count: 10, + insertId: '10', productId: 10, productName: 'Product-10', - insertId: '10', }, { count: 20, + insertId: '20', productId: 20, productName: 'Product-20', + }, + { + count: 20, insertId: '20', + productId: 20, + productName: 'Product-20', + }, + { + count: 20, + insertId: '20', + productId: 20, + productName: 'Product-20', }, ], tableId: 'gc_table', }, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, metadata: [ { jobId: 1, + userId: 'user12345', }, { jobId: 2, + userId: 'user12345', + }, + { + jobId: 5, + userId: 'user123', + }, + { + jobId: 6, + userId: 'user124', }, ], - batched: true, statusCode: 200, + }, + { + batched: false, destination: { Config: { - rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', - projectId: 'gc-project-id', datasetId: 'gc_dataset', - tableId: 'gc_table', - insertId: 'productId', eventDelivery: true, eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', }, - Enabled: true, ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, + error: 'Invalid payload for the destination', + metadata: [ + { + jobId: 7, + userId: 'user124', + }, + { + jobId: 8, + userId: 'user125', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + + error: "Message Type not supported: identify", + metadata: [ + { + jobId: 3, + userId: "user12345" + }, + { + jobId: 9, + userId: "user125" + } + ], + statTags: { + destType: "BQSTREAM", + errorCategory: "dataValidation", + errorType: "instrumentation", + feature: "router", + implementation: "native", + module: "destination" + }, + statusCode: 400 + } ], }, },