From c5e2a59cf2a5ddc4baa77509ddb0786a7e9fc1ce Mon Sep 17 00:00:00 2001 From: atch Date: Sun, 19 Nov 2023 22:32:48 +0000 Subject: [PATCH] with https://github.com/dbt-labs/dbt-external-tables/pull/133 --- .../{stage_external_sources => }/path.sql | 0 .../stage_external_sources/add_partitions.sql | 70 ----------- .../create_external_table.sql | 35 ------ .../drop_external_table.sql | 15 --- .../refresh_external_table.sql | 111 ------------------ .../stage_external_sources.sql | 104 ---------------- .../models/raw/idealista_porto_homes.yml | 103 ++++++++++++++++ src/dbt/project/packages.yml | 7 +- 8 files changed, 107 insertions(+), 338 deletions(-) rename src/dbt/project/macros/{stage_external_sources => }/path.sql (100%) delete mode 100644 src/dbt/project/macros/stage_external_sources/add_partitions.sql delete mode 100644 src/dbt/project/macros/stage_external_sources/create_external_table.sql delete mode 100644 src/dbt/project/macros/stage_external_sources/drop_external_table.sql delete mode 100644 src/dbt/project/macros/stage_external_sources/refresh_external_table.sql delete mode 100644 src/dbt/project/macros/stage_external_sources/stage_external_sources.sql create mode 100644 src/dbt/project/models/raw/idealista_porto_homes.yml diff --git a/src/dbt/project/macros/stage_external_sources/path.sql b/src/dbt/project/macros/path.sql similarity index 100% rename from src/dbt/project/macros/stage_external_sources/path.sql rename to src/dbt/project/macros/path.sql diff --git a/src/dbt/project/macros/stage_external_sources/add_partitions.sql b/src/dbt/project/macros/stage_external_sources/add_partitions.sql deleted file mode 100644 index bf8380b..0000000 --- a/src/dbt/project/macros/stage_external_sources/add_partitions.sql +++ /dev/null @@ -1,70 +0,0 @@ -{% macro alter_table_add_partitions_ddl(source_node, partitions) %} - {{ return(adapter.dispatch('alter_table_add_partitions_ddl')(source_node, partitions)) }} -{% endmacro %} - -{% macro default__alter_table_add_partitions_ddl(source_node, partitions) %} - {{ exceptions.raise_compiler_error("External table creation is not implemented for the default adapter") }} -{% endmacro %} - -{# - Generates a series of alter statements to add a batch of partitions to a table. - Ideally it would require a single alter statement to add all partitions, but - Amazon imposes a limit of 100 partitions per alter statement. Therefore we need - to generate multiple altes when the number of partitions to add exceeds 100. - - Arguments: - - source (string): The name of the table to generate the partitions for. - - source_external_location (string): Base location of the external table. Paths - in the 'partitions' argument are specified relative to this location. - - partitions (list): A list of partitions to be added to the external table. - Each partition is represented by a dictionary with the keys: - - partition_by (list): A set of columns that the partition is affected by - Each column is represented by a dictionary with the keys: - - name: Name of the column - - value: Value of the column - - path (string): The path to be added as a partition for the particular - combination of columns defined in the 'partition_by' -#} -{% macro athena__alter_table_add_partitions_ddl(source_node, partitions) %} - - {{ log("Generating ADD PARTITION statement for partition set between " - ~ partitions[0]['path'] ~ " and " ~ (partitions|last)['path']) }} - - {% set ddl = [] %} - - {% if partitions|length > 0 %} - - {% set alter_table_add %} - alter table {{source(source_node.source_name, source_node.name) | replace('\"', '') }} add if not exists - {% endset %} - - {%- set alters -%} - - {{ alter_table_add }} - - {%- for partition in partitions -%} - - {%- if loop.index0 != 0 and loop.index0 % 100 == 0 -%} - - ; {{ alter_table_add }} - - {%- endif -%} - - partition ({%- for part in partition.partition_by -%}{{ part.name }}='{{ part.value }}'{{',' if not loop.last}}{%- endfor -%}) - location '{{ source_node.external.location }}{{ partition.path }}/' - - {% endfor -%} - - {%- endset -%} - - {% set ddl = ddl + alters.split(';') %} - - {% else %} - - {{ log("No partitions to be added") }} - - {% endif %} - - {% do return(ddl) %} - -{% endmacro %} diff --git a/src/dbt/project/macros/stage_external_sources/create_external_table.sql b/src/dbt/project/macros/stage_external_sources/create_external_table.sql deleted file mode 100644 index 098b42e..0000000 --- a/src/dbt/project/macros/stage_external_sources/create_external_table.sql +++ /dev/null @@ -1,35 +0,0 @@ -{% macro create_schema_ddl(schema_name) %} - create schema if not exists {{schema_name}} -{% endmacro %} - -{% macro create_external_table_ddl(source_node) %} - {{ adapter.dispatch('create_external_table_ddl')(source_node) }} -{% endmacro %} - -{% macro default__create_external_table_ddl(source_node) %} - {{ exceptions.raise_compiler_error("External table creation is not implemented for the default adapter") }} -{% endmacro %} - -{% macro athena__create_external_table_ddl(source_node) %} - - {%- set columns = source_node.columns.values() -%} - {%- set external = source_node.external -%} - {%- set partitions = external.partitions -%} - - create external table if not exists {{source(source_node.source_name, source_node.name)| replace('\"', '') }} ( - {% for column in columns %} - {{'`' + column.name + '`'}} {{column.data_type}} - {{- ',' if not loop.last -}} - {% endfor %} - ) - {% if partitions -%} partitioned by ( - {%- for partition in partitions -%} - {{'`' + partition.name + '`'}} {{partition.data_type}}{{', ' if not loop.last}} - {%- endfor -%} - ) {%- endif %} - {% if external.row_format -%} row format {{external.row_format}} {%- endif %} - {% if external.file_format -%} stored as {{external.file_format}} {%- endif %} - {% if external.location -%} location '{{external.location}}' {%- endif %} - {% if external.table_properties -%} tblproperties {{external.table_properties}} {%- endif %} - -{% endmacro %} diff --git a/src/dbt/project/macros/stage_external_sources/drop_external_table.sql b/src/dbt/project/macros/stage_external_sources/drop_external_table.sql deleted file mode 100644 index 34716fa..0000000 --- a/src/dbt/project/macros/stage_external_sources/drop_external_table.sql +++ /dev/null @@ -1,15 +0,0 @@ -{% macro dropif_ddl(source_node) %} - {{ adapter.dispatch('dropif_ddl')(source_node) }} -{% endmacro %} - -{% macro default__dropif_ddl(source_node) %} - {{ exceptions.raise_compiler_error("External table creation is not implemented for the default adapter") }} -{% endmacro %} - - -{% macro athena__dropif_ddl(node) %} - {% set ddl %} - drop table if exists {{source(node.source_name, node.name)}} - {% endset %} - {{return(ddl)}} -{% endmacro %} diff --git a/src/dbt/project/macros/stage_external_sources/refresh_external_table.sql b/src/dbt/project/macros/stage_external_sources/refresh_external_table.sql deleted file mode 100644 index 6727814..0000000 --- a/src/dbt/project/macros/stage_external_sources/refresh_external_table.sql +++ /dev/null @@ -1,111 +0,0 @@ -{% macro render_from_context(name) -%} -{% set original_name = name %} - {% if '.' in name %} - {% set package_name, name = name.split(".", 1) %} - {% else %} - {% set package_name = none %} - {% endif %} - - {% if package_name is none %} - {% set package_context = context %} - {% elif package_name in context %} - {% set package_context = context[package_name] %} - {% else %} - {% set error_msg %} - Could not find package '{{package_name}}', called with '{{original_name}}' - {% endset %} - {{ exceptions.raise_compiler_error(error_msg | trim) }} - {% endif %} - - {{ return(package_context[name](*varargs, **kwargs)) }} - -{%- endmacro %} - -{% macro refresh_external_table_ddl(source_node) %} - {{ return(adapter.dispatch('refresh_external_table_ddl')(source_node)) }} -{% endmacro %} - -{% macro default__refresh_external_table_ddl(source_node) %} - {{ exceptions.raise_compiler_error("External table creation is not implemented for the default adapter") }} -{% endmacro %} - -{% macro athena__refresh_external_table_ddl(source_node) %} - - {%- set starting = [ - { - 'partition_by': [], - 'path': '' - } - ] -%} - - {%- set ending = [] -%} - {%- set finals = [] -%} - - {%- set partitions = source_node.external.get('partitions',[]) -%} - - {%- if partitions -%}{%- for partition in partitions -%} - - {%- if not loop.first -%} - {%- set starting = ending -%} - {%- set ending = [] -%} - {%- endif -%} - - {%- for preexisting in starting -%} - - {%- if partition.vals.macro -%} - {%- set vals = render_from_context(partition.vals.macro, **partition.vals.args) -%} - {%- elif partition.vals is string -%} - {%- set vals = [partition.vals] -%} - {%- else -%} - {%- set vals = partition.vals -%} - {%- endif -%} - - {# Allow the use of custom 'key' in path_macro (path.sql) #} - {# By default, take value from source node 'external.partitions.name' from raw yml #} - {# Useful if the data in s3 is saved with a prefix/suffix path 'path_macro_key' other than 'external.partitions.name' #} - {%- if partition.path_macro_key -%} - {%- set path_macro_key = partition.path_macro_key -%} - {%- else -%} - {%- set path_macro_key = partition.name -%} - {%- endif -%} - - {%- for val in vals -%} - - {# For each preexisting guy, add a new one #} - - {%- set next_partition_by = [] -%} - - {%- for prexist_part in preexisting.partition_by -%} - {%- do next_partition_by.append(prexist_part) -%} - {%- endfor -%} - - {%- do next_partition_by.append({'name': partition.name, 'value': val}) -%} - - {# Concatenate path #} - - {%- set concat_path = preexisting.path ~ render_from_context(partition.path_macro, path_macro_key, val) -%} - - {%- do ending.append({'partition_by': next_partition_by, 'path': concat_path}) -%} - - {%- endfor -%} - - {%- endfor -%} - - {%- if loop.last -%} - {%- for end in ending -%} - {%- do finals.append(end) -%} - {%- endfor -%} - {%- endif -%} - - {%- endfor -%} - - {%- set ddl = alter_table_add_partitions_ddl(source_node, finals) -%} - {{ return(ddl) }} - - {% else %} - - {% do return([]) %} - - {% endif %} - -{% endmacro %} diff --git a/src/dbt/project/macros/stage_external_sources/stage_external_sources.sql b/src/dbt/project/macros/stage_external_sources/stage_external_sources.sql deleted file mode 100644 index 4d5fa26..0000000 --- a/src/dbt/project/macros/stage_external_sources/stage_external_sources.sql +++ /dev/null @@ -1,104 +0,0 @@ -{% macro get_external_build_plan(source_node) %} - {{ return(adapter.dispatch('get_external_build_plan')(source_node)) }} -{% endmacro %} - -{% macro default__get_external_build_plan(source_node) %} - {{ exceptions.raise_compiler_error("Staging external sources is not implemented for the default adapter") }} -{% endmacro %} - -{% macro athena__get_external_build_plan(source_node) %} - - {% set build_plan = [] %} - - {%- set partitions = source_node.external.get('partitions', none) -%} - {% set full_refresh = var('full_refresh', false) %} - - {% if full_refresh %} - - {% set build_plan = [ - create_schema_ddl(source_node.schema), - dropif_ddl(source_node), - create_external_table_ddl(source_node) - ] + refresh_external_table_ddl(source_node) - %} - - {% else %} - - {% set build_plan = [ - create_schema_ddl(source_node.schema), - create_external_table_ddl(source_node) - ] + refresh_external_table_ddl(source_node) - %} - - - {% endif %} - - {% do return(build_plan) %} - -{% endmacro %} - -{% macro stage_external_sources(select=none) %} - - {% set sources_to_stage = [] %} - - {% for node in graph.sources.values() %} - - {% if node.external.location %} - - {% if select %} - - {% for src in select.split(' ') %} - - {% if '.' in src %} - {% set src_s = src.split('.') %} - {% if src_s[0] == node.source_name and src_s[1] == node.name %} - {% do sources_to_stage.append(node) %} - {% endif %} - {% else %} - {% if src == node.source_name %} - {% do sources_to_stage.append(node) %} - {% endif %} - {% endif %} - - {% endfor %} - - {% else %} - - {% do sources_to_stage.append(node) %} - - {% endif %} - - {% endif %} - - {% endfor %} - - {% for node in sources_to_stage %} - - {% set loop_label = loop.index ~ ' of ' ~ loop.length %} - - {% do log(loop_label ~ ' START external source ' ~ node.schema ~ '.' ~ node.identifier, True) -%} - - {% set run_queue = get_external_build_plan(node) %} - - - {% do log(loop_label ~ ' SKIP', True) if run_queue == [] %} - - {% for q in run_queue %} - - {% set q_msg = q|trim %} - {% set q_log = q_msg[:50] ~ '... ' if q_msg|length > 50 else q_msg %} - - {% do log(loop_label ~ ' (' ~ loop.index ~ ') ' ~ q_log, True) %} - - {% call statement('runner', fetch_result = True, auto_begin = False) %} - {{ q }} - {% endcall %} - - {% set status = load_result('runner')['status'] %} - {% do log(loop_label ~ ' (' ~ loop.index ~ ') ' ~ status, True) %} - - {% endfor %} - - {% endfor %} - -{% endmacro %} diff --git a/src/dbt/project/models/raw/idealista_porto_homes.yml b/src/dbt/project/models/raw/idealista_porto_homes.yml new file mode 100644 index 0000000..a0bac11 --- /dev/null +++ b/src/dbt/project/models/raw/idealista_porto_homes.yml @@ -0,0 +1,103 @@ +version: 2 + +sources: + + - name: raw + schema: datalake_raw + loader: S3 + tables: + - name: idealista_porto_homes + external: + location: "s3://atommych-datalake-dev/input/idealista/pt/braga/sale/homes/" + row_format: "delimited fields terminated by ','" + table_properties: "('skip.header.line.count'='1')" + partitions: # optional + - name: loaded_at + data_type: date + path_macro: date_path_dash + vals: + macro: dbt.dates_in_range + args: + start_date_str: '2023-11-13' + end_date_str: '{{modules.datetime.date.today().strftime("%Y-%m-%d")}}' + in_fmt: "%Y-%m-%d" + out_fmt: "%Y-%m-%d" + columns: + - name: propertyCode + data_type: int + - name: thumbnail + data_type: string + - name: externalReference + data_type: string + - name: numPhotos + data_type: int + - name: price + data_type: float + - name: propertyType + data_type: string + - name: operation + data_type: string + - name: size + data_type: float + - name: rooms + data_type: int + - name: bathrooms + data_type: int + - name: address + data_type: string + - name: province + data_type: string + - name: municipality + data_type: string + - name: district + data_type: string + - name: country + data_type: string + - name: latitude + data_type: float + - name: longitude + data_type: float + - name: showAddress + data_type: boolean + - name: url + data_type: string + - name: distance + data_type: int + - name: description + data_type: string + - name: hasVideo + data_type: boolean + - name: status + data_type: string + - name: newDevelopment + data_type: boolean + - name: priceByArea + data_type: float + - name: detailedType + data_type: string + - name: suggestedTexts + data_type: string + - name: hasPlan + data_type: boolean + - name: has3DTour + data_type: boolean + - name: has360 + data_type: boolean + - name: hasStaging + data_type: boolean + - name: topNewDevelopment + data_type: boolean + - name: superTopHighlight + data_type: boolean + - name: hasLift + data_type: string + - name: parkingSpace + data_type: string + - name: floor + data_type: string + - name: neighborhood + data_type: string + - name: labels + data_type: string + - name: highlight + data_type: string \ No newline at end of file diff --git a/src/dbt/project/packages.yml b/src/dbt/project/packages.yml index 425b404..64b09b3 100644 --- a/src/dbt/project/packages.yml +++ b/src/dbt/project/packages.yml @@ -6,12 +6,13 @@ packages: ## Docs: https://docs.elementary-data.com #- package: elementary-data/elementary # version: 0.11.0 - # + - git: "https://github.com/dbt-athena/athena-utils" revision: 0.3.0 #- git: "https://github.com/dbt-labs/dbt-external-tables/" - # revision: 0.8.7 + # revision: 0.8.7 #- package: dbt-labs/dbt_external_tables # version: 0.8.7 - #- local: ../../../../133/dbt-external-tables \ No newline at end of file + # https://github.com/dbt-labs/dbt-external-tables/pull/133 + - local: ../../../../133/dbt-external-tables \ No newline at end of file