From 6b6fbde4112428fa817e32805c5d3684956f5255 Mon Sep 17 00:00:00 2001 From: Benoit Perigaud <8754100+b-per@users.noreply.github.com> Date: Tue, 6 Jun 2023 10:37:03 +0200 Subject: [PATCH] Add support to infer schemas on Snowflake (#211) * Add support to infer schemas on Snowflake * Add tests and example for Snowflake infer schema --- .../plugins/snowflake/prep_external.sql | 8 +++- .../plugins/snowflake/snowflake_external.yml | 37 +++++++++++++++ .../snowflake/create_external_table.sql | 45 ++++++++++++++----- run_test.sh | 4 +- sample_sources/snowflake.yml | 29 +++++++++--- 5 files changed, 101 insertions(+), 22 deletions(-) diff --git a/integration_tests/macros/plugins/snowflake/prep_external.sql b/integration_tests/macros/plugins/snowflake/prep_external.sql index fc9fb68c..b754d5ba 100644 --- a/integration_tests/macros/plugins/snowflake/prep_external.sql +++ b/integration_tests/macros/plugins/snowflake/prep_external.sql @@ -1,18 +1,22 @@ {% macro snowflake__prep_external() %} {% set external_stage = target.schema ~ '.dbt_external_tables_testing' %} + {% set parquet_file_format = target.schema ~ '.dbt_external_tables_testing_parquet' %} - {% set create_external_stage %} + {% set create_external_stage_and_file_format %} begin; create or replace stage {{ external_stage }} url = 's3://dbt-external-tables-testing'; + + create or replace file format {{ parquet_file_format }} type = parquet; commit; {% endset %} {% do log('Creating external stage ' ~ external_stage, info = true) %} - {% do run_query(create_external_stage) %} + {% do log('Creating parquet file format ' ~ parquet_file_format, info = true) %} + {% do run_query(create_external_stage_and_file_format) %} {% endmacro %} diff --git a/integration_tests/models/plugins/snowflake/snowflake_external.yml b/integration_tests/models/plugins/snowflake/snowflake_external.yml index e1a31385..195cca9d 100644 --- a/integration_tests/models/plugins/snowflake/snowflake_external.yml +++ b/integration_tests/models/plugins/snowflake/snowflake_external.yml @@ -115,3 +115,40 @@ sources: data_type: varchar expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)" tests: *same-rowcount + + - name: people_parquet_column_list_unpartitioned + external: &parquet-people + location: '@{{ target.schema }}.dbt_external_tables_testing/parquet' + file_format: '{{ target.schema }}.dbt_external_tables_testing_parquet' + columns: *cols-of-the-people + tests: *equal-to-the-people + + - name: people_parquet_column_list_partitioned + external: + <<: *parquet-people + partitions: *parts-of-the-people + columns: *cols-of-the-people + tests: *equal-to-the-people + + - name: people_parquet_infer_schema_unpartitioned + external: + <<: *parquet-people + infer_schema: true + tests: *equal-to-the-people + + - name: people_parquet_infer_schema_partitioned + external: + <<: *parquet-people + partitions: *parts-of-the-people + infer_schema: true + tests: *equal-to-the-people + + - name: people_parquet_infer_schema_partitioned_and_column_desc + external: + <<: *parquet-people + partitions: *parts-of-the-people + infer_schema: true + tests: *equal-to-the-people + columns: + - name: id + description: "the unique ID for people" \ No newline at end of file diff --git a/macros/plugins/snowflake/create_external_table.sql b/macros/plugins/snowflake/create_external_table.sql index 7bb46291..f48b87a7 100644 --- a/macros/plugins/snowflake/create_external_table.sql +++ b/macros/plugins/snowflake/create_external_table.sql @@ -3,26 +3,47 @@ {%- set columns = 
source_node.columns.values() -%} {%- set external = source_node.external -%} {%- set partitions = external.partitions -%} + {%- set infer_schema = external.infer_schema -%} + + {% if infer_schema %} + {% set query_infer_schema %} + select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') ) + {% endset %} + {% if execute %} + {% set columns_infer = run_query(query_infer_schema) %} + {% endif %} + {% endif %} {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} {# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} {# This assumes you have already created an external stage #} create or replace external table {{source(source_node.source_name, source_node.name)}} - {%- if columns or partitions -%} + {%- if columns or partitions or infer_schema -%} ( {%- if partitions -%}{%- for partition in partitions %} - {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 -}} + {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} {%- endfor -%}{%- endif -%} - {%- for column in columns %} - {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} - {%- set col_expression -%} - {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_quoted -%} - (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) - {%- endset %} - {{column_quoted}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) - {{- ',' if not loop.last -}} - {% endfor %} + {%- if not infer_schema -%} + {%- for column in columns %} + {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} + {%- set col_expression -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_quoted -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column_quoted}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + {% else %} + {%- for column in columns_infer %} + {%- set col_expression -%} + {%- set col_id = 'value:' ~ column[0] -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) + {{- ',' if not loop.last -}} + {% endfor %} + {%- endif -%} ) {%- endif -%} {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} @@ -34,4 +55,4 @@ {% if external.integration -%} integration = '{{external.integration}}' {%- endif %} file_format = {{external.file_format}} {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %} -{% endmacro %} +{% endmacro %} \ No newline at end of file diff --git a/run_test.sh b/run_test.sh index 5d0f454e..903c0210 100755 --- a/run_test.sh +++ b/run_test.sh @@ -36,6 +36,6 @@ set -eo pipefail dbt deps --target $1 dbt seed --full-refresh --target $1 dbt run-operation prep_external --target $1 -dbt run-operation stage_external_sources --vars 'ext_full_refresh: true' --target $1 -dbt run-operation stage_external_sources --target $1 +dbt run-operation dbt_external_tables.stage_external_sources --vars 'ext_full_refresh: true' --target $1 +dbt run-operation dbt_external_tables.stage_external_sources --target $1 dbt test --target 
$1 diff --git a/sample_sources/snowflake.yml b/sample_sources/snowflake.yml index 83d710ea..035f5335 100644 --- a/sample_sources/snowflake.yml +++ b/sample_sources/snowflake.yml @@ -62,9 +62,26 @@ sources: # include `value`, the JSON blob of all file contents. - name: delta_tbl - description: "External table using Delta files" - external: - location: "@stage" # reference an existing external stage - file_format: "( type = parquet )" # fully specified here, or reference an existing file format - table_format: delta # specify the table format - auto_refresh: false # requires configuring an event notification from Amazon S3 or Azure + description: "External table using Delta files" + external: + location: "@stage" # reference an existing external stage + file_format: "( type = parquet )" # fully specified here, or reference an existing file format + table_format: delta # specify the table format + auto_refresh: false # requires configuring an event notification from Amazon S3 or Azure + + + - name: parquet_with_inferred_schema + description: "External table using Parquet and inferring the schema" + external: + location: "@stage" # reference an existing external stage + file_format: "my_file_format" # we need a named file format for infer to work + infer_schema: true # parameter to tell Snowflake we want to infer the table schema + partitions: + - name: section # we can define partitions on top of the schema columns + data_type: varchar(64) + expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)" + columns: # columns can still be listed for documentation/testing purpose + - name: id + description: this is an id + - name: name + description: and this is a name \ No newline at end of file
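
For context on the new infer_schema path (an illustrative sketch only; the schema, stage and file format names below are placeholders taken from the integration tests): when a source sets infer_schema: true, the macro first queries Snowflake's INFER_SCHEMA table function against the source's location and its named file format,

    select *
    from table(
        infer_schema(
            location => '@my_schema.dbt_external_tables_testing/parquet',
            file_format => 'my_schema.dbt_external_tables_testing_parquet'
        )
    );

and then uses the COLUMN_NAME and TYPE columns of that result, instead of an explicit column list, to build the external table DDL, roughly:

    create or replace external table my_schema.people_parquet_infer_schema_partitioned (
        section varchar(64) as substr(split_part(metadata$filename, 'section=', 2), 1, 1),
        -- one entry per inferred column, e.g. if INFER_SCHEMA returns (ID, NUMBER):
        ID NUMBER as ((case when is_null_value(value:ID) or lower(value:ID) = 'null' then null else value:ID end)::NUMBER)
    )
    partition by (section)
    location = @my_schema.dbt_external_tables_testing/parquet
    file_format = my_schema.dbt_external_tables_testing_parquet;

Because INFER_SCHEMA requires a named file format, an inline definition such as file_format: "( type = parquet )" will not work together with infer_schema: true; the integration tests therefore create the named parquet format in snowflake__prep_external above.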