Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] - Partitioning support #78

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 117 additions & 19 deletions dbt/include/postgres/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,127 @@

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% if config.get('partition_by') != None %}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I separated the logic for partitioning as it's easier to read. When partitioning is not enabled, the code is exactly the same as before.

{# Use partitioning #}

{%- set partition_by_field = config.get('partition_by')['field'] -%}
{%- set partition_by_granularity = config.get('partition_by')['granularity'] -%}

-- We cannot create a partitioned table with "create as select".
-- Create a dummy temporary table just to be used as a template for the real one
create temporary table "{{ this.identifier }}__tt" as
select *
from ({{ sql }}) model_subq
where 1 = 2;

-- Create partitioned table as a copy of the template
create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }} (like "{{ this.identifier }}__tt")
partition by range ({{ partition_by_field }});

{% set required_partitions_query %}
-- Partitions need to be manually created before inserting.
-- We execute the model SQL to get the first and last partitions and use the granularity to define what others need to be created.
--
-- Note that this executes the model SQL a first time just to get the partitions, and it will be executed a second time to actually get the data.
-- That might be very inefficient depending on the selected volume of data.
-- There's an alternative:
-- - Create as select a temporary non-partitioned table.
-- - Get the min/max partitions from that table.
-- - Move the data to the final table once it's partitioned.
--
-- That option would mean the data is stored twice during model execution.
with partitions as (
-- Generate a sequence with one row per required partition, based on the min and max dates
select
generate_series(
date_trunc('{{ partition_by_granularity }}', min({{ partition_by_field }})),
date_trunc('{{ partition_by_granularity }}', max({{ partition_by_field }})),
'1 {{ partition_by_granularity }}'::interval
) as begin_date
from (
{{ sql }}
) model_subq
)
select
-- Generate the begin-end date and name suffix for each partition
begin_date,
begin_date + '1 {{ partition_by_granularity }}'::interval as end_date,
to_char(begin_date, '_yyyymmdd') as partition_suffix
from partitions;
{% endset %}
{% set required_partitions_results = run_query(required_partitions_query) %}

-- Create the required partitions
{% for required_partition in required_partitions_results.rows %}
create
{% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %}
table
{{ make_intermediate_relation(relation, required_partition['partition_suffix']) }}
partition of {{ relation }} for values from ('{{ required_partition["begin_date"] }}'::timestamp) to ('{{ required_partition["end_date"] }}'::timestamp);
{% endfor %}

-- Insert into the parent table
insert into {{ relation }}
{{ sql }};
{% else %}
as
create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% else %}
as
{% endif %}
(
{{ sql }}
);
{% endif %}
(
{{ sql }}
);
{%- endmacro %}

{% macro postgres__rename_relation(from_relation, to_relation) %}
  {#
    Rename `from_relation` to the identifier of `to_relation`. Every child of `from_relation`
    listed in pg_inherits is then renamed to the new base identifier plus the child's own suffix
    (the text after the last underscore of its name, with the underscore kept).

    Args:
      from_relation: relation being renamed; its schema and identifier drive the catalog lookup.
      to_relation: relation whose identifier becomes the new name.

    The catalog lookup goes through run_query, i.e. it is issued while this template renders,
    before the statements emitted below are sent.
  #}
  {% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %}
  {% call statement('rename_relation') -%}
    alter table {{ from_relation }} rename to {{ target_name }};

    {#
      If the relation is partitioned, rename the subtables.
      Partitions carry the name of the relation they were created under, so each one has to be
      renamed individually.
      NOTE(review): the model config does not seem to be available in this macro, so whether the
      relation is partitioned is inferred from pg_inherits instead -- confirm.
    #}
    {% set existing_partitions_query %}
      select
        i.inhrelid::regclass as from_table_name,
        -- Suffix = text after the last underscore of the child's bare name.
        -- The bare relname is used because regclass text may be quoted or schema-qualified;
        -- substring(... from pattern) is used because regexp_substr needs PostgreSQL 15+.
        '_' || substring(child.relname::text from '[^_]*$') as partition_suffix
      from pg_inherits i
      join pg_class c on i.inhparent = c.oid
      join pg_class child on i.inhrelid = child.oid
      join pg_namespace ns on c.relnamespace = ns.oid
      where ns.nspname = '{{ from_relation.schema }}' and c.relname = '{{ from_relation.identifier }}';
    {% endset %}
    {% set existing_partitions_results = run_query(existing_partitions_query) %}

    -- Rename the existing partitions
    {% for existing_partition in existing_partitions_results.rows %}
      {%- set partition_relation = make_intermediate_relation(to_relation, existing_partition['partition_suffix']) %}
      {#- Quote the new name the same way the parent's new name is quoted above. -#}
      {%- set partition_name = adapter.quote_as_configured(partition_relation.identifier, 'identifier') %}

      alter table {{ existing_partition["from_table_name"] }} rename to {{ partition_name }};
    {% endfor %}
  {%- endcall %}
{% endmacro %}

{% macro postgres__get_create_index_sql(relation, index_dict) -%}
{%- set index_config = adapter.parse_index(index_dict) -%}
{%- set comma_separated_columns = ", ".join(index_config.columns) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,83 @@
{#
  NOTE(review): this span opens two macros (`postgres__get_incremental_default_sql` and
  `postgres__get_incremental_merge_sql`) but closes only one, and it contains both an if/else
  dispatch on arg_dict["unique_key"] and an unconditional return of
  default__get_incremental_merge_sql. It looks like both sides of a diff hunk interleaved --
  confirm which lines belong to the intended revision before relying on it.

  NOTE(review): the text rendered by `{{ create_incremental_missing_partitions(arg_dict) }}` is
  presumably not part of the value handed back by a later `do return(...)` -- verify that the
  partition-creation SQL actually reaches the caller.
#}
{% macro postgres__get_incremental_default_sql(arg_dict) %}
{% macro postgres__get_incremental_merge_sql(arg_dict) %}
{{ create_incremental_missing_partitions(arg_dict) }}

{% if arg_dict["unique_key"] %}
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
{% do return(get_incremental_append_sql(arg_dict)) %}
{% endif %}
{% do return(default__get_incremental_merge_sql(arg_dict)) %}
{% endmacro %}

{% macro postgres__get_incremental_delete_insert_sql(arg_dict) %}
  {#
    Return the SQL for the delete+insert incremental strategy, preceded by the statements that
    create any partitions missing on the target.

    Args:
      arg_dict: passed unchanged to create_incremental_missing_partitions and to
                default__get_incremental_delete_insert_sql.

    Why the concatenation: a macro that exits through `do return(...)` hands back only the
    returned value, so partition SQL emitted with a plain `{{ ... }}` expression beforehand would
    be evaluated but never reach the caller. Joining both pieces inside the return keeps the
    partition statements in the SQL that is executed.
  #}
  {% do return(
    create_incremental_missing_partitions(arg_dict) ~ default__get_incremental_delete_insert_sql(arg_dict)
  ) %}
{% endmacro %}

{% macro postgres__get_incremental_append_sql(arg_dict) %}
  {#
    Return the SQL for the append incremental strategy, preceded by the statements that create
    any partitions missing on the target.

    Args:
      arg_dict: passed unchanged to create_incremental_missing_partitions and to
                default__get_incremental_append_sql.

    Why the concatenation: a macro that exits through `do return(...)` hands back only the
    returned value, so partition SQL emitted with a plain `{{ ... }}` expression beforehand would
    be evaluated but never reach the caller. Joining both pieces inside the return keeps the
    partition statements in the SQL that is executed.
  #}
  {% do return(
    create_incremental_missing_partitions(arg_dict) ~ default__get_incremental_append_sql(arg_dict)
  ) %}
{% endmacro %}

{% macro create_incremental_missing_partitions(arg_dict) %}
  {#
    Render `create table ... partition of` statements for every partition that exists on the
    temporary source relation but has no partition with the same bound expression on the target.

    Args:
      arg_dict: reads "target_relation" (the table receiving the data) and "temp_relation"
                (the temporary table holding the new data).

    Output: rendered SQL text -- zero or more create statements followed by `select 1;`.
    Reads the `unlogged` model config to decide whether new partitions are created unlogged.
    The catalog lookup goes through run_query, i.e. it is issued while this template renders.
  #}
  {%- set target = arg_dict["target_relation"] -%}
  {%- set source = arg_dict["temp_relation"] -%}
  {%- set unlogged = config.get('unlogged') %}

  {#
    -- New data might get inserted to a partition that still does not exist
    -- We need to perform an anti join between the partitions that exist in the new (source) table and the target table
    -- Partitions that don't exist will be created
  #}
  {%- set missing_partitions_query %}
    with
    new_partitions as (
      -- Get the partitions present on the new, temporary source table
      select
        i.inhrelid::regclass as from_table_name,
        -- Suffix = underscore plus the text after the last underscore of the child's bare name.
        -- The leading underscore matches the suffix format used when partitions are first
        -- created and when they are renamed, so all partitions of a model share one naming scheme.
        -- substring(... from pattern) is used because regexp_substr needs PostgreSQL 15+.
        '_' || substring(c.relname::text from '[^_]*$') as partition_suffix,
        pg_get_expr(c.relpartbound, c.oid, true) as partition_expression
      from pg_inherits i
      join pg_class c_parent on i.inhparent = c_parent.oid
      join pg_class c on i.inhrelid = c.oid
      -- pg_my_temp_schema() already yields the namespace oid; no cast is needed
      where c.relnamespace = pg_my_temp_schema()
        and c_parent.relname = '{{ source.identifier }}'
    ),
    existing_partitions as (
      -- Partitions already available on the target table
      select
        pg_get_expr(c.relpartbound, c.oid, true) as partition_expression
      from pg_inherits i
      join pg_class c_parent on i.inhparent = c_parent.oid
      join pg_class c on i.inhrelid = c.oid
      join pg_namespace ns on c.relnamespace = ns.oid
      where ns.nspname = '{{ target.schema }}' and c_parent.relname = '{{ target.identifier }}'
    ),
    missing_partitions as (
      -- Find the partitions that exist on the source table but not on the target
      select
        np.from_table_name,
        np.partition_suffix,
        np.partition_expression
      from new_partitions np
      left join existing_partitions ep using (partition_expression)
      where ep.partition_expression is null
    )
    select
      from_table_name,
      partition_suffix,
      partition_expression
    from missing_partitions
  {% endset %}

  {% set missing_partitions = run_query(missing_partitions_query) %}
  {% for missing_partition in missing_partitions.rows %}
    {%- set partition_relation = make_intermediate_relation(target, missing_partition['partition_suffix']) %}

    -- Create the missing partition
    create {% if unlogged -%}
      unlogged
    {%- endif %}
    table
      "{{ target.schema }}"."{{ partition_relation.identifier }}"
    partition of {{ target }} {{ missing_partition.partition_expression }};
  {% endfor %}

  {# Ensure a statement is executed even if no new partitions were created #}
  select 1;

{% endmacro %}