diff --git a/src/v0/destinations/kafka/transform.js b/src/v0/destinations/kafka/transform.js index 210469b9e5..39334b5f52 100644 --- a/src/v0/destinations/kafka/transform.js +++ b/src/v0/destinations/kafka/transform.js @@ -5,6 +5,8 @@ const { getIntegrationsObj, getHashFromArray, removeUndefinedAndNullValues, + getSuccessRespEvents, + getErrorRespEvents, } = require('../../util'); // const { InstrumentationError } = require("../../util/errorTypes"); @@ -37,6 +39,10 @@ const filterConfigTopics = (message, destination) => { const batch = (destEvents) => { const respList = []; + if (!Array.isArray(destEvents) || destEvents.length <= 0) { + const respEvents = getErrorRespEvents(null, 400, 'Invalid event array'); + return [respEvents]; + } // Grouping the events by topic const groupedEvents = groupBy(destEvents, (event) => event.message.topic); @@ -52,9 +58,10 @@ const batch = (destEvents) => { metadata: events.map((event) => event.metadata), destination: events[0].destination, }; - respList.push(response); + respList.push( + getSuccessRespEvents(response.batchedRequest, response.metadata, response.destination, true), + ); } - return respList; }; diff --git a/test/__tests__/data/kafka_batch_input.json b/test/__tests__/data/kafka_batch_input.json index f62aa23200..fa4144406a 100644 --- a/test/__tests__/data/kafka_batch_input.json +++ b/test/__tests__/data/kafka_batch_input.json @@ -1,816 +1,910 @@ [ - { - "message": { - "userId": "user1", + [ + { "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", "userId": "user1", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" - }, - "library": { - "name": "http" - } - }, - "messageId": "6f27a4da-cefb-4800-acf1-f467e8aab91c", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "topic": "new-topic" - }, - "metadata": { - "rudderId": "user1<<>>new-topic", - "jobId": 1 - }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", - "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user1", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "messageId": "6f27a4da-cefb-4800-acf1-f467e8aab91c", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "ResponseRules": null + "topic": "new-topic" }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" + "metadata": { + "rudderId": "user1<<>>new-topic", + "jobId": 1 }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user2", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user2", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" }, - "library": { - "name": "http" - } + "ResponseRules": null }, - "messageId": "fe182d9e-e86e-4db5-ae12-f4b399555fcc", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "topic": "new-topic" - }, - "metadata": { - "rudderId": "user2<<>>new-topic", - "jobId": 2 - }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" - }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user3", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user3", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" - }, - "library": { - "name": "http" - } + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" }, - "messageId": "2803e656-77ff-47ca-9606-90663f9aed38", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "topic": "new-topic" - }, - "metadata": { - "rudderId": "user3<<>>new-topic", - "jobId": 3 + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", - "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" - }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user4", + { "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user4", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" - }, - "library": { - "name": "http" - } - }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "topic": "new-topic" - }, - "metadata": { - "rudderId": "user4<<>>new-topic", - "jobId": 4 - }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", - "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" - }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user5", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user5", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" - }, - "library": { - "name": "http" - } - }, - "integrations": { - "All": true, - "KAFKA": { - "schemaId": "schema001" - } - }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "schemaId": "schema001", - "topic": "new-topic" - }, - "metadata": { - "rudderId": "user5<<>>new-topic", - "jobId": 5 - }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", - "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" - }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user6", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user6", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" - }, - "library": { - "name": "http" - } - }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-1" - } - }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "topic": "topic-1" - }, - "metadata": { - "rudderId": "user6<<>>topic-1", - "jobId": 6 - }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", - "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" - }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user7", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user7", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" - }, - "library": { - "name": "http" - } - }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-1" - } - }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" - }, - "topic": "topic-1" - }, - "metadata": { - "rudderId": "user7<<>>topic-1", - "jobId": 7 - }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", - "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] + "userId": "user2", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user2", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "messageId": "fe182d9e-e86e-4db5-ae12-f4b399555fcc", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "ResponseRules": null + "topic": "new-topic" }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" + "metadata": { + "rudderId": "user2<<>>new-topic", + "jobId": 2 }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user8", + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user8", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "userId": "user3", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user3", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "messageId": "2803e656-77ff-47ca-9606-90663f9aed38", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-2" - } + "topic": "new-topic" + }, + "metadata": { + "rudderId": "user3<<>>new-topic", + "jobId": 3 + }, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { + "message": { + "userId": "user4", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user4", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + }, + "topic": "new-topic" }, - "topic": "topic-2" + "metadata": { + "rudderId": "user4<<>>new-topic", + "jobId": 4 + }, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } }, - "metadata": { - "rudderId": "user8<<>>topic-2", - "jobId": 8 + { + "message": { + "userId": "user5", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user5", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } + }, + "integrations": { + "All": true, + "KAFKA": { + "schemaId": "schema001" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + }, + "schemaId": "schema001", + "topic": "new-topic" + }, + "metadata": { + "rudderId": "user5<<>>new-topic", + "jobId": 5 + }, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", + { + "message": { + "userId": "user6", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user6", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } + }, + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-1" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + }, + "topic": "topic-1" + }, + "metadata": { + "rudderId": "user6<<>>topic-1", + "jobId": 6 + }, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { + "message": { + "userId": "user7", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user7", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-1" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "ResponseRules": null + "topic": "topic-1" }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" + "metadata": { + "rudderId": "user7<<>>topic-1", + "jobId": 7 }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "message": { - "userId": "user8", + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", "userId": "user8", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user8", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } - }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-2" - } + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-2" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "topic-2" }, - "topic": "topic-2" - }, - "metadata": { - "rudderId": "user8<<>>topic-2", - "jobId": 9 + "metadata": { + "rudderId": "user8<<>>topic-2", + "jobId": 8 + }, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } }, - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", + { + "message": { + "userId": "user8", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user8", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } + }, + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-2" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + }, + "topic": "topic-2" + }, + "metadata": { + "rudderId": "user8<<>>topic-2", + "jobId": 9 + }, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { + "message": { + "userId": "user8", + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user8", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-3" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "ResponseRules": null + "topic": "topic-3" }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" + "metadata": { + "rudderId": "user8<<>>topic-3", + "jobId": 10 }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" + }, + "ResponseRules": null + }, + "Config": { + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" + }, + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } } - }, + ], { "message": { - "userId": "user8", + "userId": "user1", "message": { "type": "identify", "sentAt": "2021-07-08T02:45:11.329+05:30", - "userId": "user8", + "userId": "user1", "context": { "ip": "14.5.67.21", "traits": { @@ -820,24 +914,18 @@ "name": "http" } }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-3" - } - }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "messageId": "6f27a4da-cefb-4800-acf1-f467e8aab91c", "timestamp": "2020-02-02T00:23:09.544Z", "receivedAt": "2021-07-08T02:45:06.851+05:30", "request_ip": "[::1]", "anonymousId": "anon-id-new", "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "topic": "topic-3" + "topic": "new-topic" }, "metadata": { - "rudderId": "user8<<>>topic-3", - "jobId": 10 + "rudderId": "user1<<>>new-topic", + "jobId": 1 }, "destination": { "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", diff --git a/test/__tests__/data/kafka_batch_output.json b/test/__tests__/data/kafka_batch_output.json index d4c63523a7..a0e2d935e4 100644 --- a/test/__tests__/data/kafka_batch_output.json +++ b/test/__tests__/data/kafka_batch_output.json @@ -1,564 +1,582 @@ [ - { - "batchedRequest": [ - { - "userId": "user1", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + [ + { + "batchedRequest": [ + { "userId": "user1", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user1", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "messageId": "6f27a4da-cefb-4800-acf1-f467e8aab91c", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "6f27a4da-cefb-4800-acf1-f467e8aab91c", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "new-topic" }, - "topic": "new-topic" - }, - { - "userId": "user2", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + { "userId": "user2", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user2", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "messageId": "fe182d9e-e86e-4db5-ae12-f4b399555fcc", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "fe182d9e-e86e-4db5-ae12-f4b399555fcc", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "new-topic" }, - "topic": "new-topic" - }, - { - "userId": "user3", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + { "userId": "user3", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user3", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "messageId": "2803e656-77ff-47ca-9606-90663f9aed38", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "2803e656-77ff-47ca-9606-90663f9aed38", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "new-topic" }, - "topic": "new-topic" - }, - { - "userId": "user4", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + { "userId": "user4", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user4", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "new-topic" }, - "topic": "new-topic" - }, - { - "userId": "user5", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + { "userId": "user5", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user5", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "integrations": { + "All": true, + "KAFKA": { + "schemaId": "schema001" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "integrations": { - "All": true, - "KAFKA": { - "schemaId": "schema001" - } + "schemaId": "schema001", + "topic": "new-topic" + } + ], + "metadata": [ + { "rudderId": "user1<<>>new-topic", "jobId": 1 }, + { + "rudderId": "user2<<>>new-topic", + "jobId": 2 + }, + { + "rudderId": "user3<<>>new-topic", + "jobId": 3 + }, + { + "rudderId": "user4<<>>new-topic", + "jobId": 4 + }, + { + "rudderId": "user5<<>>new-topic", + "jobId": 5 + } + ], + "batched": true, + "statusCode": 200, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "ResponseRules": null }, - "schemaId": "schema001", - "topic": "new-topic" - } - ], - "metadata": [ - { "rudderId": "user1<<>>new-topic", "jobId": 1 }, - { - "rudderId": "user2<<>>new-topic", - "jobId": 2 - }, - { - "rudderId": "user3<<>>new-topic", - "jobId": 3 - }, - { - "rudderId": "user4<<>>new-topic", - "jobId": 4 - }, - { - "rudderId": "user5<<>>new-topic", - "jobId": 5 - } - ], - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "batchedRequest": [ - { - "userId": "user6", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { + "batchedRequest": [ + { "userId": "user6", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user6", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } - }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-1" - } + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-1" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "topic-1" }, - "topic": "topic-1" - }, - { - "userId": "user7", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + { "userId": "user7", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user7", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } + }, + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-1" + } }, - "library": { - "name": "http" - } + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-1" - } + "topic": "topic-1" + } + ], + "metadata": [ + { + "rudderId": "user6<<>>topic-1", + "jobId": 6 + }, + { + "rudderId": "user7<<>>topic-1", + "jobId": 7 + } + ], + "batched": true, + "statusCode": 200, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "ResponseRules": null }, - "topic": "topic-1" - } - ], - "metadata": [ - { - "rudderId": "user6<<>>topic-1", - "jobId": 6 - }, - { - "rudderId": "user7<<>>topic-1", - "jobId": 7 - } - ], - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "batchedRequest": [ - { - "userId": "user8", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { + "batchedRequest": [ + { "userId": "user8", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user8", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } - }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-2" - } + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-2" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "topic": "topic-2" }, - "topic": "topic-2" - }, - { - "userId": "user8", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + { "userId": "user8", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user8", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } }, - "library": { - "name": "http" - } + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-2" + } + }, + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-2" - } + "topic": "topic-2" + } + ], + "metadata": [ + { + "rudderId": "user8<<>>topic-2", + "jobId": 8 + }, + { + "rudderId": "user8<<>>topic-2", + "jobId": 9 + } + ], + "batched": true, + "statusCode": 200, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "ResponseRules": null }, - "topic": "topic-2" - } - ], - "metadata": [ - { - "rudderId": "user8<<>>topic-2", - "jobId": 8 - }, - { - "rudderId": "user8<<>>topic-2", - "jobId": 9 - } - ], - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true - } - }, - { - "batchedRequest": [ - { - "userId": "user8", - "message": { - "type": "identify", - "sentAt": "2021-07-08T02:45:11.329+05:30", + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + }, + { + "batchedRequest": [ + { "userId": "user8", - "context": { - "ip": "14.5.67.21", - "traits": { - "trait1": "new-val" + "message": { + "type": "identify", + "sentAt": "2021-07-08T02:45:11.329+05:30", + "userId": "user8", + "context": { + "ip": "14.5.67.21", + "traits": { + "trait1": "new-val" + }, + "library": { + "name": "http" + } + }, + "integrations": { + "All": true, + "KAFKA": { + "topic": "topic-3" + } }, - "library": { - "name": "http" - } + "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", + "timestamp": "2020-02-02T00:23:09.544Z", + "receivedAt": "2021-07-08T02:45:06.851+05:30", + "request_ip": "[::1]", + "anonymousId": "anon-id-new", + "originalTimestamp": "2021-07-08T02:45:11.329+05:30" }, - "integrations": { - "All": true, - "KAFKA": { - "topic": "topic-3" - } + "topic": "topic-3" + } + ], + "metadata": [ + { + "rudderId": "user8<<>>topic-3", + "jobId": 10 + } + ], + "batched": true, + "statusCode": 200, + "destination": { + "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", + "Name": "local-kafka-test", + "DestinationDefinition": { + "ID": "1c81NzcId5roSqvQ1R57zhIsC01", + "Name": "KAFKA", + "DisplayName": "Apache Kafka", + "Config": { + "destConfig": { + "defaultConfig": [ + "hostName", + "port", + "topic", + "sslEnabled", + "caCertificate", + "useSASL", + "saslType", + "username", + "password" + ] + }, + "excludeKeys": [], + "includeKeys": [], + "saveDestinationResponse": true, + "secretKeys": ["password"], + "supportedSourceTypes": [ + "android", + "ios", + "web", + "unity", + "amp", + "cloud", + "warehouse", + "reactnative", + "flutter" + ], + "transformAt": "processor", + "transformAtV1": "processor" }, - "messageId": "87c98fc2-561b-4631-8cd6-1d02cdd1429f", - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": "2021-07-08T02:45:06.851+05:30", - "request_ip": "[::1]", - "anonymousId": "anon-id-new", - "originalTimestamp": "2021-07-08T02:45:11.329+05:30" + "ResponseRules": null }, - "topic": "topic-3" - } - ], - "metadata": [ - { - "rudderId": "user8<<>>topic-3", - "jobId": 10 - } - ], - "destination": { - "ID": "1uzGR4rn915R6Xts9KRfWAzmgGL", - "Name": "local-kafka-test", - "DestinationDefinition": { - "ID": "1c81NzcId5roSqvQ1R57zhIsC01", - "Name": "KAFKA", - "DisplayName": "Apache Kafka", "Config": { - "destConfig": { - "defaultConfig": [ - "hostName", - "port", - "topic", - "sslEnabled", - "caCertificate", - "useSASL", - "saslType", - "username", - "password" - ] - }, - "excludeKeys": [], - "includeKeys": [], - "saveDestinationResponse": true, - "secretKeys": ["password"], - "supportedSourceTypes": [ - "android", - "ios", - "web", - "unity", - "amp", - "cloud", - "warehouse", - "reactnative", - "flutter" - ], - "transformAt": "processor", - "transformAtV1": "processor" + "caCertificate": "caCertificate", + "hostName": "localhost", + "password": "password", + "port": "29092", + "saslType": "sha256", + "sslEnabled": true, + "topic": "new-topic", + "useSASL": true, + "username": "username" }, - "ResponseRules": null - }, - "Config": { - "caCertificate": "caCertificate", - "hostName": "localhost", - "password": "password", - "port": "29092", - "saslType": "sha256", - "sslEnabled": true, - "topic": "new-topic", - "useSASL": true, - "username": "username" - }, - "Enabled": true, - "Transformations": [], - "IsProcessorEnabled": true + "Enabled": true, + "Transformations": [], + "IsProcessorEnabled": true + } + } + ], + [ + { + "batched": false, + "error": "Invalid event array", + "metadata": null, + "statusCode": 400 } - } + ] ] diff --git a/test/__tests__/kafka.test.js b/test/__tests__/kafka.test.js index abd85fadaf..c824b9612f 100644 --- a/test/__tests__/kafka.test.js +++ b/test/__tests__/kafka.test.js @@ -34,13 +34,21 @@ describe("Tests", () => { }); }); - test(`${name} Batching Tests`, () => { - const inputData = JSON.parse(batchInputDataFile); - const batchExpectedData = JSON.parse(batchOutputDataFile); - const output = transformer.batch(inputData); - expect(Array.isArray(output)).toEqual(true); - expect(output).toEqual(batchExpectedData); - }); + const batchInputData = JSON.parse(batchInputDataFile); + const batchExpectedData = JSON.parse(batchOutputDataFile); + batchInputData.forEach((input, index) => { + test(`Batching Tests ${index}`, () => { + const output = transformer.batch(input); + expect(Array.isArray(output)).toEqual(true); + expect(output.length).toEqual(batchExpectedData[index].length); + output.forEach((input, indexInner) => { + expect(output[indexInner]).toEqual( + batchExpectedData[index][indexInner] + ); + }); + }); + }); + test(`${name} Metadata parse test`, done => { const inputData = JSON.parse(dataWithMetadata);