From 7d80161b38690f204d24c6ba03b74a865450cd92 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 26 Mar 2024 12:56:12 +0530 Subject: [PATCH] chore: skip users and tracks tables with destConfig --- .../destinations/azure_datalake/transform.js | 15 +++-- .../destinations/azure_synapse/transform.js | 11 ++-- src/v0/destinations/bq/transform.js | 11 ++-- src/v0/destinations/clickhouse/transform.js | 59 ++++++++----------- src/v0/destinations/deltalake/transform.js | 19 +++--- src/v0/destinations/gcs_datalake/transform.js | 19 +++--- src/v0/destinations/mssql/transform.js | 12 ++-- src/v0/destinations/postgres/transform.js | 11 ++-- src/v0/destinations/rs/transform.js | 11 ++-- src/v0/destinations/s3_datalake/transform.js | 12 ++-- src/v0/destinations/snowflake/transform.js | 11 ++-- src/warehouse/index.js | 8 ++- .../data/warehouse/dest_config_scenarios.js | 0 13 files changed, 82 insertions(+), 117 deletions(-) create mode 100644 test/__tests__/data/warehouse/dest_config_scenarios.js diff --git a/src/v0/destinations/azure_datalake/transform.js b/src/v0/destinations/azure_datalake/transform.js index 8d29c70e06..6c97e8671f 100644 --- a/src/v0/destinations/azure_datalake/transform.js +++ b/src/v0/destinations/azure_datalake/transform.js @@ -1,25 +1,24 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const azureDatalake = 'azure_datalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'azure_datalake'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = azureDatalake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } -exports.process = process; +module.exports = { + provider, + process, +}; diff --git a/src/v0/destinations/azure_synapse/transform.js b/src/v0/destinations/azure_synapse/transform.js index d98f269475..a80ad7cbdc 100644 --- a/src/v0/destinations/azure_synapse/transform.js +++ b/src/v0/destinations/azure_synapse/transform.js @@ -1,28 +1,25 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const azureSynapse = 'azure_synapse'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'azure_synapse'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = azureSynapse; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/bq/transform.js b/src/v0/destinations/bq/transform.js index e9e496f6a4..8c8be140a9 100644 --- a/src/v0/destinations/bq/transform.js +++ b/src/v0/destinations/bq/transform.js @@ -1,10 +1,6 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const bigquery = 'bq'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'bq'; function getDataTypeOverride() {} @@ -13,8 +9,7 @@ function process(event) { const whIDResolve = event.request.query.whIDResolve === 'true' || false; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = bigquery; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -23,10 +18,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/clickhouse/transform.js b/src/v0/destinations/clickhouse/transform.js index 24158cc41f..491475419c 100644 --- a/src/v0/destinations/clickhouse/transform.js +++ b/src/v0/destinations/clickhouse/transform.js @@ -1,53 +1,44 @@ const { processWarehouseMessage } = require('../../../warehouse'); const { getDataType } = require('../../../warehouse/index'); -const clickhouse = 'clickhouse'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'clickhouse'; function getDataTypeOverride(key, val, options) { if (options.chEnableArraySupport === 'false') { return 'string'; } - if (Array.isArray(val)) { - // for now returning it as string. confirm this case - if (val.length === 0) { - return 'string'; - } - // check for different data types in the array. if there are different then return array(string) - const firstValueDataType = getDataType(key, val[0], {}); - let finalDataType = firstValueDataType; - for (let i = 1; i < val.length; i += 1) { - const dataType = getDataType(key, val[i], {}); - if (finalDataType !== dataType) { - if (finalDataType === 'string') { - break; - } - if (dataType === 'float' && finalDataType === 'int') { - finalDataType = 'float'; - // eslint-disable-next-line no-continue - continue; - } - if (dataType === 'int' && finalDataType === 'float') { - // eslint-disable-next-line no-continue - continue; - } - finalDataType = 'string'; + if (!Array.isArray(val) || val.length === 0) { + return 'string'; + } + + // check for different data types in the array. if there are different -> return array(string) + let finalDataType = getDataType(key, val[0], {}); + for (let i = 1; i < val.length; i += 1) { + const dataType = getDataType(key, val[i], {}); + if (finalDataType !== dataType) { + if (finalDataType === 'string') { + break; + } + if (dataType === 'float' && finalDataType === 'int') { + finalDataType = 'float'; + // eslint-disable-next-line no-continue + continue; + } + if (dataType === 'int' && finalDataType === 'float') { + // eslint-disable-next-line no-continue + continue; } + finalDataType = 'string'; } - return `array(${finalDataType})`; } - return 'string'; + return `array(${finalDataType})`; } function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = clickhouse; const chEnableArraySupport = event.request.query.chEnableArraySupport || 'false'; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -55,10 +46,12 @@ function process(event) { provider, chEnableArraySupport, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/deltalake/transform.js b/src/v0/destinations/deltalake/transform.js index 49d40131d4..637b64cf36 100644 --- a/src/v0/destinations/deltalake/transform.js +++ b/src/v0/destinations/deltalake/transform.js @@ -1,25 +1,22 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const deltalake = 'deltalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - -function getDataTypeOverride() {} +const provider = 'deltalake'; function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = deltalake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, - getDataTypeOverride, + getDataTypeOverride: () => {}, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } -exports.process = process; +module.exports = { + provider, + process, +}; diff --git a/src/v0/destinations/gcs_datalake/transform.js b/src/v0/destinations/gcs_datalake/transform.js index 3e5f1dcfa3..366dcf3483 100644 --- a/src/v0/destinations/gcs_datalake/transform.js +++ b/src/v0/destinations/gcs_datalake/transform.js @@ -1,25 +1,22 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const gcsDatalake = 'gcs_datalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - -function getDataTypeOverride() {} +const provider = 'gcs_datalake'; function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = gcsDatalake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, - getDataTypeOverride, + getDataTypeOverride: () => {}, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } -exports.process = process; +module.exports = { + provider, + process, +}; diff --git a/src/v0/destinations/mssql/transform.js b/src/v0/destinations/mssql/transform.js index 2baadebdee..12dd7b40c6 100644 --- a/src/v0/destinations/mssql/transform.js +++ b/src/v0/destinations/mssql/transform.js @@ -1,28 +1,24 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const mssql = 'mssql'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - +const provider = 'mssql'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = mssql; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/postgres/transform.js b/src/v0/destinations/postgres/transform.js index 32c6b0a069..b57bf4369a 100644 --- a/src/v0/destinations/postgres/transform.js +++ b/src/v0/destinations/postgres/transform.js @@ -1,10 +1,6 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const postgres = 'postgres'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'postgres'; function getDataTypeOverride(key, val, options, jsonKey = false) { if (key === 'violationErrors' || jsonKey) { @@ -17,8 +13,7 @@ function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = postgres; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -26,10 +21,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/rs/transform.js b/src/v0/destinations/rs/transform.js index f051ff49d5..781600a8e2 100644 --- a/src/v0/destinations/rs/transform.js +++ b/src/v0/destinations/rs/transform.js @@ -2,11 +2,7 @@ const { processWarehouseMessage } = require('../../../warehouse'); // redshift destination string limit, if the string length crosses 512 we will change data type to text which is varchar(max) in redshift const RSStringLimit = 512; -const redshift = 'rs'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'rs'; function getDataTypeOverride(key, val, options, jsonKey = false) { if (jsonKey) { @@ -26,8 +22,7 @@ function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = redshift; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -35,10 +30,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/s3_datalake/transform.js b/src/v0/destinations/s3_datalake/transform.js index 7013224faa..8bbfa1556d 100644 --- a/src/v0/destinations/s3_datalake/transform.js +++ b/src/v0/destinations/s3_datalake/transform.js @@ -1,29 +1,25 @@ const { processWarehouseMessage } = require('../../../warehouse'); // use postgres providers for s3-datalake -const s3datalakeProvider = 's3_datalake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} - +const provider = 's3_datalake'; function getDataTypeOverride() {} function process(event) { const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; const whStoreEvent = event.destination.Config.storeFullEvent === true; - const provider = s3datalakeProvider; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, getDataTypeOverride, provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/v0/destinations/snowflake/transform.js b/src/v0/destinations/snowflake/transform.js index 7682d13db3..bf53c57978 100644 --- a/src/v0/destinations/snowflake/transform.js +++ b/src/v0/destinations/snowflake/transform.js @@ -1,10 +1,6 @@ const { processWarehouseMessage } = require('../../../warehouse'); -const snowflake = 'snowflake'; - -function processSingleMessage(message, options) { - return processWarehouseMessage(message, options); -} +const provider = 'snowflake'; function getDataTypeOverride(key, val, options, jsonKey = false) { if (key === 'violationErrors' || jsonKey) { @@ -18,8 +14,7 @@ function process(event) { const whIDResolve = event.request.query.whIDResolve === 'true' || false; const whStoreEvent = event.destination.Config.storeFullEvent === true; const destJsonPaths = event.destination?.Config?.jsonPaths || ''; - const provider = snowflake; - return processSingleMessage(event.message, { + return processWarehouseMessage(event.message, { metadata: event.metadata, whSchemaVersion, whStoreEvent, @@ -28,10 +23,12 @@ function process(event) { provider, sourceCategory: event.metadata ? event.metadata.sourceCategory : null, destJsonPaths, + destConfig: event.destination?.Config, }); } module.exports = { + provider, process, getDataTypeOverride, }; diff --git a/src/warehouse/index.js b/src/warehouse/index.js index b3d1c5e4bc..f62f02af79 100644 --- a/src/warehouse/index.js +++ b/src/warehouse/index.js @@ -154,7 +154,7 @@ function setDataFromColumnMappingAndComputeColumnTypes( const columnName = utils.safeColumnName(options, key); // do not set column if val is null/empty/object if (typeof val === 'object' || isBlank(val)) { - // delete in output and columnTypes, so as to remove if we user + // delete in output and columnTypes, to remove if the user // has set property with same name // eslint-disable-next-line no-param-reassign delete output[columnName]; @@ -565,8 +565,10 @@ function processWarehouseMessage(message, options) { : {}; const responses = []; const eventType = message.type?.toLowerCase(); - const skipTracksTable = options.integrationOptions.skipTracksTable || false; - const skipUsersTable = options.integrationOptions.skipUsersTable || false; + const skipTracksTable = + options.destConfig?.skipTracksTable || options.integrationOptions.skipTracksTable || false; + const skipUsersTable = + options.destConfig?.skipUsersTable || options.integrationOptions.skipUsersTable || false; const skipReservedKeywordsEscaping = options.integrationOptions.skipReservedKeywordsEscaping || false; diff --git a/test/__tests__/data/warehouse/dest_config_scenarios.js b/test/__tests__/data/warehouse/dest_config_scenarios.js new file mode 100644 index 0000000000..e69de29bb2