From a779de8dea0fb7890efe4da84ced303fb9f65004 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Fri, 20 Sep 2024 10:39:30 +0100 Subject: [PATCH 1/8] enable multipartitioning tables in athena --- macros/refresh_external_tables.sql | 58 ++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/macros/refresh_external_tables.sql b/macros/refresh_external_tables.sql index 0526596..fe8f0be 100644 --- a/macros/refresh_external_tables.sql +++ b/macros/refresh_external_tables.sql @@ -1,7 +1,13 @@ {% macro athena__refresh_external_table(source_node) %} {# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #} + + {%- set starting = [{ 'partition_by': [],'path': '' }] -%} + {%- set parts_list = [] -%} + {%- set finals = [] -%} + {%- set partitions = source_node.external.partitions -%} {%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%} + {%- if partitions -%} {%- if hive_compatible_partitions -%} {% set ddl -%} @@ -13,25 +19,47 @@ {%- set part_len = partitions|length -%} {%- set finals = [] -%} {%- for partition in partitions -%} - {%- if partition.vals.macro -%} - {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%} - {%- elif partition.vals is string -%} - {%- set vals = [partition.vals] -%} - {%- else -%} - {%- set vals = partition.vals -%} + {%- if not loop.first -%} + {%- set starting = parts_list -%} + {%- set parts_list = [] -%} {%- endif -%} - {%- for val in vals -%} - {%- set partition_parts = [{'name': partition.name, 'value': val}] -%} - {%- set path_parts = [dbt_external_tables.render_from_context(partition.path_macro, partition.name, val)] -%} - {%- set construct = { - 'partition_by': partition_parts, - 'path': path_parts | join('/') - } -%} - {% do finals.append(construct) %} + + {%- for preexisting in starting -%} + {%- if partition.vals.macro -%} + {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%} + {%- elif partition.vals is string -%} + {%- set vals = [partition.vals] -%} + {%- else -%} + {%- set vals = partition.vals -%} + {%- endif -%} + {%- for val in vals -%} + {%- set partition_parts = [] -%} + + {%- for sub_part in preexisting.partition_by -%} + {%- do partition_parts.append(sub_part) -%} + {%- endfor -%} + + {%- do partition_parts.append({'name': partition.name, 'value': val}) -%} + {%- set path_parts = [dbt_external_tables.render_from_context(partition.path_macro, partition.name, val)] -%} + {%- set construct = { + 'partition_by': partition_parts, + 'path': path_parts | join('/') + } -%} + {% do finals.append(construct) %} + {%- endfor -%} + {%- endfor -%} - {%- endfor -%} + + {%- if loop.last -%} + {%- for part_spec in parts_list -%} + {%- do finals.append(part_spec) -%} + {%- endfor -%} + {%- endif -%} + + {%- endfor -%} {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%} {{ return(ddl) }} + {% endif %} {% endif %} {% do return([]) %} From 6b21e038722d9fcd8471f527e69188bf64096ed8 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Fri, 20 Sep 2024 12:09:23 +0100 Subject: [PATCH 2/8] adjust partitions concat --- macros/refresh_external_tables.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/macros/refresh_external_tables.sql b/macros/refresh_external_tables.sql index fe8f0be..a744867 100644 --- a/macros/refresh_external_tables.sql +++ b/macros/refresh_external_tables.sql @@ -40,12 +40,12 @@ {%- endfor -%} {%- do partition_parts.append({'name': partition.name, 'value': val}) -%} - {%- set path_parts = [dbt_external_tables.render_from_context(partition.path_macro, partition.name, val)] -%} + {%- set path_parts = preexisting.path ~ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) -%} {%- set construct = { 'partition_by': partition_parts, - 'path': path_parts | join('/') + 'path': path_parts } -%} - {% do finals.append(construct) %} + {%- do parts_list.append(construct) -%} {%- endfor -%} {%- endfor -%} @@ -56,7 +56,7 @@ {%- endfor -%} {%- endif -%} - {%- endfor -%} + {%- endfor -%} {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%} {{ return(ddl) }} From b0994e638109887163bd34533969e7dff9b9084a Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Fri, 20 Sep 2024 13:16:30 +0100 Subject: [PATCH 3/8] added path_macro_arg --- macros/refresh_external_tables.sql | 38 ++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/macros/refresh_external_tables.sql b/macros/refresh_external_tables.sql index a744867..c371f64 100644 --- a/macros/refresh_external_tables.sql +++ b/macros/refresh_external_tables.sql @@ -1,10 +1,6 @@ {% macro athena__refresh_external_table(source_node) %} {# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #} - {%- set starting = [{ 'partition_by': [],'path': '' }] -%} - {%- set parts_list = [] -%} - {%- set finals = [] -%} - {%- set partitions = source_node.external.partitions -%} {%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%} @@ -19,12 +15,14 @@ {%- set part_len = partitions|length -%} {%- set finals = [] -%} {%- for partition in partitions -%} - {%- if not loop.first -%} - {%- set starting = parts_list -%} - {%- set parts_list = [] -%} + {%- if loop.first -%} + {%- set partition_list = [{ 'partition_by': [],'path': '' }] -%} + {% else %} + {%- set partition_list = partition_specs -%} {%- endif -%} - {%- for preexisting in starting -%} + {%- set partition_specs = [] -%} + {%- for preexisting in partition_list -%} {%- if partition.vals.macro -%} {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%} {%- elif partition.vals is string -%} @@ -32,7 +30,20 @@ {%- else -%} {%- set vals = partition.vals -%} {%- endif -%} + + + {# Allow the use of custom 'key' in path_macro (path.sql) #} + {# By default, take value from source node 'external.partitions.name' from raw yml #} + {# Useful if the data in s3 is saved with a prefix/suffix path 'path_macro_arg' other than 'external.partitions.name' #} + {%- if partition.path_macro_arg -%} + {%- set path_macro_arg = partition.path_macro_arg -%} + {%- else -%} + {%- set path_macro_arg = partition.name -%} + {%- endif -%} + + {%- for val in vals -%} + {# For each preexisting item, add a new one #} {%- set partition_parts = [] -%} {%- for sub_part in preexisting.partition_by -%} @@ -40,18 +51,21 @@ {%- endfor -%} {%- do partition_parts.append({'name': partition.name, 'value': val}) -%} - {%- set path_parts = preexisting.path ~ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) -%} + + {# Concatenate path #} + {%- set path_parts = preexisting.path ~ dbt_external_tables.render_from_context(partition.path_macro, path_macro_arg, val) -%} + {%- set construct = { 'partition_by': partition_parts, - 'path': path_parts + 'path': path_parts | join('/') } -%} - {%- do parts_list.append(construct) -%} + {%- do partition_specs.append(construct) -%} {%- endfor -%} {%- endfor -%} {%- if loop.last -%} - {%- for part_spec in parts_list -%} + {%- for part_spec in partition_specs -%} {%- do finals.append(part_spec) -%} {%- endfor -%} {%- endif -%} From 093fdc3e6fe31df821c91858fbe2323b37564ca7 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Fri, 20 Sep 2024 14:57:22 +0100 Subject: [PATCH 4/8] fix array error --- macros/refresh_external_tables.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros/refresh_external_tables.sql b/macros/refresh_external_tables.sql index c371f64..27d244f 100644 --- a/macros/refresh_external_tables.sql +++ b/macros/refresh_external_tables.sql @@ -57,7 +57,7 @@ {%- set construct = { 'partition_by': partition_parts, - 'path': path_parts | join('/') + 'path': path_parts } -%} {%- do partition_specs.append(construct) -%} {%- endfor -%} From dda4e20f290a1f108f4788de0bd91e8be853a480 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Fri, 20 Sep 2024 18:14:00 +0100 Subject: [PATCH 5/8] project version --- macros/refresh_external_tables.sql | 131 ++++++++++++++--------------- 1 file changed, 62 insertions(+), 69 deletions(-) diff --git a/macros/refresh_external_tables.sql b/macros/refresh_external_tables.sql index 27d244f..1bd1e12 100644 --- a/macros/refresh_external_tables.sql +++ b/macros/refresh_external_tables.sql @@ -1,80 +1,73 @@ {% macro athena__refresh_external_table(source_node) %} - {# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #} + {# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #} - {%- set partitions = source_node.external.partitions -%} - {%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%} + {%- set starting = [ + { + 'partition_by': [], + 'path': '' + } + ] -%} - {%- if partitions -%} - {%- if hive_compatible_partitions -%} - {% set ddl -%} - msck repair table {{source(source_node.source_name, source_node.name).render_hive()}} - {%- endset %} - {{ return([ddl]) }} - {% else %} - {# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #} - {%- set part_len = partitions|length -%} - {%- set finals = [] -%} - {%- for partition in partitions -%} - {%- if loop.first -%} - {%- set partition_list = [{ 'partition_by': [],'path': '' }] -%} - {% else %} - {%- set partition_list = partition_specs -%} - {%- endif -%} + {%- set ending = [] -%} + {%- set finals = [] -%} - {%- set partition_specs = [] -%} - {%- for preexisting in partition_list -%} - {%- if partition.vals.macro -%} - {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%} - {%- elif partition.vals is string -%} - {%- set vals = [partition.vals] -%} - {%- else -%} - {%- set vals = partition.vals -%} - {%- endif -%} + {%- set partitions = source_node.external.partitions -%} + {%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%} + {%- if partitions -%} + {%- if hive_compatible_partitions -%} + {% set ddl -%} + msck repair table {{source(source_node.source_name, source_node.name).render_hive()}} + {%- endset %} + {{ return([ddl]) }} + {% else %} + {# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #} + {%- set part_len = partitions|length -%} + {%- for partition in partitions -%} + {%- if not loop.first -%} + {%- set starting = ending -%} + {%- set ending = [] -%} + {%- endif -%} + {%- for preexisting in starting -%} + {%- if partition.vals.macro -%} + {%- set vals = render_from_context(partition.vals.macro, **partition.vals.args) -%} + {%- elif partition.vals is string -%} + {%- set vals = [partition.vals] -%} + {%- else -%} + {%- set vals = partition.vals -%} + {%- endif -%} - {# Allow the use of custom 'key' in path_macro (path.sql) #} - {# By default, take value from source node 'external.partitions.name' from raw yml #} - {# Useful if the data in s3 is saved with a prefix/suffix path 'path_macro_arg' other than 'external.partitions.name' #} - {%- if partition.path_macro_arg -%} - {%- set path_macro_arg = partition.path_macro_arg -%} - {%- else -%} - {%- set path_macro_arg = partition.name -%} - {%- endif -%} - - - {%- for val in vals -%} - {# For each preexisting item, add a new one #} - {%- set partition_parts = [] -%} - - {%- for sub_part in preexisting.partition_by -%} - {%- do partition_parts.append(sub_part) -%} - {%- endfor -%} - - {%- do partition_parts.append({'name': partition.name, 'value': val}) -%} - - {# Concatenate path #} - {%- set path_parts = preexisting.path ~ dbt_external_tables.render_from_context(partition.path_macro, path_macro_arg, val) -%} - - {%- set construct = { - 'partition_by': partition_parts, - 'path': path_parts - } -%} - {%- do partition_specs.append(construct) -%} - {%- endfor -%} - - {%- endfor -%} - - {%- if loop.last -%} - {%- for part_spec in partition_specs -%} - {%- do finals.append(part_spec) -%} - {%- endfor -%} - {%- endif -%} + {# Allow the use of custom 'key' in path_macro (path.sql) #} + {# By default, take value from source node 'external.partitions.name' from raw yml #} + {# Useful if the data in s3 is saved with a prefix/suffix path 'path_macro_key' other than 'external.partitions.name' #} + {%- if partition.path_macro_key -%} + {%- set path_macro_key = partition.path_macro_key -%} + {%- else -%} + {%- set path_macro_key = partition.name -%} + {%- endif -%} + {%- for val in vals -%} + {# For each preexisting item, add a new one #} + {%- set next_partition_by = [] -%} + {%- for prexist_part in preexisting.partition_by -%} + {%- do next_partition_by.append(prexist_part) -%} + {%- endfor -%} + {%- do next_partition_by.append({'name': partition.name, 'value': val}) -%} + {# Concatenate path #} + {%- set concat_path = preexisting.path ~ render_from_context(partition.path_macro, path_macro_key, val) -%} + {%- do ending.append({'partition_by': next_partition_by, 'path': concat_path}) -%} {%- endfor -%} - {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%} - {{ return(ddl) }} - - {% endif %} + {%- endfor -%} + {%- if loop.last -%} + {%- for end in ending -%} + {%- do finals.append(end) -%} + {%- endfor -%} + {%- endif -%} + {%- endfor -%} + {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%} + {{ return(ddl) }} {% endif %} + {% else %} {% do return([]) %} + {% endif %} {% endmacro %} \ No newline at end of file From 4184568b5de30d5666ffa13c4a59ef196a5d0928 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Sat, 21 Sep 2024 13:12:27 +0100 Subject: [PATCH 6/8] added test --- example_project/models/sources.yml | 73 +++++++++++++++++++++++++++++- macros/refresh_external_tables.sql | 32 ++++++------- 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/example_project/models/sources.yml b/example_project/models/sources.yml index 49905b3..d21ac7c 100644 --- a/example_project/models/sources.yml +++ b/example_project/models/sources.yml @@ -56,4 +56,75 @@ sources: - name: user data_type: 'STRING' - name: version - data_type: 'BIGINT' \ No newline at end of file + data_type: 'BIGINT' + + - name: aws_public_blockchain + database: awscatalog + quoting: + database: false + schema: false + identifier: false + describe: "https://github.com/aws-samples/digital-assets-examples/blob/main/analytics/" + external: + location: "s3://aws-public-blockchain/v1.0" + row_format: "serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" + partitions: + - name: coin + data_type: string + path_macro: value_only + vals: [ + 'btc', + 'eth' + ] + - name: transactions + data_type: string + path_macro: value_only + vals: [ + '/transactions/date=2024-09-19', + '/transactions/date=2024-09-20' + ] + columns: + - name: gas + data_type: bigint + - name: hash + data_type: string + - name: input + data_type: string + - name: nonce + data_type: bigint + - name: value + data_type: double + - name: block_number + data_type: bigint + - name: block_hash + data_type: string + - name: transaction_index + data_type: bigint + - name: from_address + data_type: string + - name: to_address + data_type: string + - name: gas_price + data_type: bigint + - name: receipt_cumulative_gas_used + data_type: bigint + - name: receipt_gas_used + data_type: bigint + - name: receipt_contract_address + data_type: string + - name: receipt_status + data_type: bigint + - name: receipt_effective_gas_price + data_type: bigint + - name: transaction_type + data_type: int + - name: max_fee_per_gas + data_type: bigint + - name: max_priority_fee_per_gas + data_type: bigint + - name: block_timestamp + data_type: timestamp + - name: date + data_type: string + - name: last_modified + data_type: timestamp \ No newline at end of file diff --git a/macros/refresh_external_tables.sql b/macros/refresh_external_tables.sql index 1bd1e12..b46c985 100644 --- a/macros/refresh_external_tables.sql +++ b/macros/refresh_external_tables.sql @@ -1,14 +1,8 @@ {% macro athena__refresh_external_table(source_node) %} {# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #} - {%- set starting = [ - { - 'partition_by': [], - 'path': '' - } - ] -%} - - {%- set ending = [] -%} + {%- set athena_partitioning = [{'partition_by': [], 'path': ''}] -%} + {%- set list_partitions = [] -%} {%- set finals = [] -%} {%- set partitions = source_node.external.partitions -%} @@ -25,12 +19,12 @@ {%- set part_len = partitions|length -%} {%- for partition in partitions -%} {%- if not loop.first -%} - {%- set starting = ending -%} - {%- set ending = [] -%} + {%- set athena_partitioning = list_partitions -%} + {%- set list_partitions = [] -%} {%- endif -%} - {%- for preexisting in starting -%} + {%- for preexisting in athena_partitioning -%} {%- if partition.vals.macro -%} - {%- set vals = render_from_context(partition.vals.macro, **partition.vals.args) -%} + {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%} {%- elif partition.vals is string -%} {%- set vals = [partition.vals] -%} {%- else -%} @@ -48,19 +42,19 @@ {%- for val in vals -%} {# For each preexisting item, add a new one #} - {%- set next_partition_by = [] -%} + {%- set partition_parts = [] -%} {%- for prexist_part in preexisting.partition_by -%} - {%- do next_partition_by.append(prexist_part) -%} + {%- do partition_parts.append(prexist_part) -%} {%- endfor -%} - {%- do next_partition_by.append({'name': partition.name, 'value': val}) -%} + {%- do partition_parts.append({'name': partition.name, 'value': val}) -%} {# Concatenate path #} - {%- set concat_path = preexisting.path ~ render_from_context(partition.path_macro, path_macro_key, val) -%} - {%- do ending.append({'partition_by': next_partition_by, 'path': concat_path}) -%} + {%- set path_parts = preexisting.path ~ render_from_context(partition.path_macro, path_macro_key, val) -%} + {%- do list_partitions.append({'partition_by': partition_parts, 'path': path_parts}) -%} {%- endfor -%} {%- endfor -%} {%- if loop.last -%} - {%- for end in ending -%} - {%- do finals.append(end) -%} + {%- for construct in list_partitions -%} + {%- do finals.append(construct) -%} {%- endfor -%} {%- endif -%} {%- endfor -%} From 8c22ac0497e160d111d7294ea38da6629ae82a04 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Tue, 1 Oct 2024 09:19:52 +0100 Subject: [PATCH 7/8] removed article authors historic, keeping just the last change --- example_project/profiles.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/example_project/profiles.yml b/example_project/profiles.yml index 27619c6..393520d 100644 --- a/example_project/profiles.yml +++ b/example_project/profiles.yml @@ -8,10 +8,10 @@ example: athena: type: athena - database: "{{ env_var('ATHENA_TEST_DBNAME', 'AwsDataCatalog') }}" - region_name: "{{ env_var('ATHENA_TEST_REGION') }}" - s3_staging_dir: "s3://{{ env_var('ATHENA_TEST_BUCKET') }}" - work_group: "{{ env_var('ATHENA_TEST_WORKGROUP', 'primary') }}" + database: "awsdatacatalog" + region_name: "eu-central-1" + s3_staging_dir: "s3://paidmedia-tmp-athena/dbt/dev/ahou" + work_group: "ahou" schema: dbt_athena_external_tables_example threads: 1 retries: 1 \ No newline at end of file From 9f62409d611688b76529c88c37e4bd2a3bc2a6f5 Mon Sep 17 00:00:00 2001 From: Alexandro Hou Date: Tue, 1 Oct 2024 09:23:08 +0100 Subject: [PATCH 8/8] rollback --- example_project/profiles.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/example_project/profiles.yml b/example_project/profiles.yml index 393520d..27619c6 100644 --- a/example_project/profiles.yml +++ b/example_project/profiles.yml @@ -8,10 +8,10 @@ example: athena: type: athena - database: "awsdatacatalog" - region_name: "eu-central-1" - s3_staging_dir: "s3://paidmedia-tmp-athena/dbt/dev/ahou" - work_group: "ahou" + database: "{{ env_var('ATHENA_TEST_DBNAME', 'AwsDataCatalog') }}" + region_name: "{{ env_var('ATHENA_TEST_REGION') }}" + s3_staging_dir: "s3://{{ env_var('ATHENA_TEST_BUCKET') }}" + work_group: "{{ env_var('ATHENA_TEST_WORKGROUP', 'primary') }}" schema: dbt_athena_external_tables_example threads: 1 retries: 1 \ No newline at end of file