Skip to content

Commit

Permalink
fix: Fix snapshot valid to value (Tomme#149)
Browse files Browse the repository at this point in the history
Co-authored-by: nicor88 <[email protected]>
Co-authored-by: Jérémy Guiselin <[email protected]>
  • Loading branch information
3 people authored Mar 6, 2023
1 parent a96179d commit 8272eb2
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 105 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ By default, the materialization keeps the last 4 table versions, you can change
If high performance is needed, consider bucketing instead of partitions
* By default, Glue "duplicates" the versions internally, so the last 2 versions of a table point to the same location
* It's recommended to set versions_to_keep >= 4, as this avoids having the older location removed
* The macro athena__end_of_time needs to be overridden by the user when using Athena v3, since it requires a precision parameter for timestamps
### Snapshots
Expand Down
204 changes: 101 additions & 103 deletions dbt/include/athena/macros/materializations/snapshots/snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{%- endif -%}

{% set sql -%}
SELECT * FROM {{ source }};
select * from {{ source }};
{%- endset -%}

{{ create_table_as(False, target_relation, sql) }}
Expand All @@ -33,58 +33,58 @@
%}

{#
    build_snapshot_table(strategy, source_sql)

    Renders the select used to build the snapshot table: every column of
    `source_sql` (wrapped as a subquery aliased `source`) plus the dbt
    bookkeeping columns:
      - dbt_unique_key     <- strategy.unique_key
      - dbt_valid_from     <- strategy.updated_at
      - dbt_scd_id         <- strategy.scd_id
      - dbt_change_type    <- the literal 'insert'
      - dbt_valid_to       <- an "end of time" timestamp
      - is_current_record  <- True
      - dbt_snapshot_at

    NOTE(review): this span looks like a rendered diff with the +/- markers
    stripped. The upper-case statement and the lower-case statement appear to
    be the before/after versions of the same query; only one of them should
    exist in the real macro. They differ in dbt_valid_to (inline cast of
    '9999-01-01' vs. end_of_time()) and in dbt_snapshot_at
    (current_timestamp() vs. strategy.updated_at). Confirm against the
    repository before relying on either.
#}
{% macro build_snapshot_table(strategy, source_sql) %}
SELECT *
, {{ strategy.unique_key }} AS dbt_unique_key
, {{ strategy.updated_at }} AS dbt_valid_from
, {{ strategy.scd_id }} AS dbt_scd_id
, 'insert' AS dbt_change_type
, CAST('9999-01-01' as timestamp) AS dbt_valid_to
, True AS is_current_record
, {{ current_timestamp() }} AS dbt_snapshot_at
FROM ({{ source_sql }}) source;
select *
, {{ strategy.unique_key }} as dbt_unique_key
, {{ strategy.updated_at }} as dbt_valid_from
, {{ strategy.scd_id }} as dbt_scd_id
, 'insert' as dbt_change_type
, {{ end_of_time() }} as dbt_valid_to
, True as is_current_record
, {{ strategy.updated_at }} as dbt_snapshot_at
from ({{ source_sql }}) source;
{% endmacro %}

{#
Identify records that need to be upserted into or deleted from the snapshot table
#}

{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}
WITH snapshot_query AS (
with snapshot_query as (
{{ source_sql }}
)
, snapshotted_data_base AS (
SELECT *
, snapshotted_data_base as (
select *
, ROW_NUMBER() OVER (
PARTITION BY dbt_unique_key
ORDER BY dbt_valid_from DESC
) AS dbt_snapshot_rn
FROM {{ target_relation }}
) as dbt_snapshot_rn
from {{ target_relation }}
)
, snapshotted_data AS (
SELECT *
FROM snapshotted_data_base
WHERE dbt_snapshot_rn = 1
, snapshotted_data as (
select *
from snapshotted_data_base
where dbt_snapshot_rn = 1
AND dbt_change_type != 'delete'
)
, source_data AS (
SELECT *
, {{ strategy.unique_key }} AS dbt_unique_key
, {{ strategy.updated_at }} AS dbt_valid_from
, {{ strategy.scd_id }} AS dbt_scd_id
FROM snapshot_query
, source_data as (
select *
, {{ strategy.unique_key }} as dbt_unique_key
, {{ strategy.updated_at }} as dbt_valid_from
, {{ strategy.scd_id }} as dbt_scd_id
from snapshot_query
)
, upserts AS (
SELECT source_data.*
, CASE
WHEN snapshotted_data.dbt_unique_key IS NULL THEN 'insert'
ELSE 'update'
END as dbt_change_type
, CAST('9999-01-01' as timestamp) AS dbt_valid_to
, True AS is_current_record
FROM source_data
LEFT JOIN snapshotted_data
ON snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
WHERE snapshotted_data.dbt_unique_key IS NULL
, upserts as (
select source_data.*
, case
when snapshotted_data.dbt_unique_key IS NULL THEN 'insert'
else 'update'
end as dbt_change_type
, {{ end_of_time() }} as dbt_valid_to
, True as is_current_record
from source_data
LEFT join snapshotted_data
on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key IS NULL
OR (
snapshotted_data.dbt_unique_key IS NOT NULL
AND (
Expand All @@ -93,32 +93,22 @@
)
)
{%- if strategy.invalidate_hard_deletes -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
, deletes AS (
SELECT
{% for column in target_columns if not column.name == 'dbt_snapshot_at' %}
{% if column.name == 'dbt_valid_from' %}
{{ current_timestamp() }} AS dbt_valid_from {%- if not loop.last -%},{%- endif -%}
{% elif column.name == 'dbt_change_type' %}
'delete' AS dbt_change_type {%- if not loop.last -%},{%- endif -%}
{% elif column.name == 'dbt_valid_to' %}
CAST('9999-01-01' as timestamp) AS dbt_valid_to {%- if not loop.last -%},{%- endif -%}
{% elif column.name == 'is_current_record' %}
True AS is_current_record {%- if not loop.last -%},{%- endif -%}
{% else %}
snapshotted_data.{{ column.name }} {%- if not loop.last -%},{%- endif -%}
{% endif %}
{% endfor %}
FROM snapshotted_data
LEFT JOIN source_data
ON snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
WHERE source_data.dbt_unique_key IS NULL
)
SELECT * FROM upserts
UNION ALL
SELECT * FROM deletes;
, deletes as (
select
source_data.*
, 'delete' as dbt_change_type
, {{ end_of_time() }} as dbt_valid_to
, True as is_current_record
from snapshotted_data
LEFT join source_data
on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key IS NULL
)
select * from upserts
union all
select * from deletes;
{% else %}
SELECT * FROM upserts;
select * from upserts;
{% endif %}

{%- endmacro %}
Expand Down Expand Up @@ -151,43 +141,39 @@
#}

{#
    athena__create_columns(relation, columns)

    Adds `columns` to `relation` by rendering an "add columns" ALTER TABLE
    statement into `query` and executing it with run_query.

    Each column's lower-cased data_type selects the type that is written:
      - 'boolean'                 -> boolean
      - 'character varying(256)'  -> varchar(255)
      - 'integer'                 -> bigint
      - 'float'                   -> float
      - anything else             -> column.data_type unchanged
    Columns are comma-separated; the comma is skipped on the last loop item.

    NOTE(review): this span looks like a rendered diff with the +/- markers
    stripped. The upper-case per-column ALTER TABLE statements (each in its
    own nested `set query` block, with run_query inside the loop) appear to be
    the previous implementation, and the single lower-case statement built
    across the loop (run_query after the loop) the new one. The upper-case
    lines map the types to BOOLEAN / VARCHAR / INT / FLOAT instead. Confirm
    against the repository which lines are live.
#}
{% macro athena__create_columns(relation, columns) -%}
{% set query -%}
alter table {{ relation }} add columns (
{%- for column in columns -%}
{% if column.data_type|lower == 'boolean' %}
{% set query -%}
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} BOOLEAN);
{%- endset -%}
{{ column.name }} boolean {%- if not loop.last -%},{%- endif -%}
{% elif column.data_type|lower == 'character varying(256)' %}
{% set query -%}
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} VARCHAR);
{%- endset -%}
{{ column.name }} varchar(255) {%- if not loop.last -%},{%- endif -%}
{% elif column.data_type|lower == 'integer' %}
{% set query -%}
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} INT);
{%- endset -%}
{{ column.name }} bigint {%- if not loop.last -%},{%- endif -%}
{% elif column.data_type|lower == 'float' %}
{% set query -%}
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} FLOAT);
{%- endset -%}
{{ column.name }} float {%- if not loop.last -%},{%- endif -%}
{% else %}
ALTER TABLE {{ relation }} ADD COLUMNS ({{ column.name }} {{ column.data_type }});
{{ column.name }} {{ column.data_type }} {%- if not loop.last -%},{%- endif -%}
{% endif %}
{% do run_query(query) %}
{%- endfor %}
)
{%- endset -%}
{% do run_query(query) %}
{% endmacro %}

{#
Update the dbt_valid_to and is_current_record for
snapshot rows being updated and create a new temporary table to hold them
#}

{% macro athena__create_new_snapshot_table(target, source) %}
{% macro athena__create_new_snapshot_table(strategy, strategy_name, target, source) %}
{%- set tmp_identifier = target.identifier ~ '__dbt_tmp_1' -%}

{%- set tmp_relation = adapter.get_relation(database=target.database, schema=target.schema, identifier=tmp_identifier) -%}

{%- set target_relation = api.Relation.create(identifier=tmp_identifier,
schema=target.schema,
database=none,
database=target.database,
type='table') -%}

{%- set source_columns = adapter.get_columns_in_relation(source) -%}
Expand All @@ -197,39 +183,49 @@
{% endif %}

{% set sql -%}
SELECT
select
{% for column in source_columns %}
{{ column.name }} {%- if not loop.last -%},{%- endif -%}
{% endfor %}
,dbt_snapshot_at
from {{ target }}
WHERE dbt_unique_key NOT IN ( SELECT dbt_unique_key FROM {{ source }} )
UNION ALL
SELECT
where dbt_unique_key NOT IN ( select dbt_unique_key from {{ source }} )
union all
select
{% for column in source_columns %}
{% if column.name == 'dbt_valid_to' %}
CASE
WHEN dbt_valid_to=CAST('9999-01-01' as timestamp) AND is_current_record=True
THEN {{ current_timestamp() }}
ELSE dbt_valid_to
END AS dbt_valid_to {%- if not loop.last -%},{%- endif -%}
{% elif column.name == 'is_current_record' %}
CASE WHEN is_current_record=True THEN False ELSE is_current_record END
AS is_current_record {%- if not loop.last -%},{%- endif -%}
{% else %}
{{ column.name }} {%- if not loop.last -%},{%- endif -%}
{% endif %}
{% if column.name == 'dbt_valid_to' %}
case
when tgt.is_current_record
THEN
{% if strategy_name == 'timestamp' %}
src.{{ strategy.updated_at }}
{% else %}
{{ strategy.updated_at }}
{% endif %}
else tgt.dbt_valid_to
end as dbt_valid_to {%- if not loop.last -%},{%- endif -%}
{% elif column.name == 'is_current_record' %}
False as is_current_record {%- if not loop.last -%},{%- endif -%}
{% else %}
tgt.{{ column.name }} {%- if not loop.last -%},{%- endif -%}
{% endif %}
{% endfor %}
,{{ current_timestamp() }} AS dbt_snapshot_at
from {{ target }}
WHERE dbt_unique_key IN ( SELECT dbt_unique_key FROM {{ source }} )
UNION ALL
SELECT
,
{% if strategy_name == 'timestamp' %}
tgt.{{ strategy.updated_at }}
{% else %}
{{ strategy.updated_at }}
{% endif %} as dbt_snapshot_at
from {{ target }} tgt
join {{ source }} src
on tgt.dbt_unique_key = src.dbt_unique_key
union all
select
{% for column in source_columns %}
{{ column.name }} {%- if not loop.last -%},{%- endif -%}
{% endfor %}
,{{ current_timestamp() }} AS dbt_snapshot_at
FROM {{ source }};
,{{ strategy.updated_at }} as dbt_snapshot_at
from {{ source }};
{%- endset -%}

{% call statement('create_new_snapshot_table') %}
Expand Down Expand Up @@ -277,9 +273,11 @@

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) %}

{% do create_columns(target_relation, missing_columns) %}
{% if missing_columns %}
{% do create_columns(target_relation, missing_columns) %}
{% endif %}

{% set new_snapshot_table = athena__create_new_snapshot_table(target = target_relation, source = staging_table) %}
{% set new_snapshot_table = athena__create_new_snapshot_table(strategy, strategy_name, target_relation, staging_table) %}

{% set final_sql = athena__snapshot_merge_sql(
target = target_relation,
Expand Down
18 changes: 16 additions & 2 deletions dbt/include/athena/macros/utils/timestamps.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
{#
pyathena converts time zoned timestamps to strings, so avoid them: cast now() to a plain timestamp
#}

{% macro athena__current_timestamp() -%}
    {#-
        Athena implementation of current_timestamp.

        Renders an expression for the current time as a plain timestamp.
        pyathena converts time zoned timestamps to strings, so the result of
        now() is cast to timestamp instead of being used directly.

        This note is a Jinja comment rather than SQL `--` comments on purpose:
        the macro is expanded inline inside select lists, and SQL comments in
        the body would be emitted into every compiled statement.
    -#}
    cast(now() as timestamp)
{%- endmacro %}

{#
Macro to get the end_of_time timestamp (dispatched to the adapter-specific implementation)
#}

{% macro end_of_time() -%}
    {#- Look up the adapter-specific implementation first, then return whatever it renders. -#}
    {%- set end_of_time_impl = adapter.dispatch('end_of_time') -%}
    {{ return(end_of_time_impl()) }}
{%- endmacro %}

{#
    athena__end_of_time()

    Renders a cast of the literal '9999-01-01' to timestamp. Presumably this is
    the implementation that adapter.dispatch('end_of_time') resolves to for
    Athena (verify against the dispatch configuration). The snapshot macros use
    end_of_time() as the dbt_valid_to value of rows they insert.

    NOTE(review): the project README states that this macro needs to be
    overridden by the user on Athena v3, because a precision parameter is
    required for timestamps there.
#}
{% macro athena__end_of_time() -%}
cast('9999-01-01' AS timestamp)
{%- endmacro %}

0 comments on commit 8272eb2

Please sign in to comment.