diff --git a/README.md b/README.md index a5f1b07e..6a6da571 100644 --- a/README.md +++ b/README.md @@ -234,6 +234,7 @@ By default, the materialization keeps the last 4 table versions, you can change In case high performances are needed consider bucketing instead of partitions * By default, Glue "duplicate" the versions internally, so the last 2 versions of a table point to the same location * It's recommended to have versions_to_keep>= 4, as this will avoid to have the older location removed +* The macro athena__end_of_time needs to be overwritten by the user if using Athena v3 since it requires a precision parameter for timestamps ### Snapshots diff --git a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql index 77e71c3c..ceaf435d 100644 --- a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -22,7 +22,7 @@ {%- endif -%} {% set sql -%} - SELECT * FROM {{ source }}; + select * from {{ source }}; {%- endset -%} {{ create_table_as(False, target_relation, sql) }} @@ -33,15 +33,15 @@ %} {% macro build_snapshot_table(strategy, source_sql) %} - SELECT * - , {{ strategy.unique_key }} AS dbt_unique_key - , {{ strategy.updated_at }} AS dbt_valid_from - , {{ strategy.scd_id }} AS dbt_scd_id - , 'insert' AS dbt_change_type - , CAST('9999-01-01' as timestamp) AS dbt_valid_to - , True AS is_current_record - , {{ current_timestamp() }} AS dbt_snapshot_at - FROM ({{ source_sql }}) source; + select * + , {{ strategy.unique_key }} as dbt_unique_key + , {{ strategy.updated_at }} as dbt_valid_from + , {{ strategy.scd_id }} as dbt_scd_id + , 'insert' as dbt_change_type + , {{ end_of_time() }} as dbt_valid_to + , True as is_current_record + , {{ strategy.updated_at }} as dbt_snapshot_at + from ({{ source_sql }}) source; {% endmacro %} {% @@ -49,42 +49,42 @@ %} {% macro snapshot_staging_table(strategy, source_sql, target_relation) -%} - WITH snapshot_query AS ( + with snapshot_query as ( {{ source_sql }} ) - , snapshotted_data_base AS ( - SELECT * + , snapshotted_data_base as ( + select * , ROW_NUMBER() OVER ( PARTITION BY dbt_unique_key ORDER BY dbt_valid_from DESC - ) AS dbt_snapshot_rn - FROM {{ target_relation }} + ) as dbt_snapshot_rn + from {{ target_relation }} ) - , snapshotted_data AS ( - SELECT * - FROM snapshotted_data_base - WHERE dbt_snapshot_rn = 1 + , snapshotted_data as ( + select * + from snapshotted_data_base + where dbt_snapshot_rn = 1 AND dbt_change_type != 'delete' ) - , source_data AS ( - SELECT * - , {{ strategy.unique_key }} AS dbt_unique_key - , {{ strategy.updated_at }} AS dbt_valid_from - , {{ strategy.scd_id }} AS dbt_scd_id - FROM snapshot_query + , source_data as ( + select * + , {{ strategy.unique_key }} as dbt_unique_key + , {{ strategy.updated_at }} as dbt_valid_from + , {{ strategy.scd_id }} as dbt_scd_id + from snapshot_query ) - , upserts AS ( - SELECT source_data.* - , CASE - WHEN snapshotted_data.dbt_unique_key IS NULL THEN 'insert' - ELSE 'update' - END as dbt_change_type - , CAST('9999-01-01' as timestamp) AS dbt_valid_to - , True AS is_current_record - FROM source_data - LEFT JOIN snapshotted_data - ON snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - WHERE snapshotted_data.dbt_unique_key IS NULL + , upserts as ( + select source_data.* + , case + when snapshotted_data.dbt_unique_key IS NULL THEN 'insert' + else 'update' + end as dbt_change_type + , {{ end_of_time() }} as dbt_valid_to + , True as is_current_record + from source_data + LEFT join snapshotted_data + on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where snapshotted_data.dbt_unique_key IS NULL OR ( snapshotted_data.dbt_unique_key IS NOT NULL AND ( @@ -93,32 +93,22 @@ ) ) {%- if strategy.invalidate_hard_deletes -%} - {%- set target_columns = adapter.get_columns_in_relation(target_relation) -%} - , deletes AS ( - SELECT - {% for column in target_columns if not column.name == 'dbt_snapshot_at' %} - {% if column.name == 'dbt_valid_from' %} - {{ current_timestamp() }} AS dbt_valid_from {%- if not loop.last -%},{%- endif -%} - {% elif column.name == 'dbt_change_type' %} - 'delete' AS dbt_change_type {%- if not loop.last -%},{%- endif -%} - {% elif column.name == 'dbt_valid_to' %} - CAST('9999-01-01' as timestamp) AS dbt_valid_to {%- if not loop.last -%},{%- endif -%} - {% elif column.name == 'is_current_record' %} - True AS is_current_record {%- if not loop.last -%},{%- endif -%} - {% else %} - snapshotted_data.{{ column.name }} {%- if not loop.last -%},{%- endif -%} - {% endif %} - {% endfor %} - FROM snapshotted_data - LEFT JOIN source_data - ON snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - WHERE source_data.dbt_unique_key IS NULL - ) - SELECT * FROM upserts - UNION ALL - SELECT * FROM deletes; + , deletes as ( + select + source_data.* + , 'delete' as dbt_change_type + , {{ end_of_time() }} as dbt_valid_to + , True as is_current_record + from snapshotted_data + LEFT join source_data + on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where source_data.dbt_unique_key IS NULL + ) + select * from upserts + union all + select * from deletes; {% else %} - SELECT * FROM upserts; + select * from upserts; {% endif %} {%- endmacro %} @@ -151,28 +141,24 @@ #} {% macro athena__create_columns(relation, columns) -%} + {% set query -%} + alter table {{ relation }} add columns ( {%- for column in columns -%} {% if column.data_type|lower == 'boolean' %} - {% set query -%} - ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} BOOLEAN); - {%- endset -%} + {{ column.name }} boolean {%- if not loop.last -%},{%- endif -%} {% elif column.data_type|lower == 'character varying(256)' %} - {% set query -%} - ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} VARCHAR); - {%- endset -%} + {{ column.name }} varchar(255) {%- if not loop.last -%},{%- endif -%} {% elif column.data_type|lower == 'integer' %} - {% set query -%} - ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} INT); - {%- endset -%} + {{ column.name }} bigint {%- if not loop.last -%},{%- endif -%} {% elif column.data_type|lower == 'float' %} - {% set query -%} - ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} FLOAT); - {%- endset -%} + {{ column.name }} float {%- if not loop.last -%},{%- endif -%} {% else %} - ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} {{ column.data_type }}); + {{ column.name }} {{ column.data_type }} {%- if not loop.last -%},{%- endif -%} {% endif %} - {% do run_query(query) %} {%- endfor %} + ) + {%- endset -%} + {% do run_query(query) %} {% endmacro %} {# @@ -180,14 +166,14 @@ snapshot rows being updated and create a new temporary table to hold them #} -{% macro athena__create_new_snapshot_table(target, source) %} +{% macro athena__create_new_snapshot_table(strategy, strategy_name, target, source) %} {%- set tmp_identifier = target.identifier ~ '__dbt_tmp_1' -%} {%- set tmp_relation = adapter.get_relation(database=target.database, schema=target.schema, identifier=tmp_identifier) -%} {%- set target_relation = api.Relation.create(identifier=tmp_identifier, schema=target.schema, - database=none, + database=target.database, type='table') -%} {%- set source_columns = adapter.get_columns_in_relation(source) -%} @@ -197,39 +183,49 @@ {% endif %} {% set sql -%} - SELECT + select {% for column in source_columns %} {{ column.name }} {%- if not loop.last -%},{%- endif -%} {% endfor %} ,dbt_snapshot_at from {{ target }} - WHERE dbt_unique_key NOT IN ( SELECT dbt_unique_key FROM {{ source }} ) - UNION ALL - SELECT + where dbt_unique_key NOT IN ( select dbt_unique_key from {{ source }} ) + union all + select {% for column in source_columns %} - {% if column.name == 'dbt_valid_to' %} - CASE - WHEN dbt_valid_to=CAST('9999-01-01' as timestamp) AND is_current_record=True - THEN {{ current_timestamp() }} - ELSE dbt_valid_to - END AS dbt_valid_to {%- if not loop.last -%},{%- endif -%} - {% elif column.name == 'is_current_record' %} - CASE WHEN is_current_record=True THEN False ELSE is_current_record END - AS is_current_record {%- if not loop.last -%},{%- endif -%} - {% else %} - {{ column.name }} {%- if not loop.last -%},{%- endif -%} - {% endif %} + {% if column.name == 'dbt_valid_to' %} + case + when tgt.is_current_record + THEN + {% if strategy_name == 'timestamp' %} + src.{{ strategy.updated_at }} + {% else %} + {{ strategy.updated_at }} + {% endif %} + else tgt.dbt_valid_to + end as dbt_valid_to {%- if not loop.last -%},{%- endif -%} + {% elif column.name == 'is_current_record' %} + False as is_current_record {%- if not loop.last -%},{%- endif -%} + {% else %} + tgt.{{ column.name }} {%- if not loop.last -%},{%- endif -%} + {% endif %} {% endfor %} - ,{{ current_timestamp() }} AS dbt_snapshot_at - from {{ target }} - WHERE dbt_unique_key IN ( SELECT dbt_unique_key FROM {{ source }} ) - UNION ALL - SELECT + , + {% if strategy_name == 'timestamp' %} + tgt.{{ strategy.updated_at }} + {% else %} + {{ strategy.updated_at }} + {% endif %} as dbt_snapshot_at + from {{ target }} tgt + join {{ source }} src + on tgt.dbt_unique_key = src.dbt_unique_key + union all + select {% for column in source_columns %} {{ column.name }} {%- if not loop.last -%},{%- endif -%} {% endfor %} - ,{{ current_timestamp() }} AS dbt_snapshot_at - FROM {{ source }}; + ,{{ strategy.updated_at }} as dbt_snapshot_at + from {{ source }}; {%- endset -%} {% call statement('create_new_snapshot_table') %} @@ -277,9 +273,11 @@ {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) %} - {% do create_columns(target_relation, missing_columns) %} + {% if missing_columns %} + {% do create_columns(target_relation, missing_columns) %} + {% endif %} - {% set new_snapshot_table = athena__create_new_snapshot_table(target = target_relation, source = staging_table) %} + {% set new_snapshot_table = athena__create_new_snapshot_table(strategy, strategy_name, target_relation, staging_table) %} {% set final_sql = athena__snapshot_merge_sql( target = target_relation, diff --git a/dbt/include/athena/macros/utils/timestamps.sql b/dbt/include/athena/macros/utils/timestamps.sql index b980d220..328ba6cc 100644 --- a/dbt/include/athena/macros/utils/timestamps.sql +++ b/dbt/include/athena/macros/utils/timestamps.sql @@ -1,5 +1,19 @@ +{% + pyathena converts time zoned timestamps to strings so lets avoid them now() +%} + {% macro athena__current_timestamp() -%} - -- pyathena converts time zoned timestamps to strings so lets avoid them - -- now() cast(now() as timestamp) {%- endmacro %} + +{% + Macro to get the end_of_time timestamp +%} + +{% macro end_of_time() -%} + {{ return(adapter.dispatch('end_of_time')()) }} +{%- endmacro %} + +{% macro athena__end_of_time() -%} + cast('9999-01-01' AS timestamp) +{%- endmacro %}