diff --git a/src/v0/destinations/azure_datalake/transform.js b/src/v0/destinations/azure_datalake/transform.js index 8d29c70e06..6c97e8671f 100644 --- a/src/v0/destinations/azure_datalake/transform.js +++ b/src/v0/destinations/azure_datalake/transform.js @@ -1,25 +1,24 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const azureDatalake = 'azure_datalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'azure_datalake'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = azureDatalake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } -exports.process = process; +module.exports = { + provider, + process, +}; diff --git a/src/v0/destinations/azure_synapse/transform.js b/src/v0/destinations/azure_synapse/transform.js index d98f269475..a80ad7cbdc 100644 --- a/src/v0/destinations/azure_synapse/transform.js +++ b/src/v0/destinations/azure_synapse/transform.js @@ -1,28 +1,25 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const azureSynapse = 'azure_synapse'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'azure_synapse'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = azureSynapse; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/bq/transform.js b/src/v0/destinations/bq/transform.js index e9e496f6a4..8c8be140a9 100644 --- a/src/v0/destinations/bq/transform.js +++ b/src/v0/destinations/bq/transform.js @@ -1,10 +1,6 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const bigquery = 'bq'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'bq'; function getDataTypeOverride() {} @@ -13,8 +9,7 @@ function process(event) { const whIDResolve = event.request.query.whIDResolve === 'true' || false; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = bigquery; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -23,10 +18,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/clickhouse/transform.js b/src/v0/destinations/clickhouse/transform.js index 24158cc41f..491475419c 100644 --- a/src/v0/destinations/clickhouse/transform.js +++ b/src/v0/destinations/clickhouse/transform.js @@ -1,53 +1,44 @@ const { processWarehouseMessage } = require('../../../warehouse'); const { getDataType } = require('../../../warehouse/index'); -const clickhouse = 'clickhouse'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'clickhouse'; function getDataTypeOverride(key, val, options) { if (options.chEnableArraySupport === 'false') { return 'string'; } - if (Array.isArray(val)) { - // for now returning it as string. confirm this case - if (val.length === 0) { - return 'string'; - } - // check for different data types in the array. if there are different then return array(string) - const firstValueDataType = getDataType(key, val[0], {}); - let finalDataType = firstValueDataType; - for (let i = 1; i < val.length; i += 1) { - const dataType = getDataType(key, val[i], {}); - if (finalDataType !== dataType) { - if (finalDataType === 'string') { - break; - } - if (dataType === 'float' && finalDataType === 'int') { - finalDataType = 'float'; - // eslint-disable-next-line no-continue - continue; - } - if (dataType === 'int' && finalDataType === 'float') { - // eslint-disable-next-line no-continue - continue; - } - finalDataType = 'string'; + if (!Array.isArray(val) || val.length === 0) { + return 'string'; + } + + // check for different data types in the array. if there are different -> return array(string) + let finalDataType = getDataType(key, val[0], {}); + for (let i = 1; i < val.length; i += 1) { + const dataType = getDataType(key, val[i], {}); + if (finalDataType !== dataType) { + if (finalDataType === 'string') { + break; + } + if (dataType === 'float' && finalDataType === 'int') { + finalDataType = 'float'; + // eslint-disable-next-line no-continue + continue; + } + if (dataType === 'int' && finalDataType === 'float') { + // eslint-disable-next-line no-continue + continue; } + finalDataType = 'string'; } - return `array(${finalDataType})`; } - return 'string'; + return `array(${finalDataType})`; } function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = clickhouse; const chEnableArraySupport = event.request.query.chEnableArraySupport || 'false'; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -55,10 +46,12 @@ function process(event) { provider, chEnableArraySupport, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/deltalake/transform.js b/src/v0/destinations/deltalake/transform.js index 49d40131d4..637b64cf36 100644 --- a/src/v0/destinations/deltalake/transform.js +++ b/src/v0/destinations/deltalake/transform.js @@ -1,25 +1,22 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const deltalake = 'deltalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - -function getDataTypeOverride() {} +const provider = 'deltalake'; function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = deltalake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, - getDataTypeOverride, + getDataTypeOverride: () => {}, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } -exports.process = process; +module.exports = { + provider, + process, +}; diff --git a/src/v0/destinations/gcs_datalake/transform.js b/src/v0/destinations/gcs_datalake/transform.js index 3e5f1dcfa3..366dcf3483 100644 --- a/src/v0/destinations/gcs_datalake/transform.js +++ b/src/v0/destinations/gcs_datalake/transform.js @@ -1,25 +1,22 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const gcsDatalake = 'gcs_datalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - -function getDataTypeOverride() {} +const provider = 'gcs_datalake'; function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = gcsDatalake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, - getDataTypeOverride, + getDataTypeOverride: () => {}, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } -exports.process = process; +module.exports = { + provider, + process, +}; diff --git a/src/v0/destinations/mssql/transform.js b/src/v0/destinations/mssql/transform.js index 2baadebdee..12dd7b40c6 100644 --- a/src/v0/destinations/mssql/transform.js +++ b/src/v0/destinations/mssql/transform.js @@ -1,28 +1,24 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const mssql = 'mssql'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - +const provider = 'mssql'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = mssql; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/postgres/transform.js b/src/v0/destinations/postgres/transform.js index 32c6b0a069..b57bf4369a 100644 --- a/src/v0/destinations/postgres/transform.js +++ b/src/v0/destinations/postgres/transform.js @@ -1,10 +1,6 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const postgres = 'postgres'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'postgres'; function getDataTypeOverride(key, val, options, jsonKey = false) { if (key === 'violationErrors' || jsonKey) { @@ -17,8 +13,7 @@ function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = postgres; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -26,10 +21,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/rs/transform.js b/src/v0/destinations/rs/transform.js index f051ff49d5..781600a8e2 100644 --- a/src/v0/destinations/rs/transform.js +++ b/src/v0/destinations/rs/transform.js @@ -2,11 +2,7 @@ const { processWarehouseMessage } = require('../../../warehouse'); // redshift destination string limit, if the string length crosses 512 we will change data type to text which is varchar(max) in redshift const RSStringLimit = 512; -const redshift = 'rs'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'rs'; function getDataTypeOverride(key, val, options, jsonKey = false) { if (jsonKey) { @@ -26,8 +22,7 @@ function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = redshift; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -35,10 +30,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/s3_datalake/transform.js b/src/v0/destinations/s3_datalake/transform.js index 7013224faa..8bbfa1556d 100644 --- a/src/v0/destinations/s3_datalake/transform.js +++ b/src/v0/destinations/s3_datalake/transform.js @@ -1,29 +1,25 @@ const { processWarehouseMessage } = require('../../../warehouse'); // use postgres providers for s3-datalake -const s3datalakeProvider = 's3_datalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - +const provider = 's3_datalake'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = s3datalakeProvider; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/snowflake/transform.js b/src/v0/destinations/snowflake/transform.js index 7682d13db3..bf53c57978 100644 --- a/src/v0/destinations/snowflake/transform.js +++ b/src/v0/destinations/snowflake/transform.js @@ -1,10 +1,6 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const snowflake = 'snowflake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'snowflake'; function getDataTypeOverride(key, val, options, jsonKey = false) { if (key === 'violationErrors' || jsonKey) { @@ -18,8 +14,7 @@ function process(event) { const whIDResolve = event.request.query.whIDResolve === 'true' || false; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = snowflake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -28,10 +23,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/warehouse/index.js b/src/warehouse/index.js index b3d1c5e4bc..f62f02af79 100644 --- a/src/warehouse/index.js +++ b/src/warehouse/index.js @@ -154,7 +154,7 @@ function setDataFromColumnMappingAndComputeColumnTypes( const columnName = utils.safeColumnName(options, key); // do not set column if val is null/empty/object if (typeof val === 'object' || isBlank(val)) { - // delete in output and columnTypes, so as to remove if we user + // delete in output and columnTypes, to remove if the user // has set property with same name // eslint-disable-next-line no-param-reassign delete output[columnName]; @@ -565,8 +565,10 @@ function processWarehouseMessage(message, options) { : {}; const responses = []; const eventType = message.type?.toLowerCase(); - const skipTracksTable = options.integrationOptions.skipTracksTable || false; - const skipUsersTable = options.integrationOptions.skipUsersTable || false; + const skipTracksTable = + options.destConfig?.skipTracksTable || options.integrationOptions.skipTracksTable || false; + const skipUsersTable = + options.destConfig?.skipUsersTable || options.integrationOptions.skipUsersTable || false; const skipReservedKeywordsEscaping = options.integrationOptions.skipReservedKeywordsEscaping || false; diff --git a/test/__tests__/data/warehouse/dest_config_scenarios.js b/test/__tests__/data/warehouse/dest_config_scenarios.js new file mode 100644 index 0000000000..0e290b7153 --- /dev/null +++ b/test/__tests__/data/warehouse/dest_config_scenarios.js @@ -0,0 +1,223 @@ +const _ = require("lodash"); + +const trackMessage = { + destination: { Config: {} }, + message: { + type: "track", + messageId: "my-track-message-id-1", + userId: "9bb5d4c2-a7aa-4a36-9efb-dd2b1aec5d33", + anonymousId: "e6ab2c5e-2cda-44a9-a962-e2f67df78bca", + channel: "web", + context: { + app: { + build: "1.0.0", + name: "RudderLabs JavaScript SDK", + namespace: "com.rudderlabs.javascript", + version: "1.0.5" + }, + ip: "0.0.0.0", + library: { + name: "RudderLabs JavaScript SDK", + version: "1.0.5" + }, + locale: "en-GB", + os: { + name: "", + version: "" + }, + screen: { density: 2 }, + traits: { + city: "Disney", + country: "USA", + email: "mickey@disney.com", + firstname: "Mickey" + }, + userAgent: "Mozilla/5.0 Chrome/79.0.3945.117 Safari/537.36" + }, + event: "groups", + integrations: { All: true }, + originalTimestamp: "2020-01-24T06:29:02.364Z", + properties: { currency: "USD" }, + receivedAt: "2020-01-24T11:59:02.403+05:30", + request_ip: "[::1]:53708", + sentAt: "2020-01-24T06:29:02.364Z", + timestamp: "2020-01-24T11:59:02.403+05:30" + }, + request: { query: { whSchemaVersion: "v1" } } +}; + +const identifyMessage = { + destination: { Config: {} }, + message: { + type: "identify", + messageId: "my-identify-message-id-1", + sentAt: "2021-01-03T17:02:53.195Z", + userId: "user123", + channel: "web", + integrations: { All: true }, + context: { + os: { + "name": "android", + "version": "1.12.3" + }, + app: { + name: "RudderLabs JavaScript SDK", + build: "1.0.0", + version: "1.1.11", + namespace: "com.rudderlabs.javascript" + }, + traits: { + email: "user123@email.com", + phone: "+917836362334", + userId: "user123" + }, + locale: "en-US", + device: { + token: "token", + id: "id", + type: "ios" + }, + library: { + name: "RudderLabs JavaScript SDK", + version: "1.1.11" + }, + userAgent: "Gecko/20100101 Firefox/84.0" + }, + rudderId: "8f8fa6b5-8e24-489c-8e22-61f23f2e364f", + anonymousId: "97c46c81-3140-456d-b2a9-690d70aaca35", + originalTimestamp: "2020-01-24T06:29:02.364Z", + receivedAt: "2020-01-24T11:59:02.403+05:30", + request_ip: "[::1]:53708", + timestamp: "2020-01-24T11:59:02.403+05:30" + }, + request: { query: { whSchemaVersion: "v1" } } +}; + +const scenarios = [ + { + name: "track event is not skipped when options are not provided", + skipUsersTable: null, + skipTracksTable: null, + event: _.cloneDeep(trackMessage), + expected: [ + { + id: "my-track-message-id-1", + table: "tracks" + }, + { + id: "my-track-message-id-1", + table: "_groups" + } + ] + }, + { + name: "track event is not skipped when skipTracksTable is false", + skipUsersTable: null, + skipTracksTable: false, + event: _.cloneDeep(trackMessage), + expected: [ + { + id: "my-track-message-id-1", + table: "tracks" + }, + { + id: "my-track-message-id-1", + table: "_groups" + } + ] + }, + { + name: "track event is skipped when skipTracksTable is true", + skipUsersTable: null, + skipTracksTable: true, + event: _.cloneDeep(trackMessage), + expected: [ + { + id: "my-track-message-id-1", + table: "_groups" + } + ] + }, + { + name: "track event is not affected by skipUsersTable", + skipUsersTable: true, + skipTracksTable: null, + event: _.cloneDeep(trackMessage), + expected: [ + { + id: "my-track-message-id-1", + table: "tracks" + }, + { + id: "my-track-message-id-1", + table: "_groups" + } + ] + }, + { + name: "user event is not skipped when options are not provided", + skipUsersTable: null, + skipTracksTable: null, + event: _.cloneDeep(identifyMessage), + expected: [ + { + id: "my-identify-message-id-1", + table: "identifies" + }, + { + id: "user123", + table: "users" + } + ] + }, + { + name: "user event is not skipped when skipUsersTable is false", + skipUsersTable: false, + skipTracksTable: null, + event: _.cloneDeep(identifyMessage), + expected: [ + { + id: "my-identify-message-id-1", + table: "identifies" + }, + { + id: "user123", + table: "users" + } + ] + }, + { + name: "user event is skipped when skipUsersTable is true", + skipUsersTable: true, + skipTracksTable: null, + event: _.cloneDeep(identifyMessage), + expected: [ + { + id: "my-identify-message-id-1", + table: "identifies" + } + ] + }, + { + name: "user event is not affected by skipTracksTable", + skipUsersTable: null, + skipTracksTable: true, + event: _.cloneDeep(identifyMessage), + expected: [ + { + id: "my-identify-message-id-1", + table: "identifies" + }, + { + id: "user123", + table: "users" + } + ] + }, +]; + +module.exports = { + scenarios: function() { + return _.cloneDeep(scenarios); + } +}; \ No newline at end of file diff --git a/test/__tests__/warehouse.test.js b/test/__tests__/warehouse.test.js index 772e59e65a..2c89120686 100644 --- a/test/__tests__/warehouse.test.js +++ b/test/__tests__/warehouse.test.js @@ -8,6 +8,7 @@ const { opOutput } = require(`./data/warehouse/integration_options_events.js`); const { names } = require(`./data/warehouse/names.js`); +const destConfig = require(`./data/warehouse/dest_config_scenarios.js`); const { largeNoOfColumnsevent } = require(`./data/warehouse/event_columns_length`); @@ -1009,6 +1010,28 @@ describe("Add receivedAt for events missing it", () => { }); describe("Integration options", () => { + describe("Destination config options", () => { + destConfig.scenarios().forEach(scenario => { + it(scenario.name, () => { + if (scenario.skipUsersTable !== null) { + scenario.event.destination.Config.skipUsersTable = scenario.skipUsersTable + } + if (scenario.skipTracksTable !== null) { + scenario.event.destination.Config.skipTracksTable = scenario.skipTracksTable + } + + transformers.forEach((transformer, index) => { + const received = transformer.process(scenario.event); + expect(received).toHaveLength(scenario.expected.length); + for (const i in received) { + const evt = received[i]; + expect(evt.data.id ? evt.data.id : evt.data.ID).toEqual(scenario.expected[i].id); + expect(evt.metadata.table.toLowerCase()).toEqual(scenario.expected[i].table); + } + }); + }); + }); + }); describe("track", () => { it("should generate two events for every track call", () => { const i = opInput("track");