Skip to content

Commit

Permalink
chore: skip users and tracks tables with destConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Mar 26, 2024
1 parent f658a36 commit e11a78c
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 117 deletions.
15 changes: 7 additions & 8 deletions src/v0/destinations/azure_datalake/transform.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
const { processWarehouseMessage } = require('../../../warehouse');

const azureDatalake = 'azure_datalake';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}
const provider = 'azure_datalake';

function getDataTypeOverride() {}

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = azureDatalake;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

exports.process = process;
module.exports = {
provider,
process,
};
11 changes: 4 additions & 7 deletions src/v0/destinations/azure_synapse/transform.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
const { processWarehouseMessage } = require('../../../warehouse');

const azureSynapse = 'azure_synapse';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}
const provider = 'azure_synapse';

function getDataTypeOverride() {}

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = azureSynapse;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
11 changes: 4 additions & 7 deletions src/v0/destinations/bq/transform.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
const { processWarehouseMessage } = require('../../../warehouse');

const bigquery = 'bq';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}
const provider = 'bq';

function getDataTypeOverride() {}

Expand All @@ -13,8 +9,7 @@ function process(event) {
const whIDResolve = event.request.query.whIDResolve === 'true' || false;
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const destJsonPaths = event.destination?.Config?.jsonPaths || '';
const provider = bigquery;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
Expand All @@ -23,10 +18,12 @@ function process(event) {
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destJsonPaths,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
59 changes: 26 additions & 33 deletions src/v0/destinations/clickhouse/transform.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,57 @@
const { processWarehouseMessage } = require('../../../warehouse');
const { getDataType } = require('../../../warehouse/index');

const clickhouse = 'clickhouse';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}
const provider = 'clickhouse';

function getDataTypeOverride(key, val, options) {
if (options.chEnableArraySupport === 'false') {
return 'string';
}
if (Array.isArray(val)) {
// for now returning it as string. confirm this case
if (val.length === 0) {
return 'string';
}
// check for different data types in the array. if there are different then return array(string)
const firstValueDataType = getDataType(key, val[0], {});
let finalDataType = firstValueDataType;
for (let i = 1; i < val.length; i += 1) {
const dataType = getDataType(key, val[i], {});
if (finalDataType !== dataType) {
if (finalDataType === 'string') {
break;
}
if (dataType === 'float' && finalDataType === 'int') {
finalDataType = 'float';
// eslint-disable-next-line no-continue
continue;
}
if (dataType === 'int' && finalDataType === 'float') {
// eslint-disable-next-line no-continue
continue;
}
finalDataType = 'string';
if (!Array.isArray(val) || val.length === 0) {
return 'string';
}

// check for different data types in the array. if there are different -> return array(string)
let finalDataType = getDataType(key, val[0], {});
for (let i = 1; i < val.length; i += 1) {
const dataType = getDataType(key, val[i], {});
if (finalDataType !== dataType) {
if (finalDataType === 'string') {
break;

Check warning on line 20 in src/v0/destinations/clickhouse/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/clickhouse/transform.js#L20

Added line #L20 was not covered by tests
}
if (dataType === 'float' && finalDataType === 'int') {
finalDataType = 'float';
// eslint-disable-next-line no-continue
continue;
}
if (dataType === 'int' && finalDataType === 'float') {
// eslint-disable-next-line no-continue
continue;
}
finalDataType = 'string';
}
return `array(${finalDataType})`;
}
return 'string';
return `array(${finalDataType})`;
}

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = clickhouse;
const chEnableArraySupport = event.request.query.chEnableArraySupport || 'false';
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
chEnableArraySupport,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
19 changes: 8 additions & 11 deletions src/v0/destinations/deltalake/transform.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
const { processWarehouseMessage } = require('../../../warehouse');

const deltalake = 'deltalake';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}

function getDataTypeOverride() {}
const provider = 'deltalake';

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = deltalake;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
getDataTypeOverride: () => {},
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

exports.process = process;
module.exports = {
provider,
process,
};
19 changes: 8 additions & 11 deletions src/v0/destinations/gcs_datalake/transform.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
const { processWarehouseMessage } = require('../../../warehouse');

const gcsDatalake = 'gcs_datalake';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}

function getDataTypeOverride() {}
const provider = 'gcs_datalake';

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = gcsDatalake;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
getDataTypeOverride: () => {},
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

exports.process = process;
module.exports = {
provider,
process,
};
12 changes: 4 additions & 8 deletions src/v0/destinations/mssql/transform.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
const { processWarehouseMessage } = require('../../../warehouse');

const mssql = 'mssql';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}

const provider = 'mssql';
function getDataTypeOverride() {}

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = mssql;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
11 changes: 4 additions & 7 deletions src/v0/destinations/postgres/transform.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
const { processWarehouseMessage } = require('../../../warehouse');

const postgres = 'postgres';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}
const provider = 'postgres';

function getDataTypeOverride(key, val, options, jsonKey = false) {
if (key === 'violationErrors' || jsonKey) {
Expand All @@ -17,19 +13,20 @@ function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const destJsonPaths = event.destination?.Config?.jsonPaths || '';
const provider = postgres;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destJsonPaths,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
11 changes: 4 additions & 7 deletions src/v0/destinations/rs/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ const { processWarehouseMessage } = require('../../../warehouse');

// redshift destination string limit, if the string length crosses 512 we will change data type to text which is varchar(max) in redshift
const RSStringLimit = 512;
const redshift = 'rs';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}
const provider = 'rs';

function getDataTypeOverride(key, val, options, jsonKey = false) {
if (jsonKey) {
Expand All @@ -26,19 +22,20 @@ function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const destJsonPaths = event.destination?.Config?.jsonPaths || '';
const provider = redshift;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destJsonPaths,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
12 changes: 4 additions & 8 deletions src/v0/destinations/s3_datalake/transform.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
const { processWarehouseMessage } = require('../../../warehouse');

// use postgres providers for s3-datalake
const s3datalakeProvider = 's3_datalake';

function processSingleMessage(message, options) {
return processWarehouseMessage(message, options);
}

const provider = 's3_datalake';
function getDataTypeOverride() {}

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const provider = s3datalakeProvider;
return processSingleMessage(event.message, {
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride,
};
Loading

0 comments on commit e11a78c

Please sign in to comment.