-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] - Partitioning support #78
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,29 +4,127 @@ | |
|
||
{{ sql_header if sql_header is not none }} | ||
|
||
create {% if temporary -%} | ||
temporary | ||
{%- elif unlogged -%} | ||
unlogged | ||
{%- endif %} table {{ relation }} | ||
{% set contract_config = config.get('contract') %} | ||
{% if contract_config.enforced %} | ||
{{ get_assert_columns_equivalent(sql) }} | ||
{% endif -%} | ||
{% if contract_config.enforced and (not temporary) -%} | ||
{{ get_table_columns_and_constraints() }} ; | ||
insert into {{ relation }} ( | ||
{{ adapter.dispatch('get_column_names', 'dbt')() }} | ||
) | ||
{%- set sql = get_select_subquery(sql) %} | ||
{% if config.get('partition_by') != None %} | ||
{# Use partitioning #} | ||
|
||
{%- set partition_by_field = config.get('partition_by')['field'] -%} | ||
{%- set partition_by_granularity = config.get('partition_by')['granularity'] -%} | ||
|
||
-- We cannot create a partitioned table with "create as select". | ||
-- Create a dummy temporary table just to be used as a template for the real one | ||
create temporary table "{{ this.identifier }}__tt" as | ||
select * | ||
from ({{ sql }}) model_subq | ||
where 1 = 2; | ||
|
||
-- Create partitioned table as a copy of the template | ||
create {% if temporary -%} | ||
temporary | ||
{%- elif unlogged -%} | ||
unlogged | ||
{%- endif %} table {{ relation }} (like "{{ this.identifier }}__tt") | ||
partition by range ({{ partition_by_field }}); | ||
|
||
{% set required_partitions_query %} | ||
-- Partitions need to be manually created before inserting. | ||
-- We execute the model SQL to get the first and last partitions and use the granularity to define what others need to be created. | ||
-- | ||
-- Note that we this executes the SQL model a first time just to get the partitions and it will be done a second time to actually get the data. | ||
-- That might be very inefficient depending on the selected volume of data. | ||
-- There's a alternative: | ||
-- - Create as select a temporary non-partitioned table. | ||
-- - Get the min/max partitions from that table. | ||
-- - Move the data to the final table once it's partitioned. | ||
-- | ||
-- That option would mean the data is stored twice during model execution. | ||
with partitions as ( | ||
-- Generate a sequence with one row per required partition, based on the min and max dates | ||
select | ||
generate_series( | ||
date_trunc('{{ partition_by_granularity }}', min({{ partition_by_field }})), | ||
date_trunc('{{ partition_by_granularity }}', max({{ partition_by_field }})), | ||
'1 {{ partition_by_granularity }}'::interval | ||
) as begin_date | ||
from ( | ||
{{ sql }} | ||
) model_subq | ||
) | ||
select | ||
-- Generate the begin-end date and name suffix for each partition | ||
begin_date, | ||
begin_date + '1 {{ partition_by_granularity }}'::interval as end_date, | ||
to_char(begin_date, '_yyyymmdd') as partition_suffix | ||
from partitions; | ||
{% endset %} | ||
{% set required_partitions_results = run_query(required_partitions_query) %} | ||
|
||
-- Create the required partitions | ||
{% for required_partition in required_partitions_results.rows %} | ||
create | ||
{% if temporary -%} | ||
temporary | ||
{%- elif unlogged -%} | ||
unlogged | ||
{%- endif %} | ||
table | ||
{{ make_intermediate_relation(relation, required_partition['partition_suffix']) }} | ||
partition of {{ relation }} for values from ('{{ required_partition["begin_date"] }}'::timestamp) to ('{{ required_partition["end_date"] }}'::timestamp); | ||
{% endfor %} | ||
|
||
-- Insert into the parent table | ||
insert into {{ relation }} | ||
{{ sql }}; | ||
{% else %} | ||
as | ||
create {% if temporary -%} | ||
temporary | ||
{%- elif unlogged -%} | ||
unlogged | ||
{%- endif %} table {{ relation }} | ||
{% set contract_config = config.get('contract') %} | ||
{% if contract_config.enforced %} | ||
{{ get_assert_columns_equivalent(sql) }} | ||
{% endif -%} | ||
{% if contract_config.enforced and (not temporary) -%} | ||
{{ get_table_columns_and_constraints() }} ; | ||
insert into {{ relation }} ( | ||
{{ adapter.dispatch('get_column_names', 'dbt')() }} | ||
) | ||
{%- set sql = get_select_subquery(sql) %} | ||
{% else %} | ||
as | ||
{% endif %} | ||
( | ||
{{ sql }} | ||
); | ||
{% endif %} | ||
( | ||
{{ sql }} | ||
); | ||
{%- endmacro %} | ||
|
||
{% macro postgres__rename_relation(from_relation, to_relation) %} | ||
{% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %} | ||
{% call statement('rename_relation') -%} | ||
alter table {{ from_relation }} rename to {{ target_name }}; | ||
|
||
{# If the relation is partitioned, rename the subtables #} | ||
{% set existing_partitions_query %} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Partitions are created with the __dbt_tmp suffix and so they need to be individually renamed. Sadly I don't think the |
||
select | ||
inhrelid::regclass as from_table_name, | ||
'_' || regexp_substr(inhrelid::regclass::text, '[^_]*$') as partition_suffix -- Get the string after the last underscore | ||
from pg_inherits i | ||
join pg_class c on i.inhparent = c.oid | ||
join pg_namespace ns on c.relnamespace = ns.oid | ||
where ns.nspname = '{{ from_relation.schema }}' and c.relname = '{{ from_relation.identifier }}'; | ||
{% endset %} | ||
{% set existing_partitions_results = run_query(existing_partitions_query) %} | ||
|
||
-- Rename the existing partitions | ||
{% for existing_partition in existing_partitions_results.rows %} | ||
{%- set partition_relation = make_intermediate_relation(to_relation, existing_partition['partition_suffix']) %} | ||
|
||
alter table {{ existing_partition["from_table_name"] }} rename to {{ partition_relation.identifier }}; | ||
{% endfor %} | ||
{%- endcall %} | ||
{% endmacro %} | ||
|
||
{% macro postgres__get_create_index_sql(relation, index_dict) -%} | ||
{%- set index_config = adapter.parse_index(index_dict) -%} | ||
{%- set comma_separated_columns = ", ".join(index_config.columns) -%} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,83 @@ | ||
{% macro postgres__get_incremental_default_sql(arg_dict) %} | ||
{% macro postgres__get_incremental_merge_sql(arg_dict) %} | ||
{{ create_incremental_missing_partitions(arg_dict) }} | ||
|
||
{% if arg_dict["unique_key"] %} | ||
{% do return(get_incremental_delete_insert_sql(arg_dict)) %} | ||
{% else %} | ||
{% do return(get_incremental_append_sql(arg_dict)) %} | ||
{% endif %} | ||
{% do return(default__get_incremental_merge_sql(arg_dict)) %} | ||
{% endmacro %} | ||
|
||
{% macro postgres__get_incremental_delete_insert_sql(arg_dict) %} | ||
{{ create_incremental_missing_partitions(arg_dict) }} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was parsed but not executed.
|
||
|
||
{% do return(default__get_incremental_delete_insert_sql(arg_dict)) %} | ||
{% endmacro %} | ||
|
||
{% macro postgres__get_incremental_append_sql(arg_dict) %} | ||
{{ create_incremental_missing_partitions(arg_dict) }} | ||
|
||
{% do return(default__get_incremental_append_sql(arg_dict)) %} | ||
{% endmacro %} | ||
|
||
{% macro create_incremental_missing_partitions(arg_dict) %} | ||
{%- set target = arg_dict["target_relation"] -%} | ||
{%- set source = arg_dict["temp_relation"] -%} | ||
{%- set unlogged = config.get('unlogged') %} | ||
|
||
{# | ||
-- New data might get inserted to a partition that still does not exist | ||
-- We need to perform an anti join between the partitions that exist in the new (source) table and the target table | ||
-- Partitions that don't exist will be created | ||
#} | ||
{%- set missing_partitions_query %} | ||
with | ||
new_partitions as ( | ||
-- Get the partitions present on the new, temporary source table | ||
select | ||
inhrelid::regclass as from_table_name, | ||
regexp_substr(inhrelid::regclass::text, '[^_]*$') as partition_suffix, -- Get the string after the last underscore | ||
pg_get_expr(c.relpartbound, c.oid, true) as partition_expression | ||
from pg_inherits i | ||
join pg_class c_parent on i.inhparent = c_parent.oid | ||
join pg_class c on i.inhrelid = c.oid | ||
where c.relnamespace = pg_my_temp_schema()::regclass | ||
and c_parent.relname = '{{ source.identifier }}' | ||
), | ||
existing_partitions as ( | ||
-- Partitions already available on the target table | ||
select | ||
pg_get_expr(c.relpartbound, c.oid, true) as partition_expression | ||
from pg_inherits i | ||
join pg_class c_parent on i.inhparent = c_parent.oid | ||
join pg_class c on i.inhrelid = c.oid | ||
join pg_namespace ns on c.relnamespace = ns.oid | ||
where ns.nspname = '{{ target.schema }}' and c_parent.relname = '{{ target.identifier }}' | ||
), | ||
missing_partitions as ( | ||
-- Find the partitions that exist on the source table but not on the target | ||
select | ||
np.from_table_name, | ||
partition_suffix, | ||
partition_expression | ||
from new_partitions np | ||
left join existing_partitions ep using (partition_expression) | ||
where ep.partition_expression is null | ||
) | ||
select * | ||
from missing_partitions | ||
{% endset %} | ||
|
||
{% set missing_partitions = run_query(missing_partitions_query) %} | ||
{% for missing_partition in missing_partitions.rows %} | ||
{%- set partition_relation = make_intermediate_relation(target, missing_partition['partition_suffix']) %} | ||
|
||
-- Create the missing partition | ||
create {% if unlogged -%} | ||
unlogged | ||
{%- endif %} | ||
table | ||
"{{ target.schema }}"."{{ partition_relation.identifier }}" | ||
partition of {{ target }} {{ missing_partition.partition_expression }}; | ||
{% endfor %} | ||
|
||
{# Ensure a statement is executed even if no new partitions were created #} | ||
select 1; | ||
|
||
{% endmacro %} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I separated the logic for partitioning as it's easier to read. When partioning is not enabled, the code is exactly the same as before.