Skip to content

Commit

Permalink
Feature: on_schema_change for incremental models (#3387)
Browse files Browse the repository at this point in the history
* detect and act on schema changes

* update incremental helpers code

* update changelog

* fix error in diff_columns from testing

* abstract code a bit further

* address matching names vs. data types

* Update CHANGELOG.md

Co-authored-by: Jeremy Cohen <[email protected]>

* updates from Jeremy's feedback

* multi-column add / remove with full_refresh

* simple changes from JC's feedback

* updated for snowflake

* reorganize postgres code

* reorganize approach

* updated full refresh trigger logic

* fixed unintentional wipe behavior

* catch final else condition

* remove WHERE string replace

* touch ups

* port core to snowflake

* added bigquery code

* updated impacted unit tests

* updates from linting tests

* updates from linting again

* snowflake updates from further testing

* fix logging

* clean up incremental logic

* updated for bigquery

* update postgres with new strategy

* update nodeconfig

* starting integration tests

* integration test for ignore case

* add test for append_new_columns

* add integration test for sync

* remove extra tests

* add unique key and snowflake test

* move incremental integration test dir

* update integration tests

* update integration tests

* Suggestions for #3387 (#3558)

* PR feedback: rationalize macros + logging, fix + expand tests

* Rm alter_column_types, always true for sync_all_columns

* update logging and integration test on sync

* update integration tests

* test fix SF integration tests

Co-authored-by: Matt Winkler <[email protected]>

* rename integration test folder

* Update core/dbt/include/global_project/macros/materializations/incremental/incremental.sql

Accept Jeremy's suggested change

Co-authored-by: Jeremy Cohen <[email protected]>

* Update changelog [skip ci]

Co-authored-by: Jeremy Cohen <[email protected]>
  • Loading branch information
matt-winkler and jtcohen6 authored Jul 21, 2021
1 parent 9f716b3 commit bd70106
Show file tree
Hide file tree
Showing 33 changed files with 838 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743] (https://github.com/dbt-labs/dbt/issues/2743), [#3490] (https://github.com/dbt-labs/dbt/issues/3490))
- Introduce `on_schema_change` config to detect and handle schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3387](https://github.com/fishtown-analytics/dbt/issues/3387))

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:souce_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))
Expand All @@ -23,6 +24,7 @@

Contributors:
- [@kostek-pl](https://github.com/kostek-pl) ([#3236](https://github.com/fishtown-analytics/dbt/pull/3408))
- [@matt-winkler](https://github.com/matt-winkler) ([#3387](https://github.com/dbt-labs/dbt/pull/3387))
- [@tconbeer](https://github.com/tconbeer) [#3468](https://github.com/fishtown-analytics/dbt/pull/3468))
- [@JLDLaughlin](https://github.com/JLDLaughlin) ([#3473](https://github.com/fishtown-analytics/dbt/pull/3473))
- [@jmriego](https://github.com/jmriego) ([#3526](https://github.com/dbt-labs/dbt/pull/3526))
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ def rename_relation(
def get_columns_in_relation(
self, relation: BaseRelation
) -> List[BaseColumn]:
"""Get a list of the columns in the given Relation."""
"""Get a list of the columns in the given Relation. """
raise NotImplementedException(
'`get_columns_in_relation` is not implemented for this adapter!'
)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class NodeConfig(BaseConfig):
CompareBehavior.Exclude),
)
full_refresh: Optional[bool] = None
on_schema_change: Optional[str] = 'ignore'

@classmethod
def __pre_deserialize__(cls, data):
Expand Down
31 changes: 31 additions & 0 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,34 @@
{{ config.set('sql_header', caller()) }}
{%- endmacro %}


{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}

{% macro default__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}
{% if remove_columns is none %}
{% set remove_columns = [] %}
{% endif %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}{{ ',' if remove_columns | length > 0 }}

{% for column in remove_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{% do run_query(sql) %}

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
Expand All @@ -28,22 +32,30 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{# -- first check whether we want to full refresh for source view or config reasons #}
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or should_full_refresh() %}
{% elif trigger_full_refresh %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + '__dbt_backup' %}
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}

{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}

{% endif %}

{% call statement("main") %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}

{% if on_schema_change not in ['sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}

{% set log_message = 'Invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %}
{% do log(log_message) %}

{{ return(default) }}

{% else %}

{{ return(on_schema_change) }}

{% endif %}

{% endmacro %}

{% macro diff_columns(source_columns, target_columns) %}

{% set result = [] %}
{% set source_names = source_columns | map(attribute = 'column') | list %}
{% set target_names = target_columns | map(attribute = 'column') | list %}

{# --check whether the name attribute exists in the target - this does not perform a data type check #}
{% for sc in source_columns %}
{% if sc.name not in target_names %}
{{ result.append(sc) }}
{% endif %}
{% endfor %}

{{ return(result) }}

{% endmacro %}

{% macro diff_column_data_types(source_columns, target_columns) %}

{% set result = [] %}
{% for sc in source_columns %}
{% set tc = target_columns | selectattr("name", "equalto", sc.name) | list | first %}
{% if tc %}
{% if sc.data_type != tc.data_type %}
{{ result.append( { 'column_name': tc.name, 'new_type': sc.data_type } ) }}
{% endif %}
{% endif %}
{% endfor %}

{{ return(result) }}

{% endmacro %}


{% macro check_for_schema_changes(source_relation, target_relation) %}

{% set schema_changed = False %}

{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}

{% set new_target_types = diff_column_data_types(source_columns, target_columns) %}

{% if source_not_in_target != [] %}
{% set schema_changed = True %}
{% elif target_not_in_source != [] or new_target_types != [] %}
{% set schema_changed = True %}
{% elif new_target_types != [] %}
{% set schema_changed = True %}
{% endif %}

{% set changes_dict = {
'schema_changed': schema_changed,
'source_not_in_target': source_not_in_target,
'target_not_in_source': target_not_in_source,
'new_target_types': new_target_types
} %}

{% set msg %}
In {{ target_relation }}:
Schema changed: {{ schema_changed }}
Source columns not in target: {{ source_not_in_target }}
Target columns not in source: {{ target_not_in_source }}
New column types: {{ new_target_types }}
{% endset %}

{% do log(msg) %}

{{ return(changes_dict) }}

{% endmacro %}


{% macro sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}

{%- set add_to_target_arr = schema_changes_dict['source_not_in_target'] -%}

{%- if on_schema_change == 'append_new_columns'-%}
{%- if add_to_target_arr | length > 0 -%}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, none) -%}
{%- endif -%}

{% elif on_schema_change == 'sync_all_columns' %}
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
{%- set new_target_types = schema_changes_dict['new_target_types'] -%}

{% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 %}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
{% endif %}

{% if new_target_types != [] %}
{% for ntt in new_target_types %}
{% set column_name = ntt['column_name'] %}
{% set new_type = ntt['new_type'] %}
{% do alter_column_type(target_relation, column_name, new_type) %}
{% endfor %}
{% endif %}

{% endif %}

{% set schema_change_message %}
In {{ target_relation }}:
Schema change approach: {{ on_schema_change }}
Columns added: {{ add_to_target_arr }}
Columns removed: {{ remove_from_target_arr }}
Data types changed: {{ new_target_types }}
{% endset %}

{% do log(schema_change_message) %}

{% endmacro %}


{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}

{% if on_schema_change != 'ignore' %}

{% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %}

{% if schema_changes_dict['schema_changed'] %}

{% if on_schema_change == 'fail' %}

{% set fail_msg %}
The source and target schemas on this incremental model are out of sync!
They can be reconciled in several ways:
- set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.
{% endset %}

{% do exceptions.raise_compiler_error(fail_msg) %}

{# -- unless we ignore, run the sync operation per the config #}
{% else %}

{% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}

{% endif %}

{% endif %}

{% endif %}

{% endmacro %}
32 changes: 32 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,38 @@
{% do adapter.rename_relation(from_relation, to_relation) %}
{% endmacro %}

{% macro bigquery__alter_relation_add_columns(relation, add_columns) %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}
{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{{ return(run_query(sql)) }}

{% endmacro %}

{% macro bigquery__alter_relation_drop_columns(relation, drop_columns) %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% for column in drop_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{{ return(run_query(sql)) }}

{% endmacro %}


{% macro bigquery__alter_column_type(relation, column_name, new_column_type) -%}
{#
Changing a column's data type using a query requires you to scan the entire table.
Expand Down
Loading

0 comments on commit bd70106

Please sign in to comment.