Skip to content

Commit

Permalink
fix: json paths for all events for warehouse
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 8, 2023
1 parent dfd6af6 commit 67819a7
Show file tree
Hide file tree
Showing 18 changed files with 7,426 additions and 73 deletions.
114 changes: 77 additions & 37 deletions src/warehouse/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const {
isObject,
isBlank,
isValidJsonPathKey,
getKeysFromJsonPaths,
isValidLegacyJsonPathKey,
keysFromJsonPaths,
validTimestamp,
getVersionedUtils,
isRudderSourcesEvent,
Expand Down Expand Up @@ -195,45 +196,51 @@ function setDataFromColumnMappingAndComputeColumnTypes(
columnTypes = {context_library_name: 'string', context_library_version: 'string'}
*/

function setDataFromInputAndComputeColumnTypes(
utils,
eventType,
output,
input,
columnTypes,
options,
prefix = '',
level = 0,
utils,
eventType,
output,
input,
columnTypes,
options,
completePrefix = '',
completeLevel = 0,
prefix = '',
level = 0,
) {
if (!input || !isObject(input)) return;
Object.keys(input).forEach((key) => {
if (isValidJsonPathKey(eventType, `${prefix + key}`, input[key], level, options.jsonKeys)) {
const isValidLegacyJSONPath = isValidLegacyJsonPathKey(eventType, `${prefix + key}`, level, options.jsonLegacyPathKeys);
const isValidJSONPath = isValidJsonPathKey(`${completePrefix + key}`, completeLevel, options.jsonPathKeys);

if (isValidJSONPath || isValidLegacyJSONPath) {
if (isBlank(input[key])) {
return;
}

const val = JSON.stringify(input[key]);
appendColumnNameAndType(
utils,
eventType,
`${prefix + key}`,
val,
output,
columnTypes,
options,
true,
utils,
eventType,
`${prefix + key}`,
val,
output,
columnTypes,
options,
true,
);
} else if (isObject(input[key]) && (options.sourceCategory !== 'cloud' || level < 3)) {
setDataFromInputAndComputeColumnTypes(
utils,
eventType,
output,
input[key],
columnTypes,
options,
`${prefix + key}_`,
level + 1,
utils,
eventType,
output,
input[key],
columnTypes,
options,
`${completePrefix + key}_`,
completeLevel + 1,
`${prefix + key}_`,
level + 1,
);
} else {
let val = input[key];
Expand All @@ -245,13 +252,13 @@ function setDataFromInputAndComputeColumnTypes(
val = JSON.stringify(val);
}
appendColumnNameAndType(
utils,
eventType,
`${prefix + key}`,
val,
output,
columnTypes,
options,
utils,
eventType,
`${prefix + key}`,
val,
output,
columnTypes,
options,
);
}
});
Expand Down Expand Up @@ -316,12 +323,15 @@ function storeRudderEvent(utils, message, output, columnTypes, options) {
function addJsonKeysToOptions(options) {
// Add json key paths from integration options and destination config
const jsonPaths = Array.isArray(options.integrationOptions?.jsonPaths)
? options.integrationOptions.jsonPaths
: [];
? options.integrationOptions.jsonPaths
: [];
if (options.destJsonPaths) {
jsonPaths.push(...options.destJsonPaths.split(','));
}
options.jsonKeys = getKeysFromJsonPaths(jsonPaths);

const keys = keysFromJsonPaths(jsonPaths);
options.jsonPathKeys = keys.jsonPathKeys;
options.jsonLegacyPathKeys = keys.jsonLegacyPathKeys;
}

/*
Expand Down Expand Up @@ -583,6 +593,8 @@ function processWarehouseMessage(message, options) {
message.context,
commonColumnTypes,
options,
`${eventType + '_context_'}`,
2,
'context_',
);

Expand All @@ -605,6 +617,8 @@ function processWarehouseMessage(message, options) {
message.properties,
eventTableColumnTypes,
options,
`${eventType + '_properties_'}`,
2,
);
setDataFromColumnMappingAndComputeColumnTypes(
utils,
Expand Down Expand Up @@ -658,6 +672,8 @@ function processWarehouseMessage(message, options) {
message.context,
commonColumnTypes,
options,
`${eventType + '_context_'}`,
2,
'context_',
);
setDataFromColumnMappingAndComputeColumnTypes(
Expand Down Expand Up @@ -732,6 +748,8 @@ function processWarehouseMessage(message, options) {
message.properties,
eventTableColumnTypes,
options,
`${eventType + '_properties_'}`,
2,
);
setDataFromInputAndComputeColumnTypes(
utils,
Expand All @@ -740,6 +758,8 @@ function processWarehouseMessage(message, options) {
message.userProperties,
eventTableColumnTypes,
options,
`${eventType + '_userProperties_'}`,
2,
);
setDataFromColumnMappingAndComputeColumnTypes(
utils,
Expand Down Expand Up @@ -794,6 +814,8 @@ function processWarehouseMessage(message, options) {
message.userProperties,
commonColumnTypes,
options,
`${eventType + '_userProperties_'}`,
2,
);
setDataFromInputAndComputeColumnTypes(
utils,
Expand All @@ -802,6 +824,8 @@ function processWarehouseMessage(message, options) {
message.context ? message.context.traits : {},
commonColumnTypes,
options,
`${eventType + '_context_traits_'}`,
3,
);
setDataFromInputAndComputeColumnTypes(
utils,
Expand All @@ -810,6 +834,8 @@ function processWarehouseMessage(message, options) {
message.traits,
commonColumnTypes,
options,
`${eventType + '_traits_'}`,
2,
'',
);

Expand All @@ -821,6 +847,8 @@ function processWarehouseMessage(message, options) {
message.context,
commonColumnTypes,
options,
`${eventType + '_context_'}`,
2,
'context_',
);

Expand Down Expand Up @@ -925,6 +953,8 @@ function processWarehouseMessage(message, options) {
message.properties,
columnTypes,
options,
`${eventType + '_properties_'}`,
2,
);
// set rudder properties after user set properties to prevent overwriting
setDataFromInputAndComputeColumnTypes(
Expand All @@ -934,6 +964,8 @@ function processWarehouseMessage(message, options) {
message.context,
columnTypes,
options,
`${eventType + '_context_'}`,
2,
'context_',
);
setDataFromColumnMappingAndComputeColumnTypes(
Expand Down Expand Up @@ -992,6 +1024,8 @@ function processWarehouseMessage(message, options) {
message.traits,
columnTypes,
options,
`${eventType + '_traits_'}`,
2,
);
setDataFromInputAndComputeColumnTypes(
utils,
Expand All @@ -1000,6 +1034,8 @@ function processWarehouseMessage(message, options) {
message.context,
columnTypes,
options,
`${eventType + '_context_'}`,
2,
'context_',
);
setDataFromColumnMappingAndComputeColumnTypes(
Expand Down Expand Up @@ -1046,6 +1082,8 @@ function processWarehouseMessage(message, options) {
message.traits,
columnTypes,
options,
`${eventType + '_traits_'}`,
2,
);
setDataFromInputAndComputeColumnTypes(
utils,
Expand All @@ -1054,6 +1092,8 @@ function processWarehouseMessage(message, options) {
message.context,
columnTypes,
options,
`${eventType + '_context_'}`,
2,
'context_',
);
setDataFromColumnMappingAndComputeColumnTypes(
Expand Down
64 changes: 54 additions & 10 deletions src/warehouse/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,69 @@ const isObject = (value) => {
return value != null && (type === 'object' || type === 'function') && !Array.isArray(value);
};

const isValidJsonPathKey = (eventType, key, val, level, jsonKeys = {}) => {
const isValidJsonPathKey = (key, level, jsonKeys = {}) => {
return jsonKeys[key] === level;
};
const isValidLegacyJsonPathKey = (eventType, key, level, jsonKeys = {}) => {
return eventType === 'track' && jsonKeys[key] === level;
};

const isBlank = (value) => {
return _.isEmpty(_.toString(value));
};

/*
* input => ["a", "b.c"]
* output => { "a": 0, "b_c": 1}
This function takes in an array of json paths and returns an object with keys as the json path and value as the position of the key in the json path
Example:
Input:
[
"a",
"b.c"
]
Output:
{
"a": 0,
"b_c": 1
}
Input:
[
"track.context.a",
"track.properties.b",
"pages.properties.c.d",
"groups.traits.e.f"
]
Output:
{
"track_context_a": 2,
"track_properties_b": 2,
"pages_properties_c_d": 3,
"groups_traits_e_f": 3
}
*/
const getKeysFromJsonPaths = (jsonPaths) => {
const jsonKeys = {};
const keysFromJsonPaths = (jsonPaths) => {
const jsonPathKeys = {};
const jsonLegacyPathKeys = {};

const supportedEventPrefixes = ['track.', 'identify.', 'page.', 'screen.', 'alias.', 'group.', 'extract.'];

jsonPaths.forEach((jsonPath) => {
if (jsonPath.trim()) {
const paths = jsonPath.trim().split('.');
jsonKeys[paths.join('_')] = paths.length - 1;
const trimmedJSONPath = jsonPath.trim();
if (!trimmedJSONPath) {
return;

Check warning on line 67 in src/warehouse/util.js

View check run for this annotation

Codecov / codecov/patch

src/warehouse/util.js#L67

Added line #L67 was not covered by tests
}

const paths = trimmedJSONPath.split('.');
const key = paths.join('_');
const pos = paths.length - 1;

if (supportedEventPrefixes.some(prefix => trimmedJSONPath.startsWith(prefix))) {
jsonPathKeys[key] = pos;
return;
}
jsonLegacyPathKeys[key] = pos;
});
return jsonKeys;
return {jsonPathKeys, jsonLegacyPathKeys};
};

// https://www.myintervals.com/blog/2009/05/20/iso-8601-date-validation-that-doesnt-suck/
Expand Down Expand Up @@ -98,7 +141,8 @@ module.exports = {
isObject,
isBlank,
isValidJsonPathKey,
getKeysFromJsonPaths,
isValidLegacyJsonPathKey,
keysFromJsonPaths,
timestampRegex,
validTimestamp,
getVersionedUtils,
Expand Down
Loading

0 comments on commit 67819a7

Please sign in to comment.