diff --git a/README.md b/README.md index 7aa4e53..27eb9d4 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Uses [vertica-python](https://github.com/vertica/vertica-python) to connect to V ### dbt Core Features -Below is a table for what features the current Vertica adapter supports for dbt. This is constantly improving and changing as both dbt adds new functionality, as well as the dbt-vertica driver improves. +Below is a table for what features the current Vertica adapter supports for dbt. This is constantly improving and changing as both dbt adds new functionality, as well as the dbt-vertica driver improves. This list is based upon dbt 1.0.3. + | dbt Core Features | Supported | | ------------------------------------------------- | ----------- | @@ -45,6 +46,10 @@ Below is a table for what features the current Vertica adapter supports for Vert ## Changes +### 1.0.3 +- Refactored the adapter to model after dbt's global_project macros +- Unimplemented functions should throw an exception that it's not implemented. If you stumble across this, please open an Issue or PR so we can investigate. + ### 1.0.2 - Added support for snapshot timestamp with passing tests - Added support for snapshot check cols with passing tests diff --git a/dbt/include/vertica/macros/README.md b/dbt/include/vertica/macros/README.md new file mode 100644 index 0000000..33be61e --- /dev/null +++ b/dbt/include/vertica/macros/README.md @@ -0,0 +1,65 @@ +# dbt-vertica Macro Implementations + +Below is a table for what macros the current Vertica adapter supports for dbt. This is constantly improving and changing as both dbt adds new functionality, as well as the dbt-vertica driver improves. This list is based upon dbt 1.0.3, with the list found under https://github.com/dbt-labs/dbt-core/tree/main/core/dbt/include/global_project/macros. + +## dbt Macros - adapters + +| dbt Macros - adapters | Function | Implemented | +| ------------------------- | ----------------------------------- | ----------- | +| adapters/columns.sql | get_columns_in_relation() | Yes | +| adapters/columns.sql | sql_convert_columns_in_relation() | No | +| adapters/columns.sql | get_columns_in_query() | Yes | +| adapters/columns.sql | alter_column_type() | No | +| adapters/columns.sql | alter_relation_add_remove_columns() | No | +| adapters/freshness.sql | collect_freshness() | No | +| adapters/freshness.sql | current_timestamp() | Yes | +| adapters/indexes.sql | create_indexes() | Yes | +| adapters/indexes.sql | get_create_index_sql() | No | +| adapters/metadata.sql | get_catalog() | Yes | +| adapters/metadata.sql | information_schema_name() | Yes | +| adapters/metadata.sql | list_schemas() | Yes | +| adapters/metadata.sql | list_relations_without_caching() | Yes | +| adapters/persist_docs.sql | alter_column_comment() | No | +| adapters/persist_docs.sql | alter_relation_comment() | No | +| adapters/persist_docs.sql | persist_docs() | Yes | +| adapters/relation.sql | drop_relation() | Yes | +| adapters/relation.sql | drop_relation_if_exists() | Yes | +| adapters/relation.sql | get_or_create_relation() | No | +| adapters/relation.sql | load_relation() | No | +| adapters/relation.sql | make_temp_relation() | Yes | +| adapters/relation.sql | rename_relation() | Yes | +| adapters/relation.sql | truncate_relation() | Yes | +| adapters/schema.sql | create_schema() | Yes | +| adapters/schema.sql | drop_schema() | Yes | + +## dbt Macros - materializations +| dbt Macros - materializations | Function | Implemented | +| -------------------------------------------------------- | -------------------------------- | ----------- | +| materializations/models/incremental/merge.sql | get_merge_sql() | Yes | +| materializations/models/incremental/merge.sql | get_delete_insert_merge_sql() | Yes | +| materializations/models/incremental/merge.sql | get_insert_overwrite_merge_sql() | No | +| materializations/models/table/create_table_as.sql | create_table_as() | Yes | +| materializations/models/view/create_or_replace_view.sql | create_or_replace_view() | Yes | +| materializations/models/view/create_view_as.sql | create_view_as() | Yes | +| materializations/models/view/create_view_as.sql | get_create_view_as_sql() | Yes | +| materializations/seeds/helpers.sql | create_csv_table() | Yes | +| materializations/seeds/helpers.sql | reset_csv_table() | Yes | +| materializations/seeds/helpers.sql | get_binding_char() | Yes | +| materializations/seeds/helpers.sql | get_seed_column_quoted_csv() | Yes | +| materializations/seeds/helpers.sql | load_csv_rows() | Yes | +| materializations/snapshots/helper.sql | build_snapshot_table() | Yes | +| materializations/snapshots/helper.sql | create_columns() | Yes | +| materializations/snapshots/helper.sql | post_snapshot() | Yes | +| materializations/snapshots/helper.sql | snapshot_staging_table() | Yes | +| materializations/snapshots/snapshot_merge.sql | snapshot_merge_sql() | Yes | +| materializations/snapshots/strategies.sql | snapshot_get_time() | Yes | +| materializations/snapshots/strategies.sql | snapshot_hash_arguments() | Yes | +| materializations/snapshots/strategies.sql | snapshot_string_as_time() | Yes | +| materializations/configs.sql | set_sql_header() | Yes | +| materializations/configs.sql | should_full_refresh() | Yes | +| materializations/configs.sql | should_store_failures() | Yes | +| materializations/hooks.sql | run_hooks() | Yes | +| materializations/hooks.sql | make_hook_config() | Yes | +| materializations/hooks.sql | before_begin() | Yes | +| materializations/hooks.sql | in_transaction() | Yes | +| materializations/hooks.sql | after_commit() | Yes | \ No newline at end of file diff --git a/dbt/include/vertica/macros/adapters/columns.sql b/dbt/include/vertica/macros/adapters/columns.sql new file mode 100644 index 0000000..8eb0158 --- /dev/null +++ b/dbt/include/vertica/macros/adapters/columns.sql @@ -0,0 +1,59 @@ +{% macro vertica__get_columns_in_relation(relation) -%} + {% call statement('get_columns_in_relation', fetch_result=True) %} + select + column_name + , data_type + , character_maximum_length + , numeric_precision + , numeric_scale + from ( + select + column_name + , data_type + , character_maximum_length + , numeric_precision + , numeric_scale + , ordinal_position + from v_catalog.columns + where table_schema = '{{ relation.schema }}' + and table_name = '{{ relation.identifier }}' + union all + select + column_name + , data_type + , character_maximum_length + , numeric_precision + , numeric_scale + , ordinal_position + from v_catalog.view_columns + where table_schema = '{{ relation.schema }}' + and table_name = '{{ relation.identifier }}' + ) t + order by ordinal_position + {% endcall %} + {% set table = load_result('get_columns_in_relation').table %} + {{ return(sql_convert_columns_in_relation(table)) }} +{% endmacro %} + + +{% macro vertica__sql_convert_columns_in_relation(table) -%} + {{ exceptions.raise_not_implemented( + 'sql_convert_columns_in_relation macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{% macro vertica__alter_column_type(relation, column_name, new_column_type) -%} + {{ exceptions.raise_not_implemented( + 'alter_column_type macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{% macro vertica__alter_relation_add_remove_columns(relation, column_name, new_column_type) -%} + {{ exceptions.raise_not_implemented( + 'alter_relation_add_remove_columns macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{# + No need to implement get_columns_in_query(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/adapters/freshness.sql b/dbt/include/vertica/macros/adapters/freshness.sql new file mode 100644 index 0000000..0f90672 --- /dev/null +++ b/dbt/include/vertica/macros/adapters/freshness.sql @@ -0,0 +1,9 @@ +{% macro vertica__current_timestamp() -%} + current_timestamp +{%- endmacro %} + + +{% macro vertica__collect_freshness() -%} + {{ exceptions.raise_not_implemented( + 'collect_freshness macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/adapters/indexes.sql b/dbt/include/vertica/macros/adapters/indexes.sql new file mode 100644 index 0000000..9019bf3 --- /dev/null +++ b/dbt/include/vertica/macros/adapters/indexes.sql @@ -0,0 +1,9 @@ +{% macro vertica__get_create_index_sql(relation, index_dict) -%} + {{ exceptions.raise_not_implemented( + 'get_create_index_sql macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{# + No need to implement create_indexes(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/adapters.sql b/dbt/include/vertica/macros/adapters/metadata.sql similarity index 51% rename from dbt/include/vertica/macros/adapters.sql rename to dbt/include/vertica/macros/adapters/metadata.sql index f31f015..829b37c 100644 --- a/dbt/include/vertica/macros/adapters.sql +++ b/dbt/include/vertica/macros/adapters/metadata.sql @@ -1,140 +1,3 @@ - -{% macro vertica__information_schema_name(database) -%} - {%- if database -%} - {{ adapter.quote_as_configured(database, 'database') }}.v_catalog - {%- else -%} - v_catalog - {%- endif -%} -{%- endmacro %} - -{% macro vertica__list_schemas(database) %} - {% call statement('list_schemas', fetch_result=True, auto_begin=False) %} - select schema_name - from v_catalog.schemata - {% endcall %} - {{ return(load_result('list_schemas').table) }} -{% endmacro %} - -{% macro vertica__check_schema_exists(database, schema) -%} - {% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%} - select count(*) - from v_catalog.schemata - where schema_name='{{ schema }}' - {%- endcall %} - {{ return(load_result('check_schema_exists').table) }} -{% endmacro %} - -{% macro vertica__drop_schema(relation) -%} - {% call statement('drop_schema') -%} - drop schema if exists {{ relation.without_identifier().include(database=False) }} cascade - {% endcall %} -{% endmacro %} - -{% macro vertica__create_schema(relation) -%} - {%- call statement('create_schema') -%} - create schema if not exists {{ relation.without_identifier().include(database=False) }} - {% endcall %} -{% endmacro %} - -{% macro vertica__list_relations_without_caching(schema_relation) %} - {% call statement('list_relations_without_caching', fetch_result=True) -%} - select - '{{ schema_relation.database }}' as database, - table_name as name, - table_schema as schema, - 'table' as type - from v_catalog.tables - where table_schema ilike '{{ schema_relation.schema }}' - union all - select - '{{ schema_relation.database }}' as database, - table_name as name, - table_schema as schema, - 'view' as type - from v_catalog.views - where table_schema ilike '{{ schema_relation.schema }}' - {% endcall %} - {{ return(load_result('list_relations_without_caching').table) }} - {% endmacro %} - -{% macro vertica__rename_relation(from_relation, to_relation) %} - {% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %} - {% call statement('rename_relation') -%} - alter {{ from_relation.type }} {{ from_relation }} rename to {{ target_name }} - {%- endcall %} -{% endmacro %} - - -{% macro vertica__get_columns_in_relation(relation) -%} - {% call statement('get_columns_in_relation', fetch_result=True) %} - select - column_name - , data_type - , character_maximum_length - , numeric_precision - , numeric_scale - from ( - select - column_name - , data_type - , character_maximum_length - , numeric_precision - , numeric_scale - , ordinal_position - from v_catalog.columns - where table_schema = '{{ relation.schema }}' - and table_name = '{{ relation.identifier }}' - union all - select - column_name - , data_type - , character_maximum_length - , numeric_precision - , numeric_scale - , ordinal_position - from v_catalog.view_columns - where table_schema = '{{ relation.schema }}' - and table_name = '{{ relation.identifier }}' - ) t - order by ordinal_position - {% endcall %} - {% set table = load_result('get_columns_in_relation').table %} - {{ return(sql_convert_columns_in_relation(table)) }} -{% endmacro %} - -{% macro vertica__create_view_as(relation, sql) %} - {% set sql_header = config.get('sql_header', none) %} - - {{ sql_header if sql_header is not none }} - create or replace view {{ relation }} as ( - {{ sql }} - ); - -{% endmacro %} - -{% macro vertica__create_table_as(temporary, relation, sql) -%} - {%- set sql_header = config.get('sql_header', none) -%} - - {{ sql_header if sql_header is not none }} - - create {% if temporary: -%}local temporary{%- endif %} table - {{ relation.include(database=(not temporary), schema=(not temporary)) }} - {% if temporary: -%}on commit preserve rows{%- endif %} - as ( - {{ sql }} - ); -{% endmacro %} - -{% macro vertica__make_temp_relation(base_relation, suffix) %} - {% set tmp_identifier = base_relation.identifier ~ suffix %} - {% do return(base_relation.incorporate( - path={ - "identifier": tmp_identifier, - "schema": none, - "database": none - })) -%} -{% endmacro %} - {% macro vertica__get_catalog(information_schema, schemas) -%} {% call statement('get_catalog', fetch_result=True) %} @@ -185,6 +48,52 @@ {{ return(load_result('get_catalog').table) }} {% endmacro %} -{% macro vertica__current_timestamp() -%} - current_timestamp + +{% macro vertica__information_schema_name(database) -%} + {%- if database -%} + {{ adapter.quote_as_configured(database, 'database') }}.v_catalog + {%- else -%} + v_catalog + {%- endif -%} {%- endmacro %} + + +{% macro vertica__list_schemas(database) %} + {% call statement('list_schemas', fetch_result=True, auto_begin=False) %} + select schema_name + from v_catalog.schemata + {% endcall %} + {{ return(load_result('list_schemas').table) }} +{% endmacro %} + + +{% macro vertica__check_schema_exists(database, schema) -%} + {% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%} + select count(*) + from v_catalog.schemata + where schema_name='{{ schema }}' + {%- endcall %} + {{ return(load_result('check_schema_exists').table) }} +{% endmacro %} + + +{% macro vertica__list_relations_without_caching(schema_relation) %} + {% call statement('list_relations_without_caching', fetch_result=True) -%} + select + '{{ schema_relation.database }}' as database, + table_name as name, + table_schema as schema, + 'table' as type + from v_catalog.tables + where table_schema ilike '{{ schema_relation.schema }}' + union all + select + '{{ schema_relation.database }}' as database, + table_name as name, + table_schema as schema, + 'view' as type + from v_catalog.views + where table_schema ilike '{{ schema_relation.schema }}' + {% endcall %} + {{ return(load_result('list_relations_without_caching').table) }} +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/adapters/persist_docs.sql b/dbt/include/vertica/macros/adapters/persist_docs.sql new file mode 100644 index 0000000..ae81603 --- /dev/null +++ b/dbt/include/vertica/macros/adapters/persist_docs.sql @@ -0,0 +1,15 @@ +{% macro vertica__alter_column_comment(relation, column_dict) -%} + {{ exceptions.raise_not_implemented( + 'alter_column_comment macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{% macro vertica__alter_relation_comment(relation, relation_comment) -%} + {{ exceptions.raise_not_implemented( + 'alter_relation_comment macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{# + No need to implement persist_docs(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/adapters/relation.sql b/dbt/include/vertica/macros/adapters/relation.sql new file mode 100644 index 0000000..c0121bd --- /dev/null +++ b/dbt/include/vertica/macros/adapters/relation.sql @@ -0,0 +1,30 @@ +{% macro vertica__make_temp_relation(base_relation, suffix) %} + {% set tmp_identifier = base_relation.identifier ~ suffix %} + {% do return(base_relation.incorporate( + path={ + "identifier": tmp_identifier, + "schema": none, + "database": none + })) -%} +{% endmacro %} + + +{% macro vertica__rename_relation(from_relation, to_relation) %} + {% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %} + {% call statement('rename_relation') -%} + alter {{ from_relation.type }} {{ from_relation }} rename to {{ target_name }} + {%- endcall %} +{% endmacro %} + + +{% macro vertica__load_relation(relation) -%} + {{ exceptions.raise_not_implemented( + 'load_relation macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} + + +{# + No need to implement drop_relation(). Syntax supported by default. + No need to implement drop_relation_if_exists(). Syntax supported by default. + No need to implement get_or_create_relation(). Syntax supported by default. +#} diff --git a/dbt/include/vertica/macros/adapters/schema.sql b/dbt/include/vertica/macros/adapters/schema.sql new file mode 100644 index 0000000..914fca0 --- /dev/null +++ b/dbt/include/vertica/macros/adapters/schema.sql @@ -0,0 +1,12 @@ +{% macro vertica__create_schema(relation) -%} + {%- call statement('create_schema') -%} + create schema if not exists {{ relation.without_identifier().include(database=False) }} + {% endcall %} +{% endmacro %} + + +{% macro vertica__drop_schema(relation) -%} + {% call statement('drop_schema') -%} + drop schema if exists {{ relation.without_identifier().include(database=False) }} cascade + {% endcall %} +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/configs.sql b/dbt/include/vertica/macros/materializations/configs.sql new file mode 100644 index 0000000..88a51c0 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/configs.sql @@ -0,0 +1,5 @@ +{# + No need to implement set_sql_header(). Syntax supported by default. + No need to implement should_full_refresh(). Syntax supported by default. + No need to implement should_store_failures(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/hooks.sql b/dbt/include/vertica/macros/materializations/hooks.sql new file mode 100644 index 0000000..cdcd2de --- /dev/null +++ b/dbt/include/vertica/macros/materializations/hooks.sql @@ -0,0 +1,7 @@ +{# + No need to implement run_hooks(). Syntax supported by default. + No need to implement make_hook_config(). Syntax supported by default. + No need to implement before_begin(). Syntax supported by default. + No need to implement in_transaction(). Syntax supported by default. + No need to implement after_commit(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/incremental.sql b/dbt/include/vertica/macros/materializations/incremental.sql deleted file mode 100644 index 910f4ea..0000000 --- a/dbt/include/vertica/macros/materializations/incremental.sql +++ /dev/null @@ -1,126 +0,0 @@ -{% macro vertica__validate_get_incremental_strategy(config) %} - {#-- Find and validate the incremental strategy #} - {%- set strategy = config.get("incremental_strategy", default="merge") -%} - - {% set invalid_strategy_msg -%} - Invalid incremental strategy provided: {{ strategy }} - Expected one of: 'merge', 'delete+insert' - {%- endset %} - {% if strategy not in ['merge', 'delete+insert'] %} - {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} - {% endif %} - - {% do return(strategy) %} -{% endmacro %} - -{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %} - {% if strategy == 'merge' %} - {% do return(vertica__get_merge_sql(target_relation, tmp_relation, dest_columns)) %} - {% elif strategy == 'delete+insert' %} - {% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns)) %} - {% else %} - {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} - {% endif %} -{% endmacro %} - - -{% macro vertica__get_merge_sql(target_relation, tmp_relation, dest_columns) %} - {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} - {%- set merge_columns = config.get("merge_columns", default=None)%} - - merge into {{ target_relation }} as DBT_INTERNAL_DEST - using {{ tmp_relation }} as DBT_INTERNAL_SOURCE - - {#-- Test 1, find the provided merge columns #} - {% if merge_columns %} - on - {% for column in merge_columns %} - DBT_INTERNAL_DEST.{{ adapter.quote(column) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column) }} - {%- if not loop.last %} AND {% endif %} - {%- endfor %} - - {#-- Test 2, use all columns in the destination table #} - {% else %} - on - {% for column in dest_columns -%} - DBT_INTERNAL_DEST.{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} - {%- if not loop.last %} AND {% endif %} - {%- endfor %} - - {% endif %} - - when matched then update set - {% for column in dest_columns -%} - {{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} - {%- if not loop.last %}, {% endif %} - {%- endfor %} - - when not matched then insert - ({{ dest_columns_csv }}) - values - ( - {% for column in dest_columns -%} - DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} - {%- if not loop.last %}, {% endif %} - {%- endfor %} - ) - -{%- endmacro %} - -{% materialization incremental, adapter='vertica' %} - - {% set full_refresh_mode = flags.FULL_REFRESH %} - - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - - {#-- Validate early so we don't run SQL if the strategy is invalid --#} - {% set strategy = vertica__validate_get_incremental_strategy(config) %} - - -- setup - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - {% set to_drop = [] %} - {% if existing_relation is none %} - {% set build_sql = vertica__create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - - {% do adapter.rename_relation(existing_relation, backup_relation) %} - {% set build_sql = vertica__create_table_as(False, target_relation, sql) %} - {% do to_drop.append(backup_relation) %} - {% else %} - {% set tmp_relation = make_temp_relation(target_relation) %} - {% do run_query(vertica__create_table_as(True, tmp_relation, sql)) %} - {% do adapter.expand_target_column_types( - from_relation=tmp_relation, - to_relation=target_relation) %} - {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {% set build_sql = vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %} - {% endif %} - - {% call statement("main") %} - {{ build_sql }} - {% endcall %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - -- `COMMIT` happens here - {% do adapter.commit() %} - - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/incremental/helpers.sql b/dbt/include/vertica/macros/materializations/models/incremental/helpers.sql new file mode 100644 index 0000000..d87a825 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/incremental/helpers.sql @@ -0,0 +1,25 @@ +{% macro vertica__validate_get_incremental_strategy(config) %} + {#-- Find and validate the incremental strategy #} + {%- set strategy = config.get("incremental_strategy", default="merge") -%} + + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ strategy }} + Expected one of: 'merge', 'delete+insert' + {%- endset %} + {% if strategy not in ['merge', 'delete+insert'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {% endif %} + + {% do return(strategy) %} +{% endmacro %} + + +{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %} + {% if strategy == 'merge' %} + {% do return(vertica__get_merge_sql(target_relation, tmp_relation, dest_columns)) %} + {% elif strategy == 'delete+insert' %} + {% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns)) %} + {% else %} + {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} + {% endif %} +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/incremental/incremental.sql b/dbt/include/vertica/macros/materializations/models/incremental/incremental.sql new file mode 100644 index 0000000..2732f00 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/incremental/incremental.sql @@ -0,0 +1,57 @@ +{% materialization incremental, adapter='vertica' %} + + {% set full_refresh_mode = flags.FULL_REFRESH %} + + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(this) %} + + {#-- Validate early so we don't run SQL if the strategy is invalid --#} + {% set strategy = vertica__validate_get_incremental_strategy(config) %} + + -- setup + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set to_drop = [] %} + {% if existing_relation is none %} + {% set build_sql = vertica__create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} + {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} + {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + {% do adapter.drop_relation(backup_relation) %} + + {% do adapter.rename_relation(existing_relation, backup_relation) %} + {% set build_sql = vertica__create_table_as(False, target_relation, sql) %} + {% do to_drop.append(backup_relation) %} + {% else %} + {% set tmp_relation = make_temp_relation(target_relation) %} + {% do run_query(vertica__create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} + {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} + {% set build_sql = vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %} + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/incremental/merge.sql b/dbt/include/vertica/macros/materializations/models/incremental/merge.sql new file mode 100644 index 0000000..b02478a --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/incremental/merge.sql @@ -0,0 +1,48 @@ +{% macro vertica__get_merge_sql(target_relation, tmp_relation, dest_columns) %} + {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set merge_columns = config.get("merge_columns", default=None)%} + + merge into {{ target_relation }} as DBT_INTERNAL_DEST + using {{ tmp_relation }} as DBT_INTERNAL_SOURCE + + {#-- Test 1, find the provided merge columns #} + {% if merge_columns %} + on + {% for column in merge_columns %} + DBT_INTERNAL_DEST.{{ adapter.quote(column) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column) }} + {%- if not loop.last %} AND {% endif %} + {%- endfor %} + {#-- Test 2, use all columns in the destination table #} + {% else %} + on + {% for column in dest_columns -%} + DBT_INTERNAL_DEST.{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} + {%- if not loop.last %} AND {% endif %} + {%- endfor %} + {% endif %} + + when matched then update set + {% for column in dest_columns -%} + {{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} + {%- if not loop.last %}, {% endif %} + {%- endfor %} + + when not matched then insert + ({{ dest_columns_csv }}) + values + ( + {% for column in dest_columns -%} + DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} + {%- if not loop.last %}, {% endif %} + {%- endfor %} + ) +{%- endmacro %} + + +{# No need to implement get_delete_insert_merge_sql(). Syntax supported by default. #} + + +{% macro vertica__get_insert_overwrite_merge_sql() -%} + {{ exceptions.raise_not_implemented( + 'get_insert_overwrite_merge_sql macro not implemented for adapter '+adapter.type()) }} +{%- endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql new file mode 100644 index 0000000..0815e24 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql @@ -0,0 +1,12 @@ +{% macro vertica__create_table_as(temporary, relation, sql) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none }} + + create {% if temporary: -%}local temporary{%- endif %} table + {{ relation.include(database=(not temporary), schema=(not temporary)) }} + {% if temporary: -%}on commit preserve rows{%- endif %} + as ( + {{ sql }} + ); +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/table.sql b/dbt/include/vertica/macros/materializations/models/table/table.sql similarity index 98% rename from dbt/include/vertica/macros/materializations/table.sql rename to dbt/include/vertica/macros/materializations/models/table/table.sql index a85379e..9460220 100644 --- a/dbt/include/vertica/macros/materializations/table.sql +++ b/dbt/include/vertica/macros/materializations/models/table/table.sql @@ -58,4 +58,4 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} -{% endmaterialization %} +{% endmaterialization %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/view/create_or_replace_view.sql b/dbt/include/vertica/macros/materializations/models/view/create_or_replace_view.sql new file mode 100644 index 0000000..38691d4 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/view/create_or_replace_view.sql @@ -0,0 +1,8 @@ +{% macro vertica__create_or_replace_view(relation, sql) %} + {% set sql_header = config.get('sql_header', none) %} + + {{ sql_header if sql_header is not none }} + create or replace view {{ relation }} as ( + {{ sql }} + ); +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/view/create_view_as.sql b/dbt/include/vertica/macros/materializations/models/view/create_view_as.sql new file mode 100644 index 0000000..8fd3f18 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/view/create_view_as.sql @@ -0,0 +1,13 @@ +{% macro vertica__get_create_view_as_sql(relation, sql) -%} + {{ return(create_view_as(relation, sql)) }} +{% endmacro %} + + +{% macro vertica__create_view_as(relation, sql) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none }} + create view {{ relation }} as ( + {{ sql }} + ); +{%- endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/models/view/view.sql b/dbt/include/vertica/macros/materializations/models/view/view.sql new file mode 100644 index 0000000..00d1924 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/models/view/view.sql @@ -0,0 +1,72 @@ +{%- materialization view, default -%} + + {%- set identifier = model['alias'] -%} + {%- set tmp_identifier = model['name'] + '__dbt_tmp' -%} + {%- set backup_identifier = model['name'] + '__dbt_backup' -%} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, + type='view') -%} + {%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier, + schema=schema, database=database, type='view') -%} + -- the intermediate_relation should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation + {%- set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier, + schema=schema, + database=database) -%} + /* + This relation (probably) doesn't exist yet. If it does exist, it's a leftover from + a previous run, and we're going to try to drop it immediately. At the end of this + materialization, we're going to rename the "old_relation" to this identifier, + and then we're going to drop it. In order to make sure we run the correct one of: + - drop view ... + - drop table ... + We need to set the type of this relation to be the type of the old_relation, if it exists, + or else "view" as a sane default if it does not. Note that if the old_relation does not + exist, then there is nothing to move out of the way and subsequentally drop. In that case, + this relation will be effectively unused. + */ + {%- set backup_relation_type = 'view' if old_relation is none else old_relation.type -%} + {%- set backup_relation = api.Relation.create(identifier=backup_identifier, + schema=schema, database=database, + type=backup_relation_type) -%} + -- as above, the backup_relation should not already exist + {%- set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier, + schema=schema, + database=database) -%} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- drop the temp relations if they exist already in the database + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- build model + {% call statement('main') -%} + {{ create_view_as(intermediate_relation, sql) }} + {%- endcall %} + + -- cleanup + -- move the existing view out of the way + {% if old_relation is not none %} + {{ adapter.rename_relation(old_relation, backup_relation) }} + {% endif %} + {{ adapter.rename_relation(intermediate_relation, target_relation) }} + + {% do persist_docs(target_relation, model) %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {{ adapter.commit() }} + + {{ drop_relation_if_exists(backup_relation) }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization -%} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/seed.sql b/dbt/include/vertica/macros/materializations/seeds/helpers.sql similarity index 58% rename from dbt/include/vertica/macros/materializations/seed.sql rename to dbt/include/vertica/macros/materializations/seeds/helpers.sql index 53e35bc..2288809 100644 --- a/dbt/include/vertica/macros/materializations/seed.sql +++ b/dbt/include/vertica/macros/materializations/seeds/helpers.sql @@ -1,4 +1,3 @@ - {% macro vertica__create_csv_table(model, agate_table) %} {%- set column_override = model['config'].get('column_types', {}) -%} {%- set quote_seed_column = model['config'].get('quote_columns', None) -%} @@ -17,10 +16,12 @@ {{ return(sql) }} {% endmacro %} + {% macro vertica__load_csv_rows(model, agate_table) %} {{ return(copy_local_load_csv_rows(model, agate_table) )}} {% endmacro %} + {% macro basic_load_csv_rows(model, batch_size, agate_table) %} {% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %} {% set bindings = [] %} @@ -57,6 +58,7 @@ {{ return(statements[0]) }} {% endmacro %} + {% macro copy_local_load_csv_rows(model, agate_table) %} {% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %} @@ -74,54 +76,10 @@ {{ return(sql) }} {% endmacro %} -{% materialization seed, adapter='vertica' %} - - {%- set identifier = model['alias'] -%} - {%- set full_refresh_mode = (should_full_refresh()) -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} - - {%- set agate_table = load_agate_table() -%} - {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - -- build model - {% set create_table_sql = "" %} - {% if exists_as_view %} - {{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }} - {% elif exists_as_table %} - {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %} - {% else %} - {% set create_table_sql = create_csv_table(model, agate_table) %} - {% endif %} - - {% set status = 'CREATE' if full_refresh_mode else 'INSERT' %} - {% set num_rows = (agate_table.rows | length) %} - {% set sql = load_csv_rows(model, agate_table) %} - - {% call statement('main') %} - {{ create_table_sql }}; - -- dbt seed -- - {{ sql }} - {% endcall %} - - {% set target_relation = this.incorporate(type='table') %} - {% do persist_docs(target_relation, model) %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - -- `COMMIT` happens here - {{ adapter.commit() }} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {{ return({'relations': [target_relation]}) }} -{% endmaterialization %} +{# + No need to implement reset_csv_table(). Syntax supported by default. + No need to implement get_binding_char(). Syntax supported by default. + No need to implement get_batch_size(). Syntax supported by default. + No need to implement get_seed_column_quoted_csv(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/seeds/seed.sql b/dbt/include/vertica/macros/materializations/seeds/seed.sql new file mode 100644 index 0000000..4f7d478 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/seeds/seed.sql @@ -0,0 +1,55 @@ +{% materialization seed, adapter='vertica' %} + + {%- set identifier = model['alias'] -%} + {%- set full_refresh_mode = (should_full_refresh()) -%} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + + {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} + {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} + + {%- set agate_table = load_agate_table() -%} + {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- build model + {% set create_table_sql = "" %} + {% if exists_as_view %} + {{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }} + {% elif exists_as_table %} + {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %} + {% else %} + {% set create_table_sql = create_csv_table(model, agate_table) %} + {% endif %} + + {% set status = 'CREATE' if full_refresh_mode else 'INSERT' %} + {% set num_rows = (agate_table.rows | length) %} + {% set sql = load_csv_rows(model, agate_table) %} + + {% call statement('main') %} + {{ create_table_sql }}; + -- dbt seed -- + {{ sql }} + {% endcall %} + + {% set target_relation = this.incorporate(type='table') %} + {% do persist_docs(target_relation, model) %} + + {% if full_refresh_mode or not exists_as_table %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {{ adapter.commit() }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/snapshots/helper.sql b/dbt/include/vertica/macros/materializations/snapshots/helper.sql new file mode 100644 index 0000000..a00c049 --- /dev/null +++ b/dbt/include/vertica/macros/materializations/snapshots/helper.sql @@ -0,0 +1,6 @@ +{# + No need to implement create_columns(). Syntax supported by default. + No need to implement post_snapshot(). No-op. + No need to implement snapshot_staging_table(). Syntax supported by default. + No need to implement build_snapshot_table(). Syntax supported by default. +#} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/snapshots/snapshot_merge.sql b/dbt/include/vertica/macros/materializations/snapshots/snapshot_merge.sql index c93d894..3855403 100644 --- a/dbt/include/vertica/macros/materializations/snapshots/snapshot_merge.sql +++ b/dbt/include/vertica/macros/materializations/snapshots/snapshot_merge.sql @@ -1,34 +1,29 @@ -{% macro snapshot_merge_sql(target, source, insert_cols) -%} - +{% macro vertica__snapshot_merge_sql(target, source, insert_cols_csv) -%} + {% if insert_cols %} {%- set insert_cols_csv = insert_cols | join(', ') -%} {% else %} - {%- set insert_cols_csv = get_quoted_csv(vertica__get_columns_in_relation(target) | map(attribute="name")) %} + {%- set insert_cols_csv = get_quoted_csv(get_columns_in_relation(target) | map(attribute="name")) %} {% endif %} - - {{ adapter.dispatch('snapshot_merge_sql')(target, source, insert_cols_csv) }} -{%- endmacro %} - -{% macro vertica__snapshot_merge_sql(target, source, insert_cols_csv) -%} - merge into {{ target }} as DBT_INTERNAL_DEST - using {{ source }} as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id - when matched - and DBT_INTERNAL_DEST.dbt_valid_to is null - and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') - then update - set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + when matched + and DBT_INTERNAL_DEST.dbt_valid_to is null + and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') + then update + set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to - when not matched - and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' - then insert ({{ insert_cols_csv }}) - values ( - {% for column in vertica__get_columns_in_relation(target) | map(attribute="name") -%} - DBT_INTERNAL_SOURCE.{{ column }} - {%- if not loop.last %}, {% endif %} - {%- endfor %} - ) + when not matched + and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' + then insert ({{ insert_cols_csv }}) + values ( + {% for column in vertica__get_columns_in_relation(target) | map(attribute="name") -%} + DBT_INTERNAL_SOURCE.{{ column }} + {%- if not loop.last %}, {% endif %} + {%- endfor %} + ) {% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/snapshots/strategies.sql b/dbt/include/vertica/macros/materializations/snapshots/strategies.sql index 3047cd3..24ee913 100644 --- a/dbt/include/vertica/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/vertica/macros/materializations/snapshots/strategies.sql @@ -1,45 +1,7 @@ -{# - Dispatch strategies by name, optionally qualified to a package -#} -{% macro strategy_dispatch(name) -%} -{% set original_name = name %} - {% if '.' in name %} - {% set package_name, name = name.split(".", 1) %} - {% else %} - {% set package_name = none %} - {% endif %} - - {% if package_name is none %} - {% set package_context = context %} - {% elif package_name in context %} - {% set package_context = context[package_name] %} - {% else %} - {% set error_msg %} - Could not find package '{{package_name}}', called with '{{original_name}}' - {% endset %} - {{ exceptions.raise_compiler_error(error_msg | trim) }} - {% endif %} - - {%- set search_name = 'snapshot_' ~ name ~ '_strategy' -%} - - {% if search_name not in package_context %} - {% set error_msg %} - The specified strategy macro '{{name}}' was not found in package '{{ package_name }}' - {% endset %} - {{ exceptions.raise_compiler_error(error_msg | trim) }} - {% endif %} - {{ return(package_context[search_name]) }} -{%- endmacro %} - - {# Create SCD Hash SQL fields cross-db #} -{% macro snapshot_hash_arguments(args) -%} - {{ adapter.dispatch('snapshot_hash_arguments', 'dbt')(args) }} -{%- endmacro %} - -{% macro default__snapshot_hash_arguments(args) -%} +{% macro vertica__snapshot_hash_arguments(args) -%} md5({%- for arg in args -%} coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %} @@ -50,135 +12,12 @@ {# Get the current time cross-db #} -{% macro snapshot_get_time() -%} - {{ adapter.dispatch('snapshot_get_time', 'dbt')() }} -{%- endmacro %} - -{% macro default__snapshot_get_time() -%} +{% macro vertica__snapshot_get_time() -%} {{ current_timestamp() }} {%- endmacro %} -{# - Core strategy definitions -#} -{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set primary_key = config['unique_key'] %} - {% set updated_at = config['updated_at'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} - - {#/* - The snapshot relation might not have an {{ updated_at }} value if the - snapshot strategy is changed from `check` to `timestamp`. We - should use a dbt-created column for the comparison in the snapshot - table instead of assuming that the user-supplied {{ updated_at }} - will be present in the historical data. - - See https://github.com/dbt-labs/dbt-core/issues/2350 - */ #} - {% set row_changed_expr -%} - ({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }}) - {%- endset %} - - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} - - {% do return({ - "unique_key": primary_key, - "updated_at": updated_at, - "row_changed": row_changed_expr, - "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes - }) %} -{% endmacro %} - - -{% macro snapshot_string_as_time(timestamp) -%} - {{ adapter.dispatch('snapshot_string_as_time')(timestamp) }} -{%- endmacro %} - {% macro vertica__snapshot_string_as_time(timestamp) %} {%- set result = "('" ~ timestamp ~ "'::timestamptz)" -%} {{ return(result) }} -{% endmacro %} - - -{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%} - {%- set query_columns = get_columns_in_query(node['compiled_sql']) -%} - {%- if not target_exists -%} - {# no table yet -> return whatever the query does #} - {{ return([false, query_columns]) }} - {%- endif -%} - {# handle any schema changes #} - {%- set target_table = node.get('alias', node.get('name')) -%} - {%- set target_relation = adapter.get_relation(database=node.database, schema=node.schema, identifier=target_table) -%} - {%- set existing_cols = get_columns_in_query('select * from ' ~ target_relation) -%} - {%- set ns = namespace() -%} {# handle for-loop scoping with a namespace #} - {%- set ns.column_added = false -%} - - {%- set intersection = [] -%} - {%- for col in query_columns -%} - {%- if col in existing_cols -%} - {%- do intersection.append(col) -%} - {%- else -%} - {% set ns.column_added = true %} - {%- endif -%} - {%- endfor -%} - {{ return([ns.column_added, intersection]) }} -{%- endmacro %} - - -{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set check_cols_config = config['check_cols'] %} - {% set primary_key = config['unique_key'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} - - {% set select_current_time -%} - select {{ snapshot_get_time() }} as snapshot_start - {%- endset %} - - {#-- don't access the column by name, to avoid dealing with casing issues on snowflake #} - {%- set now = run_query(select_current_time)[0][0] -%} - {% if now is none or now is undefined -%} - {%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%} - {%- endif %} - {% set updated_at = config.get('updated_at', snapshot_string_as_time(now)) %} - - {% set column_added = false %} - - {% if check_cols_config == 'all' %} - {% set column_added, check_cols = snapshot_check_all_get_existing_columns(node, target_exists) %} - {% elif check_cols_config is iterable and (check_cols_config | length) > 0 %} - {% set check_cols = check_cols_config %} - {% else %} - {% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %} - {% endif %} - - {%- set row_changed_expr -%} - ( - {%- if column_added -%} - TRUE - {%- else -%} - {%- for col in check_cols -%} - {{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }} - or - ( - (({{ snapshotted_rel }}.{{ col }} is null) and not ({{ current_rel }}.{{ col }} is null)) - or - ((not {{ snapshotted_rel }}.{{ col }} is null) and ({{ current_rel }}.{{ col }} is null)) - ) - {%- if not loop.last %} or {% endif -%} - {%- endfor -%} - {%- endif -%} - ) - {%- endset %} - - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} - - {% do return({ - "unique_key": primary_key, - "updated_at": updated_at, - "row_changed": row_changed_expr, - "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes - }) %} {% endmacro %} \ No newline at end of file diff --git a/dbt/include/vertica/macros/materializations/view.sql b/dbt/include/vertica/macros/materializations/view.sql deleted file mode 100644 index 7c0e67b..0000000 --- a/dbt/include/vertica/macros/materializations/view.sql +++ /dev/null @@ -1,3 +0,0 @@ -{% materialization view, adapter='vertica' %} - {{ return(create_or_replace_view()) }} -{% endmaterialization %} diff --git a/setup.py b/setup.py index 1f93c99..f1e3182 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import pathlib package_name = "dbt-vertica" -package_version = "1.0.2" +package_version = "1.0.3" description = """The vertica adapter plugin for dbt (data build tool)""" HERE = pathlib.Path(__file__).parent @@ -25,7 +25,13 @@ 'dbt': [ 'include/vertica/dbt_project.yml', 'include/vertica/macros/*.sql', + 'include/vertica/macros/adapters/*.sql', 'include/vertica/macros/materializations/*.sql', + 'include/vertica/macros/materializations/models/incremental/*.sql', + 'include/vertica/macros/materializations/models/table/*.sql', + 'include/vertica/macros/materializations/models/view/*.sql', + 'include/vertica/macros/materializations/seeds/*.sql', + 'include/vertica/macros/materializations/snapshots/*.sql', ] }, install_requires=[