# Patch hunk for example_project/models/sources.yml (starts at file line 56).
# It re-adds `data_type: 'BIGINT'` for the `version` column and appends a new
# `aws_public_blockchain` entry: database `awscatalog`, quoting disabled for
# database/schema/identifier, an `external` definition (S3 location, Parquet
# SerDe row_format, partitions `coin` and `transactions`, both using
# `path_macro: value_only`) and a column list.
# The `transactions` partition values carry their own leading '/' and
# hard-code two dates (2024-09-19 and 2024-09-20).
# NOTE(review): `describe` does not look like a dbt source/table property;
#   presumably `description` was intended - confirm against the dbt docs.
# NOTE(review): indentation is not recoverable in this view, so whether the new
#   entry sits at source level or table level cannot be checked here; it mixes
#   `database` with `external`/`columns` and has no `tables:` key - confirm.
# NOTE(review): the new file still ends without a trailing newline.
diff --git a/example_project/models/sources.yml b/example_project/models/sources.yml index 49905b3..d21ac7c 100644 --- a/example_project/models/sources.yml +++ b/example_project/models/sources.yml @@ -56,4 +56,75 @@ sources: - name: user data_type: 'STRING' - name: version - data_type: 'BIGINT' \ No newline at end of file + data_type: 'BIGINT' + + - name: aws_public_blockchain + database: awscatalog + quoting: + database: false + schema: false + identifier: false + describe: "https://github.com/aws-samples/digital-assets-examples/blob/main/analytics/" + external: + location: "s3://aws-public-blockchain/v1.0" + row_format: "serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" + partitions: + - name: coin + data_type: string + path_macro: value_only + vals: [ + 'btc', + 'eth' + ] + - name: transactions + data_type: string + path_macro: value_only + vals: [ + '/transactions/date=2024-09-19', + '/transactions/date=2024-09-20' + ] + columns: + - name: gas + data_type: bigint + - name: hash + data_type: string + - name: input + data_type: string + - name: nonce + data_type: bigint + - name: value + data_type: double + - name: block_number + data_type: bigint + - name: block_hash + data_type: string + - name: transaction_index + data_type: bigint + - name: from_address + data_type: string + - name: to_address + data_type: string + - name: gas_price + data_type: bigint + - name: receipt_cumulative_gas_used + data_type: bigint + - name: receipt_gas_used + data_type: bigint + - name: receipt_contract_address + data_type: string + - name: receipt_status + data_type: bigint + - name: receipt_effective_gas_price + data_type: bigint + - name: transaction_type + data_type: int + - name: max_fee_per_gas + data_type: bigint + - name: max_priority_fee_per_gas + data_type: bigint + - name: block_timestamp + data_type: timestamp + - name: date + data_type: string + - name: last_modified + data_type: timestamp \ No newline at end of file diff --git 
{# macros/refresh_external_tables.sql #}
{% macro athena__refresh_external_table(source_node) %}
    {#
        Build the statements that register the partitions of an external
        Athena table.

        Argument:
          source_node - source node whose `external` config is read
                        (`partitions`, optional `hive_compatible_partitions`).

        Returns (via dbt `return`):
          - []          when the node declares no partitions;
          - [ddl]       a single `msck repair table ...` statement when
                        `hive_compatible_partitions` is truthy;
          - otherwise   whatever
                        dbt_external_tables.redshift_alter_table_add_partitions
                        returns for the cartesian product of all partition
                        values.

        Each partition entry may define:
          name           - partition column name
          vals           - a list, a single string, or {macro, args} to render
          path_macro     - macro used to render this level's path fragment
          path_macro_key - optional key passed to path_macro instead of `name`

        https://docs.aws.amazon.com/athena/latest/ug/partitions.html
    #}

    {%- set partitions = source_node.external.partitions -%}
    {%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%}

    {%- if partitions -%}
        {%- if hive_compatible_partitions -%}
            {% set ddl -%}
                msck repair table {{source(source_node.source_name, source_node.name).render_hive()}}
            {%- endset %}
            {{ return([ddl]) }}
        {% else %}
            {# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #}

            {#
                A Jinja `set` inside a for loop only lives for one iteration, so
                the running list of combinations is kept on a namespace object.
                Plain loop-local variables would make the third and later
                partition levels start again from the first level's results.
            #}
            {%- set state = namespace(combinations=[{'partition_by': [], 'path': ''}]) -%}

            {%- for partition in partitions -%}

                {# Resolve this level's values once; they do not depend on the prefix. #}
                {%- if partition.vals.macro -%}
                    {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%}
                {%- elif partition.vals is string -%}
                    {%- set vals = [partition.vals] -%}
                {%- else -%}
                    {%- set vals = partition.vals -%}
                {%- endif -%}

                {# Allow the use of a custom 'key' in path_macro (path.sql). #}
                {# By default the key is the partition name from the raw yml. #}
                {# Useful when the data in s3 is stored under a prefix/suffix other than the partition name. #}
                {%- if partition.path_macro_key -%}
                    {%- set path_macro_key = partition.path_macro_key -%}
                {%- else -%}
                    {%- set path_macro_key = partition.name -%}
                {%- endif -%}

                {# Extend every combination built so far with every value of this level. #}
                {%- set expanded = [] -%}
                {%- for preexisting in state.combinations -%}
                    {%- for val in vals -%}
                        {# `+` builds a new list, so the shared prefix is never mutated. #}
                        {%- set partition_parts = preexisting.partition_by + [{'name': partition.name, 'value': val}] -%}
                        {# Path fragments are concatenated as-is; no separator is inserted here. #}
                        {%- set path_parts = preexisting.path ~ dbt_external_tables.render_from_context(partition.path_macro, path_macro_key, val) -%}
                        {%- do expanded.append({'partition_by': partition_parts, 'path': path_parts}) -%}
                    {%- endfor -%}
                {%- endfor -%}

                {# Namespace attribute assignment survives into the next iteration. #}
                {%- set state.combinations = expanded -%}

            {%- endfor -%}

            {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, state.combinations) -%}
            {{ return(ddl) }}
        {% endif %}
    {% else %}
        {% do return([]) %}
    {% endif %}
{% endmacro %}