From 5b0fdbd696eb4484ecb0ea311383a2ce55fcbc2d Mon Sep 17 00:00:00 2001 From: achettyiitr Date: Wed, 18 Sep 2024 03:52:13 +0530 Subject: [PATCH] feat: snowpipe streaming --- .../snowpipe_streaming/transform.js | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 src/v0/destinations/snowpipe_streaming/transform.js diff --git a/src/v0/destinations/snowpipe_streaming/transform.js b/src/v0/destinations/snowpipe_streaming/transform.js new file mode 100644 index 0000000000..bf53c57978 --- /dev/null +++ b/src/v0/destinations/snowpipe_streaming/transform.js @@ -0,0 +1,34 @@ +const { processWarehouseMessage } = require('../../../warehouse'); + +const provider = 'snowflake'; + +function getDataTypeOverride(key, val, options, jsonKey = false) { + if (key === 'violationErrors' || jsonKey) { + return 'json'; + } + return 'string'; +} + +function process(event) { + const whSchemaVersion = event.request.query.whSchemaVersion || 'v1'; + const whIDResolve = event.request.query.whIDResolve === 'true' || false; + const whStoreEvent = event.destination.Config.storeFullEvent === true; + const destJsonPaths = event.destination?.Config?.jsonPaths || ''; + return processWarehouseMessage(event.message, { + metadata: event.metadata, + whSchemaVersion, + whStoreEvent, + whIDResolve, + getDataTypeOverride, + provider, + sourceCategory: event.metadata ? event.metadata.sourceCategory : null, + destJsonPaths, + destConfig: event.destination?.Config, + }); +} + +module.exports = { + provider, + process, + getDataTypeOverride, +};