diff --git a/dbt/include/postgres/macros/adapters.sql b/dbt/include/postgres/macros/adapters.sql index 294443be..69f6fa4c 100644 --- a/dbt/include/postgres/macros/adapters.sql +++ b/dbt/include/postgres/macros/adapters.sql @@ -4,29 +4,127 @@ {{ sql_header if sql_header is not none }} - create {% if temporary -%} - temporary - {%- elif unlogged -%} - unlogged - {%- endif %} table {{ relation }} - {% set contract_config = config.get('contract') %} - {% if contract_config.enforced %} - {{ get_assert_columns_equivalent(sql) }} - {% endif -%} - {% if contract_config.enforced and (not temporary) -%} - {{ get_table_columns_and_constraints() }} ; - insert into {{ relation }} ( - {{ adapter.dispatch('get_column_names', 'dbt')() }} - ) - {%- set sql = get_select_subquery(sql) %} + {% if config.get('partition_by') != None %} + {# Use partitioning #} + + {%- set partition_by_field = config.get('partition_by')['field'] -%} + {%- set partition_by_granularity = config.get('partition_by')['granularity'] -%} + + -- We cannot create a partitioned table with "create as select". + -- Create a dummy temporary table just to be used as a template for the real one + create temporary table "{{ this.identifier }}__tt" as + select * + from ({{ sql }}) model_subq + where 1 = 2; + + -- Create partitioned table as a copy of the template + create {% if temporary -%} + temporary + {%- elif unlogged -%} + unlogged + {%- endif %} table {{ relation }} (like "{{ this.identifier }}__tt") + partition by range ({{ partition_by_field }}); + + {% set required_partitions_query %} + -- Partitions need to be manually created before inserting. + -- We execute the model SQL to get the first and last partitions and use the granularity to define what others need to be created. + -- + -- Note that we this executes the SQL model a first time just to get the partitions and it will be done a second time to actually get the data. + -- That might be very inefficient depending on the selected volume of data. + -- There's a alternative: + -- - Create as select a temporary non-partitioned table. + -- - Get the min/max partitions from that table. + -- - Move the data to the final table once it's partitioned. + -- + -- That option would mean the data is stored twice during model execution. + with partitions as ( + -- Generate a sequence with one row per required partition, based on the min and max dates + select + generate_series( + date_trunc('{{ partition_by_granularity }}', min({{ partition_by_field }})), + date_trunc('{{ partition_by_granularity }}', max({{ partition_by_field }})), + '1 {{ partition_by_granularity }}'::interval + ) as begin_date + from ( + {{ sql }} + ) model_subq + ) + select + -- Generate the begin-end date and name suffix for each partition + begin_date, + begin_date + '1 {{ partition_by_granularity }}'::interval as end_date, + to_char(begin_date, '_yyyymmdd') as partition_suffix + from partitions; + {% endset %} + {% set required_partitions_results = run_query(required_partitions_query) %} + + -- Create the required partitions + {% for required_partition in required_partitions_results.rows %} + create + {% if temporary -%} + temporary + {%- elif unlogged -%} + unlogged + {%- endif %} + table + {{ make_intermediate_relation(relation, required_partition['partition_suffix']) }} + partition of {{ relation }} for values from ('{{ required_partition["begin_date"] }}'::timestamp) to ('{{ required_partition["end_date"] }}'::timestamp); + {% endfor %} + + -- Insert into the parent table + insert into {{ relation }} + {{ sql }}; {% else %} - as + create {% if temporary -%} + temporary + {%- elif unlogged -%} + unlogged + {%- endif %} table {{ relation }} + {% set contract_config = config.get('contract') %} + {% if contract_config.enforced %} + {{ get_assert_columns_equivalent(sql) }} + {% endif -%} + {% if contract_config.enforced and (not temporary) -%} + {{ get_table_columns_and_constraints() }} ; + insert into {{ relation }} ( + {{ adapter.dispatch('get_column_names', 'dbt')() }} + ) + {%- set sql = get_select_subquery(sql) %} + {% else %} + as + {% endif %} + ( + {{ sql }} + ); {% endif %} - ( - {{ sql }} - ); {%- endmacro %} +{% macro postgres__rename_relation(from_relation, to_relation) %} + {% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %} + {% call statement('rename_relation') -%} + alter table {{ from_relation }} rename to {{ target_name }}; + + {# If the relation is partitioned, rename the subtables #} + {% set existing_partitions_query %} + select + inhrelid::regclass as from_table_name, + '_' || regexp_substr(inhrelid::regclass::text, '[^_]*$') as partition_suffix -- Get the string after the last underscore + from pg_inherits i + join pg_class c on i.inhparent = c.oid + join pg_namespace ns on c.relnamespace = ns.oid + where ns.nspname = '{{ from_relation.schema }}' and c.relname = '{{ from_relation.identifier }}'; + {% endset %} + {% set existing_partitions_results = run_query(existing_partitions_query) %} + + -- Rename the existing partitions + {% for existing_partition in existing_partitions_results.rows %} + {%- set partition_relation = make_intermediate_relation(to_relation, existing_partition['partition_suffix']) %} + + alter table {{ existing_partition["from_table_name"] }} rename to {{ partition_relation.identifier }}; + {% endfor %} + {%- endcall %} +{% endmacro %} + {% macro postgres__get_create_index_sql(relation, index_dict) -%} {%- set index_config = adapter.parse_index(index_dict) -%} {%- set comma_separated_columns = ", ".join(index_config.columns) -%} diff --git a/dbt/include/postgres/macros/materializations/incremental_strategies.sql b/dbt/include/postgres/macros/materializations/incremental_strategies.sql index f2fbf41e..8659a77a 100644 --- a/dbt/include/postgres/macros/materializations/incremental_strategies.sql +++ b/dbt/include/postgres/macros/materializations/incremental_strategies.sql @@ -1,9 +1,83 @@ -{% macro postgres__get_incremental_default_sql(arg_dict) %} +{% macro postgres__get_incremental_merge_sql(arg_dict) %} + {{ create_incremental_missing_partitions(arg_dict) }} - {% if arg_dict["unique_key"] %} - {% do return(get_incremental_delete_insert_sql(arg_dict)) %} - {% else %} - {% do return(get_incremental_append_sql(arg_dict)) %} - {% endif %} + {% do return(default__get_incremental_merge_sql(arg_dict)) %} +{% endmacro %} + +{% macro postgres__get_incremental_delete_insert_sql(arg_dict) %} + {{ create_incremental_missing_partitions(arg_dict) }} + + {% do return(default__get_incremental_delete_insert_sql(arg_dict)) %} +{% endmacro %} + +{% macro postgres__get_incremental_append_sql(arg_dict) %} + {{ create_incremental_missing_partitions(arg_dict) }} + {% do return(default__get_incremental_append_sql(arg_dict)) %} {% endmacro %} + +{% macro create_incremental_missing_partitions(arg_dict) %} + {%- set target = arg_dict["target_relation"] -%} + {%- set source = arg_dict["temp_relation"] -%} + {%- set unlogged = config.get('unlogged') %} + + {# + -- New data might get inserted to a partition that still does not exist + -- We need to perform an anti join between the partitions that exist in the new (source) table and the target table + -- Partitions that don't exist will be created + #} + {%- set missing_partitions_query %} + with + new_partitions as ( + -- Get the partitions present on the new, temporary source table + select + inhrelid::regclass as from_table_name, + regexp_substr(inhrelid::regclass::text, '[^_]*$') as partition_suffix, -- Get the string after the last underscore + pg_get_expr(c.relpartbound, c.oid, true) as partition_expression + from pg_inherits i + join pg_class c_parent on i.inhparent = c_parent.oid + join pg_class c on i.inhrelid = c.oid + where c.relnamespace = pg_my_temp_schema()::regclass + and c_parent.relname = '{{ source.identifier }}' + ), + existing_partitions as ( + -- Partitions already available on the target table + select + pg_get_expr(c.relpartbound, c.oid, true) as partition_expression + from pg_inherits i + join pg_class c_parent on i.inhparent = c_parent.oid + join pg_class c on i.inhrelid = c.oid + join pg_namespace ns on c.relnamespace = ns.oid + where ns.nspname = '{{ target.schema }}' and c_parent.relname = '{{ target.identifier }}' + ), + missing_partitions as ( + -- Find the partitions that exist on the source table but not on the target + select + np.from_table_name, + partition_suffix, + partition_expression + from new_partitions np + left join existing_partitions ep using (partition_expression) + where ep.partition_expression is null + ) + select * + from missing_partitions + {% endset %} + + {% set missing_partitions = run_query(missing_partitions_query) %} + {% for missing_partition in missing_partitions.rows %} + {%- set partition_relation = make_intermediate_relation(target, missing_partition['partition_suffix']) %} + + -- Create the missing partition + create {% if unlogged -%} + unlogged + {%- endif %} + table + "{{ target.schema }}"."{{ partition_relation.identifier }}" + partition of {{ target }} {{ missing_partition.partition_expression }}; + {% endfor %} + + {# Ensure a statement is executed even if no new partitions were created #} + select 1; + +{% endmacro %} \ No newline at end of file