
Add column.expression to snowpipe get_copy.sql #328

Open
shaneorourke-payroc opened this issue Dec 4, 2024 · 0 comments
Labels: enhancement (New feature or request), triage


shaneorourke-payroc commented Dec 4, 2024

Describe the feature

Add support for column.expression to the plugins/snowflake/snowpipe/get_copy_sql.sql macro, so that the column expressions defined in external_stage_source.yml can also be used with Snowpipe.

I've tested the following version of the macro and it works without issue:

{% macro snowflake_get_copy_sql(source_node, explicit_transaction=false) %}
{# This assumes you have already created an external stage #}

    {%- set columns = source_node.columns.values() -%}
    {%- set external = source_node.external -%}
    {%- set is_csv = dbt_external_tables.is_csv(external.file_format) %}
    {%- set copy_options = external.snowpipe.get('copy_options', none) -%}

    {%- if explicit_transaction -%} begin; {%- endif %}

    copy into {{source(source_node.source_name, source_node.name)}}
    from (
        select
        {% if columns|length == 0 %}
            $1::variant as value,
        {% else -%}
        {%- for column in columns -%}
            {%- set col_expression -%}
                {%- if column.expression -%}
                    {{column.expression}}
                {%- else -%}
                    {%- if is_csv -%}nullif(${{loop.index}},''){# special case: get columns by ordinal position #}
                    {%- else -%}nullif($1:{{column.name}},''){# standard behavior: get columns by name #}
                    {%- endif -%}
                {%- endif -%}
            {%- endset -%}
            {{col_expression}}::{{column.data_type}} as {{column.name}},
        {% endfor -%}
        {% endif %}
            metadata$filename::varchar as metadata_filename,
            metadata$file_row_number::bigint as metadata_file_row_number,
            metadata$file_last_modified::timestamp as metadata_file_last_modified,
            metadata$start_scan_time::timestamp as _dbt_copied_at
        from {{external.location}} {# stage #}
    )
    file_format = {{external.file_format}}
    {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %}
    {% if copy_options %} {{copy_options}} {% endif %};

    {% if explicit_transaction -%} commit; {%- endif -%}

{% endmacro %}
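To make the per-column logic in the macro above easier to follow, here is a minimal Python model of the col_expression block and the for loop that uses it. It is only a sketch for illustration: Column, col_expression and select_list are hypothetical names, not part of dbt-external-tables or dbt, and the model assumes (as the Jinja does) that an unset or empty expression falls back to the existing behaviour.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Column:
    """Hypothetical stand-in for a dbt source column (name, data_type, expression)."""
    name: str
    data_type: str
    expression: Optional[str] = None


def col_expression(column: Column, index: int, is_csv: bool) -> str:
    """Mirror the macro's {% set col_expression %} block; index is 1-based like loop.index."""
    if column.expression:
        # Proposed behaviour: use the expression from the source YAML verbatim.
        return column.expression
    if is_csv:
        # Existing CSV behaviour: select by ordinal position ($1, $2, ...).
        return f"nullif(${index},'')"
    # Existing behaviour for other formats: select by key name from the variant $1.
    return f"nullif($1:{column.name},'')"


def select_list(columns: list[Column], is_csv: bool) -> list[str]:
    """Render one select-list line per column, as the macro's for loop does."""
    return [
        f"{col_expression(col, i, is_csv)}::{col.data_type} as {col.name},"
        for i, col in enumerate(columns, start=1)
    ]


if __name__ == "__main__":
    cols = [
        Column("REPORT_DATE", "VARCHAR", "right(left($1::VARCHAR,171),8)"),
        Column("AMOUNT", "NUMBER"),  # no expression: falls back to $2
    ]
    for line in select_list(cols, is_csv=True):
        print(line)
```

Because the expression check comes first, sources that define no expressions render exactly as before, so the change should be backwards compatible.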

Describe alternatives you've considered

n/a

Additional context

dbt-external-tables
prod_analytics/dbt_packages/dbt_external_tables/macros/plugins/snowflake/snowpipe/get_copy_sql.sql

I took the lines from snowflake/create_external_table.sql that pull column.expression from the YAML file.
The version above builds the copy into statement dynamically, using each column's expression when one is defined instead of only iterating over the column positions.
For example:

            columns:
                - name: REPORT_DATE
                  expression: right(left($1::VARCHAR,171),8)
                  data_type: VARCHAR
                - name: MERCHANT_NUMBER
                  expression: ltrim(rtrim(right(left($2::VARCHAR,48),20)))
                  data_type: VARCHAR

The copy into statement generated with this change:

copy into src_prd.FIRST_DATA_SOURCE.FD_NORTH_SETTLE_003
    from (
        select
        right(left($1::VARCHAR,171),8)::VARCHAR as REPORT_DATE,
        ltrim(rtrim(right(left($2::VARCHAR,48),20)))::VARCHAR as MERCHANT_NUMBER,

            metadata$filename::varchar as metadata_filename,
            metadata$file_row_number::bigint as metadata_file_row_number,
            metadata$file_last_modified::timestamp as metadata_file_last_modified,
            metadata$start_scan_time::timestamp as _dbt_copied_at
        from @ADMIN.PASSAGE_S3_STAGE
    )
    file_format = ADMIN.cl9dfmde_v1_ingest
    pattern = '.*CL9DFMDE.36829.NAGW-YHFBR001.708980000887.d.20210416.dfm.20231206.*'
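The outer structure of the statement above (user columns, the four metadata columns, the stage, file_format and the optional pattern and copy options) can be modelled with the Python sketch below. It is illustrative only: render_copy_into and METADATA_COLUMNS are hypothetical names, whitespace is simplified, the columns are passed in already rendered as (expression, data_type, name) tuples, and the macro's begin/commit handling and the $1::variant fallback for sources with no columns are omitted.

```python
from typing import Optional

# Metadata columns the macro appends after the user-defined columns.
METADATA_COLUMNS = [
    "metadata$filename::varchar as metadata_filename",
    "metadata$file_row_number::bigint as metadata_file_row_number",
    "metadata$file_last_modified::timestamp as metadata_file_last_modified",
    "metadata$start_scan_time::timestamp as _dbt_copied_at",
]


def render_copy_into(
    table: str,
    columns: list[tuple[str, str, str]],  # (expression, data_type, name)
    stage: str,
    file_format: str,
    pattern: Optional[str] = None,
    copy_options: Optional[str] = None,
) -> str:
    """Assemble a copy into statement shaped like the macro's output (whitespace simplified)."""
    select_items = [f"{expr}::{dtype} as {name}" for expr, dtype, name in columns]
    select_items += METADATA_COLUMNS
    lines = [
        f"copy into {table}",
        "from (",
        "    select",
        ",\n".join(f"        {item}" for item in select_items),
        f"    from {stage}",  # the stage is read inside the subquery
        ")",
        f"file_format = {file_format}",
    ]
    if pattern:
        lines.append(f"pattern = '{pattern}'")
    if copy_options:
        lines.append(copy_options)
    return "\n".join(lines) + ";"


if __name__ == "__main__":
    print(render_copy_into(
        "src_prd.FIRST_DATA_SOURCE.FD_NORTH_SETTLE_003",
        [("right(left($1::VARCHAR,171),8)", "VARCHAR", "REPORT_DATE"),
         ("ltrim(rtrim(right(left($2::VARCHAR,48),20)))", "VARCHAR", "MERCHANT_NUMBER")],
        "@ADMIN.PASSAGE_S3_STAGE",
        "ADMIN.cl9dfmde_v1_ingest",
        pattern=".*CL9DFMDE.*",
    ))
```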

The copy into statement generated before this change:

copy into src_prd.FIRST_DATA_SOURCE.FD_NORTH_SETTLE_003
    from (
        select
        $1::VARCHAR as REPORT_DATE,
        $2::VARCHAR as MERCHANT_NUMBER,
 
            metadata$filename::varchar as metadata_filename,
            metadata$file_row_number::bigint as metadata_file_row_number,
            metadata$file_last_modified::timestamp as metadata_file_last_modified,
            metadata$start_scan_time::timestamp as _dbt_copied_at
        from @ADMIN.PASSAGE_S3_STAGE
    )
    file_format = ADMIN.cl9dfmde_v1_ingest
    pattern = '.*CL9DFMDE.36829.NAGW-YHFBR001.708980000887.d.20210416.dfm.20231206.*'

Who will this benefit?

Everyone using external stages with Snowpipe, particularly anyone who defines column expressions in their source YAML.

shaneorourke-payroc added the enhancement and triage labels on Dec 4, 2024.