Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPIKE: external table as model materialization #1058

Closed
wants to merge 12 commits into from
17 changes: 17 additions & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,20 @@ def valid_incremental_strategies(self):
def debug_query(self):
    """Override for DebugTask method.

    Runs the trivial statement ``select 1 as id`` through ``self.execute``.
    """
    self.execute("select 1 as id")

@available.parse_none
def stack_tables(self, tables_list: List[agate.Table]) -> agate.Table:
    """Union several agate tables into one.

    Args:
        tables_list: agate tables expected to share the same column names
            and types.

    Returns:
        A single table holding the rows of every non-empty input. When no
        input has any rows, the first table of ``tables_list`` is returned
        unchanged.
    """
    populated = [candidate for candidate in tables_list if len(candidate.rows) > 0]

    # Nothing to merge: hand back the first input as-is.
    if not populated:
        return tables_list[0]

    # Key each table by its position, merge them, then drop the "group"
    # column (presumably the key column that merge() adds -- confirm against
    # the agate documentation).
    table_set = agate.TableSet(populated, keys=range(len(populated)))
    merged = table_set.merge()
    return merged.exclude(["group"])
8 changes: 8 additions & 0 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ def is_dynamic_table(self) -> bool:
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)

@property
def is_snowpipe(self) -> bool:
    """Return True when this relation's ``type`` equals ``SnowflakeRelationType.SnowPipe``."""
    return self.type == SnowflakeRelationType.SnowPipe

@classproperty
def SnowPipe(cls) -> str:
    """Return the string form of ``SnowflakeRelationType.SnowPipe``."""
    return str(SnowflakeRelationType.SnowPipe)

@classproperty
def get_relation_type(cls) -> Type[SnowflakeRelationType]:
    """Return the ``SnowflakeRelationType`` enum class itself (not one of its members)."""
    return SnowflakeRelationType
Expand Down
2 changes: 2 additions & 0 deletions dbt/adapters/snowflake/relation_configs/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SnowflakeRelationType(StrEnum):
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"
ExternalTable = "external_table"
SnowPipe = "snowpipe"


class SnowflakeIncludePolicy(Policy):
Expand Down
44 changes: 41 additions & 3 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,47 @@
{%- set max_total_results = max_results_per_iter * max_iter -%}

{%- set sql -%}
show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}
show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }};

SELECT
"database_name",
"schema_name",
"name",
"kind",
"is_dynamic"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
{%- endset -%}

{%- set result = run_query(sql) -%}

{%- set sql_extab -%}
show external tables in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }};

SELECT
"database_name",
"schema_name",
"name",
'external_table' as "kind",
'N' as "is_dynamic"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
{%- endset -%}

{%- set result_extab = run_query(sql_extab) -%}

{%- set sql_pipes -%}
show pipes in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }};

SELECT
"database_name",
"schema_name",
"name",
'snowpipe' as "kind",
'N' as "is_dynamic"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
{%- endset -%}

{%- set result_pipes = run_query(sql_pipes) -%}

{%- set n = (result | length) -%}
{%- set watermark = namespace(table_name=result.columns[1].values()[-1]) -%}
{%- set paginated = namespace(result=[]) -%}
Expand All @@ -147,8 +183,10 @@
{% endif %}

{%- set all_results_array = [result] + paginated.result -%}
{%- set result = result.merge(all_results_array) -%}
{%- do return(result) -%}

{%- set result_stacked = adapter.stack_tables([result, result_pipes, result_extab]) -%}

{%- do return(result_stacked) -%}

{% endmacro %}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
{% macro snowflake__create_external_table(relation, columns) %}

    {#
        Entry point for building an externally-backed relation.

        When the model config carries a truthy `snowpipe` entry, the snowpipe
        build SQL is rendered; otherwise a plain external table DDL is rendered.

        https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html
        This assumes you have already created an external stage.
    #}

    {% if config.get('snowpipe') %}

        {{ snowflake_get_build_snowpipe_sql(relation, columns) }}

    {% else %}

        {{ get_create_external_table_sql(relation, columns) }}

    {% endif %}

{% endmacro %}

{% macro get_create_external_table_sql(relation, columns) %}

    {#
        Render a `create or replace external table` statement for `relation`.

        Arguments:
          relation: target relation; rendered as the table name.
          columns:  column definitions. Each entry is read for `name` and
                    `data_type`, and optionally `quote`, `alias`, `expression`.

        Model config read here:
          file_format, location, partitions, infer_schema, auto_refresh,
          aws_sns_topic, table_format, pattern, integration.

        Assumes the external stage named by `location` already exists.
    #}

    {% set file_format = config.get('file_format') %}
    {% set location = config.get('location') %}
    {% set partitions = config.get('partitions') %}
    {% set partition_names = partitions|map(attribute='name')|join(', ') %}

    {# These options are referenced further down. They must be read from the
       model config here, otherwise they are always undefined and silently
       left out of the generated DDL. #}
    {% set auto_refresh = config.get('auto_refresh') %}
    {% set aws_sns_topic = config.get('aws_sns_topic') %}
    {% set table_format = config.get('table_format') %}
    {% set pattern = config.get('pattern') %}
    {% set integration = config.get('integration') %}
    {% set infer_schema = config.get('infer_schema', false) %}

    {# With `infer_schema` enabled, column names and types come from Snowflake's
       INFER_SCHEMA table function instead of `columns`. Rows are read
       positionally below: [0] is the column name, [1] is the type.
       NOTE(review): INFER_SCHEMA expects `file_format` to be a named file
       format -- confirm against the Snowflake documentation. #}
    {% set columns_infer = [] %}
    {% if infer_schema and execute %}
        {% set infer_schema_sql %}
            select * from table(infer_schema(location => '{{ location }}', file_format => '{{ file_format }}'))
        {% endset %}
        {% set columns_infer = run_query(infer_schema_sql) %}
    {% endif %}

    {# Local is named `csv_format` so it does not shadow the `is_csv` macro. #}
    {%- set csv_format = is_csv(file_format) -%}

    create or replace external table {{ relation }}
    {%- if columns or partitions or infer_schema -%}
    (
        {%- if partitions -%}{%- for partition in partitions %}
            {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}}
        {%- endfor -%}{%- endif -%}
        {%- if not infer_schema -%}
            {%- for column in columns %}
                {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %}
                {%- set column_alias -%}
                    {%- if 'alias' in column and column.quote -%}
                        {{adapter.quote(column.alias)}}
                    {%- elif 'alias' in column -%}
                        {{column.alias}}
                    {%- else -%}
                        {{column_quoted}}
                    {%- endif -%}
                {%- endset %}
                {%- set col_expression -%}
                    {%- if column.expression -%}
                        {{column.expression}}
                    {%- else -%}
                        {#- CSV files are addressed by ordinal position (c1, c2, ...); other formats by name -#}
                        {%- set col_id = 'value:c' ~ loop.index if csv_format else 'value:' ~ column_alias -%}
                        (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
                    {%- endif -%}
                {%- endset %}
                {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}})
                {{- ',' if not loop.last -}}
            {% endfor %}
        {% else %}
            {%- for column in columns_infer %}
                {%- set col_expression -%}
                    {%- set col_id = 'value:' ~ column[0] -%}
                    (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
                {%- endset %}
                {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}})
                {{- ',' if not loop.last -}}
            {% endfor %}
        {%- endif -%}
    )
    {%- endif -%}
    {% if partitions %} partition by ({{ partition_names }}) {% endif %}
    location = {{location}} {# stage #}
    {# Emitted only when auto_refresh was explicitly configured as true or false #}
    {% if auto_refresh in (true, false) -%}
        auto_refresh = {{auto_refresh}}
    {%- endif %}
    {% if aws_sns_topic -%}
        aws_sns_topic = '{{aws_sns_topic}}'
    {%- endif %}
    {% if table_format | lower == "delta" %}
        refresh_on_create = false
    {% endif %}
    {% if pattern -%} pattern = '{{pattern}}' {%- endif %}
    {% if integration -%} integration = '{{integration}}' {%- endif %}
    file_format = {{file_format}}
    {% if table_format -%} table_format = '{{table_format}}' {%- endif %}

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{% macro snowflake__refresh_external_table(relation) %}

    {#
        Return the list of statements needed to refresh `relation`, or an
        empty list when no manual refresh is needed.

        - Snowpipe (model config has a truthy `snowpipe` entry): refreshed
          unless `snowpipe.auto_ingest` is set, matching the rule used by
          `snowflake_refresh_snowpipe`.
        - External table: refreshed unless `auto_refresh` is set.

        The pipe/table decision uses the same truthiness test on `snowpipe`
        as the create macro, so the refreshed object kind matches what was
        created.
    #}

    {% set snowpipe = config.get('snowpipe', none) %}

    {% if snowpipe %}
        {% set relation_type = 'pipe' %}
        {% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping else false %}
        {% set needs_refresh = not auto_ingest %}
    {% else %}
        {% set relation_type = 'external table' %}
        {% set needs_refresh = not config.get('auto_refresh', false) %}
    {% endif %}

    {% if needs_refresh %}

        {% set ddl %}
        begin;
        alter {{ relation_type }} {{ relation }} refresh;
        commit;
        {% endset %}

        {% do return([ddl]) %}

    {% else %}

        {% do return([]) %}

    {% endif %}

{% endmacro %}

{% macro is_csv(file_format) %}

    {#
        Return true when `file_format` describes a CSV file format.

        `file_format` is either an inline definition containing `type = ...`
        or the (optionally database/schema qualified) name of a file format.
        An inline definition is decided by its text alone; a named format is
        looked up with `show file formats`.

        From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html:

        Important: the external table does not inherit the file format, if
        any, in the stage definition. Any file format options must be given
        explicitly for the external table using the FILE_FORMAT parameter.

        Note: FORMAT_NAME and TYPE are mutually exclusive; only one of them
        should be specified when creating an external table.
    #}

    {# lower-case and strip spaces so the substring checks are layout-independent #}
    {% set normalized = file_format|lower|replace(' ','') %}

    {# Inline definition: the answer is in the text itself #}
    {% if 'type=' in normalized %}
        {{ return('type=csv' in normalized) }}
    {% endif %}

    {# Named format: reduce to the bare (possibly qualified) name #}
    {% set format_name = normalized
        | replace('(','') | replace(')','')
        | replace('format_name=','') %}
    {% set name_parts = format_name.split('.') %}
    {% set part_count = name_parts | length %}

    {# Fill in missing database / schema from the active target #}
    {% if part_count == 3 %}
        {% set ff_database = name_parts[0] %}
        {% set ff_schema = name_parts[1] %}
        {% set ff_identifier = name_parts[2] %}
    {% elif part_count == 2 %}
        {% set ff_database = target.database %}
        {% set ff_schema = name_parts[0] %}
        {% set ff_identifier = name_parts[1] %}
    {% else %}
        {% set ff_database = target.database %}
        {% set ff_schema = target.schema %}
        {% set ff_identifier = name_parts[0] %}
    {% endif %}

    {% call statement('get_file_format', fetch_result = True) %}
        show file formats in {{ff_database}}.{{ff_schema}}
    {% endcall %}

    {% for candidate in load_result('get_file_format').table %}
        {% if candidate['name']|lower == ff_identifier and candidate['type']|lower == 'csv' %}
            {{ return(true) }}
        {% endif %}
    {% endfor %}

    {{ return(false) }}

{% endmacro %}
96 changes: 96 additions & 0 deletions dbt/include/snowflake/macros/relations/snowpipe/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
{% macro snowflake_create_empty_table(relation, columns) %}

    {#
        Render a `create or replace table` statement for `relation`.

        The table gets one column per entry in `columns` (using `name` and
        `data_type`), or a single `value variant` column when `columns` is
        empty, followed by four fixed metadata columns.
    #}

    create or replace table {{ relation }} (
        {% if columns|length > 0 -%}
            {%- for col in columns -%}
                {{col.name}} {{col.data_type}},
            {% endfor -%}
        {% else %}
            value variant,
        {% endif %}
            metadata_filename varchar,
            metadata_file_row_number bigint,
            metadata_file_last_modified timestamp,
            _dbt_copied_at timestamp
    );

{% endmacro %}

{% macro snowflake_get_copy_sql(relation, columns, explicit_transaction=false) %}

    {#
        Render a `copy into` statement that loads `relation` from a stage.

        Arguments:
          relation: target table of the copy.
          columns:  column definitions (`name`, `data_type`). When empty, the
                    whole record is loaded into a single `value` variant column.
          explicit_transaction: when true, wrap the statement in `begin;` / `commit;`.

        Model config read here: location, file_format, pattern, and
        snowpipe.copy_options (optional).

        This assumes you have already created an external stage.
    #}

    {% set location = config.get('location') %}
    {% set file_format = config.get('file_format') %}
    {% set pattern = config.get('pattern') %}

    {# Local is named `csv_format` so it does not shadow the `is_csv` macro. #}
    {%- set csv_format = is_csv(file_format) %}

    {# `snowpipe` defaults to none, so only look up copy_options when a
       mapping was actually configured; calling `.get` on none would fail. #}
    {% set snowpipe = config.get('snowpipe', none) %}
    {%- set copy_options = snowpipe.get('copy_options', none) if snowpipe is mapping else none -%}

    {%- if explicit_transaction -%} begin; {%- endif %}

    copy into {{ relation }}
    from (
        select
        {% if columns|length == 0 %}
            $1::variant as value,
        {% else -%}
        {%- for column in columns -%}
            {%- set col_expression -%}
                {%- if csv_format -%}nullif(${{loop.index}},''){# special case: get columns by ordinal position #}
                {%- else -%}nullif($1:{{column.name}},''){# standard behavior: get columns by name #}
                {%- endif -%}
            {%- endset -%}
            {{col_expression}}::{{column.data_type}} as {{column.name}},
        {% endfor -%}
        {% endif %}
            metadata$filename::varchar as metadata_filename,
            metadata$file_row_number::bigint as metadata_file_row_number,
            metadata$file_last_modified::timestamp as metadata_file_last_modified,
            metadata$start_scan_time::timestamp as _dbt_copied_at
        from {{location}} {# stage #}
    )
    file_format = {{file_format}}
    {% if pattern -%} pattern = '{{pattern}}' {%- endif %}
    {% if copy_options %} {{copy_options}} {% endif %};

    {% if explicit_transaction -%} commit; {%- endif -%}

{% endmacro %}


{% macro snowflake_create_snowpipe(relation, columns) %}

    {#
        Render a `create or replace pipe` statement for `relation`.

        Optional pipe settings (auto_ingest, aws_sns_topic, integration,
        error_integration) are read from the `snowpipe` entry of the model
        config and emitted only when set. The pipe body is the copy statement
        rendered by `snowflake_get_copy_sql`.

        https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html
    #}

    {% set pipe_config = config.get('snowpipe', none) %}

    create or replace pipe {{ relation }}
        {% if pipe_config.auto_ingest -%}
            auto_ingest = {{pipe_config.auto_ingest}}
        {%- endif %}
        {% if pipe_config.aws_sns_topic -%}
            aws_sns_topic = '{{pipe_config.aws_sns_topic}}'
        {%- endif %}
        {% if pipe_config.integration -%}
            integration = '{{pipe_config.integration}}'
        {%- endif %}
        {% if pipe_config.error_integration -%}
            error_integration = '{{pipe_config.error_integration}}'
        {%- endif %}
        as {{ snowflake_get_copy_sql(relation, columns) }}

{% endmacro %}

{% macro snowflake_refresh_snowpipe(relation) %}

    {#
        Return the statements needed to refresh the pipe `relation`.

        When `snowpipe.auto_ingest` in the model config is exactly true, an
        empty list is returned; otherwise a single `alter pipe ... refresh`
        statement is returned.
    #}

    {% set pipe_config = config.get('snowpipe', none) %}
    {% set auto_ingest = pipe_config.get('auto_ingest', false) if pipe_config is mapping %}

    {# Guard clause: auto-ingest pipes get no manual refresh #}
    {% if auto_ingest is true %}
        {% do return([]) %}
    {% endif %}

    {% set ddl %}
    alter pipe {{ relation }} refresh
    {% endset %}

    {{ return([ddl]) }}

{% endmacro %}
Loading
Loading