diff --git a/README.md b/README.md index 81720fbb..474872ec 100644 --- a/README.md +++ b/README.md @@ -760,6 +760,7 @@ This macro generates the sessions lifecycle manifest table that Snowplow leverag - `event_limits_table`: The name of the table where your Snowplow event limits are stored. - `incremental_manifest_table`: The name of the incremental manifest table. - `package_name`: The name of the package you are running this macro under. +- `allow_null_dvce_tstamps`: A boolean (default `false`) that allows events with a null `dvce_created_tstamp` or `dvce_sent_tstamp` to be processed. When enabled, `collector_tstamp` is substituted for whichever of the two fields is null in the `days_late_allowed` late-event filter, so these events are no longer excluded by it. Enable this only for trackers that do not populate these fields and where it is safe to bypass the late-event check. **Usage:** @@ -829,6 +830,7 @@ This macro generates the events this run table that Snowplow leverages. - `snowplow_events_table`: The name of your events table where your events land. - `entities_or_sdes`: In Redshift & Postgres, due to the shredded table design (meaning each context is loaded separately into a table), you need to specify which contexts you want to be included in the snowplow_base_events_this_run table, which you can do using this variable. 
This needs to be an array of key:value maps with the following properties: `name`, `prefix`, `alias`, `single_entity` - `custom_sql`: Any custom SQL you want to include within your `events_this_run` table +- `allow_null_dvce_tstamps`: A boolean (default `false`) that allows events with a null `dvce_created_tstamp` or `dvce_sent_tstamp` to be processed. When enabled, `collector_tstamp` is substituted for whichever of the two fields is null in the `days_late_allowed` late-event filter, so these events are no longer excluded by it. Enable this only for trackers that do not populate these fields and where it is safe to bypass the late-event check. **Usage:** diff --git a/macros/base/base_create_snowplow_events_this_run.sql b/macros/base/base_create_snowplow_events_this_run.sql index 3b7d1cc6..73a51485 100644 --- a/macros/base/base_create_snowplow_events_this_run.sql +++ b/macros/base/base_create_snowplow_events_this_run.sql @@ -5,11 +5,11 @@ and you may not use this file except in compliance with the Snowplow Personal an You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} -{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers=[{"schema" : "atomic", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', entities_or_sdes=none, custom_sql=none) %} - {{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql)) }} +{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers=[{"schema" : "atomic", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', 
derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', entities_or_sdes=none, custom_sql=none, allow_null_dvce_tstamps=false) %} + {{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps)) }} {% endmacro %} -{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql) %} +{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps) %} {%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table), 'start_tstamp', 'end_tstamp') %} @@ -52,7 +52,13 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 on a.session_identifier = b.session_identifier where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }} - and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }} + + {% if allow_null_dvce_tstamps %} + and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, 
a.collector_tstamp)') }} + {% else %} + and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }} + {% endif %} + and a.{{ session_timestamp }} >= {{ lower_limit }} and a.{{ session_timestamp }} <= {{ upper_limit }} and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events @@ -71,7 +77,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} -{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql) %} +{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps) %} {%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table), 'start_tstamp', 'end_tstamp') %} @@ -192,7 +198,11 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 on a.session_identifier = b.session_identifier where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }} - and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }} + {% if allow_null_dvce_tstamps %} + and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, a.collector_tstamp)') }} + {% else %} + and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }} + {% endif %} and a.{{ 
session_timestamp }} >= {{ lower_limit }} and a.{{ session_timestamp }} <= {{ upper_limit }} and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events diff --git a/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql b/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql index 32544539..6f8d8fe0 100644 --- a/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql +++ b/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql @@ -5,11 +5,11 @@ and you may not use this file except in compliance with the Snowplow Personal an You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} {# Add quarantined_sessions reference properly #} -{% macro base_create_snowplow_sessions_lifecycle_manifest(session_identifiers=[{"schema": "atomic", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', user_identifiers=[{"schema": "atomic", "field" : "domain_userid"}], user_sql=none, quarantined_sessions=none, derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', event_limits_table='snowplow_base_new_event_limits', incremental_manifest_table='snowplow_incremental_manifest', package_name='snowplow') %} - {{ return(adapter.dispatch('base_create_snowplow_sessions_lifecycle_manifest', 'snowplow_utils')(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name)) }} +{% macro base_create_snowplow_sessions_lifecycle_manifest(session_identifiers=[{"schema": "atomic", "field" : "domain_sessionid"}], session_sql=none, 
session_timestamp='load_tstamp', user_identifiers=[{"schema": "atomic", "field" : "domain_userid"}], user_sql=none, quarantined_sessions=none, derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', event_limits_table='snowplow_base_new_event_limits', incremental_manifest_table='snowplow_incremental_manifest', package_name='snowplow', allow_null_dvce_tstamps=false) %} + {{ return(adapter.dispatch('base_create_snowplow_sessions_lifecycle_manifest', 'snowplow_utils')(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name, allow_null_dvce_tstamps)) }} {% endmacro %} -{% macro default__base_create_snowplow_sessions_lifecycle_manifest(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name) %} +{% macro default__base_create_snowplow_sessions_lifecycle_manifest(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name, allow_null_dvce_tstamps) %} {% set base_event_limits = ref(event_limits_table) %} {% set lower_limit, upper_limit, _ = snowplow_utils.return_base_new_event_limits(base_event_limits) %} {% set session_lookback_limit = snowplow_utils.get_session_lookback_limit(lower_limit) %} @@ -62,7 
+62,11 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 from {{ snowplow_events }} e where - dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late + {% if allow_null_dvce_tstamps %} + coalesce(dvce_sent_tstamp, collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(dvce_created_tstamp, collector_tstamp)') }} + {% else %} + dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} + {% endif %} -- don't process data that's too late and {{ session_timestamp }} >= {{ lower_limit }} and {{ session_timestamp }} <= {{ upper_limit }} and {{ snowplow_utils.app_id_filter(app_ids) }} @@ -135,7 +139,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} -{% macro postgres__base_create_snowplow_sessions_lifecycle_manifest(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name) %} +{% macro postgres__base_create_snowplow_sessions_lifecycle_manifest(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name, allow_null_dvce_tstamps) %} {% set base_event_limits = ref(event_limits_table) %} {% set lower_limit, upper_limit, _ = snowplow_utils.return_base_new_event_limits(base_event_limits) %} {% set session_lookback_limit = snowplow_utils.get_session_lookback_limit(lower_limit) %} @@ -221,7 +225,11 @@ You may obtain a copy of the Snowplow 
Personal and Academic License Version 1.0 {% endfor %} {% endif %} where - dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late + {% if allow_null_dvce_tstamps %} + coalesce(dvce_sent_tstamp, collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(dvce_created_tstamp, collector_tstamp)') }} + {% else %} + dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} + {% endif %}-- don't process data that's too late and {{ session_timestamp }} >= {{ lower_limit }} and {{ session_timestamp }} <= {{ upper_limit }} and {{ snowplow_utils.app_id_filter(app_ids) }}