From 7c216445f8009baa9cec4d61dd56693be1dd79fa Mon Sep 17 00:00:00 2001 From: Christophe Oudar Date: Wed, 26 Apr 2023 13:14:36 +0200 Subject: [PATCH] Support all types of data_type using time ingestion partitioning (#496) * Support all types of data_type using time ingestion partitioning * rework bq_create_table_as & fix partitions * touchups after verifying no bug * change case of test field because the parse routine now sanitizes the config val --------- Co-authored-by: Mila Page <67295367+VersusFacit@users.noreply.github.com> Co-authored-by: Mila Page --- .../unreleased/Fixes-20230202-010912.yaml | 8 ++ dbt/adapters/bigquery/impl.py | 74 ++++++++++++++----- dbt/include/bigquery/macros/adapters.sql | 16 ++++ .../macros/materializations/incremental.sql | 31 ++++---- .../incremental_strategy/common.sql | 9 --- .../incremental_strategy/insert_overwrite.sql | 14 ++-- .../incremental_strategy/merge.sql | 4 +- .../time_ingestion_tables.sql | 35 ++------- .../incremental_strategy_fixtures.py | 46 +++++++++++- .../test_incremental_strategies.py | 20 ++++- tests/unit/test_bigquery_adapter.py | 16 ++-- 11 files changed, 179 insertions(+), 94 deletions(-) create mode 100644 .changes/unreleased/Fixes-20230202-010912.yaml diff --git a/.changes/unreleased/Fixes-20230202-010912.yaml b/.changes/unreleased/Fixes-20230202-010912.yaml new file mode 100644 index 000000000..d85c2ed84 --- /dev/null +++ b/.changes/unreleased/Fixes-20230202-010912.yaml @@ -0,0 +1,8 @@ +kind: Fixes +body: Support all types of data_type using time ingestion partitioning as previously + `date` was failing +time: 2023-02-02T01:09:12.013631+01:00 +custom: + Author: Kayrnt + Issue: "486" + PR: "496" diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index fd99927a2..2bf6ddf84 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -77,9 +77,17 @@ class PartitionConfig(dbtClassMixin): time_ingestion_partitioning: bool = False copy_partitions: bool = False + 
PARTITION_DATE = "_PARTITIONDATE" + PARTITION_TIME = "_PARTITIONTIME" + def data_type_for_partition(self): - """Return the data type of partitions for replacement.""" - return self.data_type if not self.time_ingestion_partitioning else "timestamp" + """Return the data type of partitions for replacement. + When time_ingestion_partitioning is enabled, the supported data types are date and timestamp. + """ + if not self.time_ingestion_partitioning: + return self.data_type + + return "date" if self.data_type == "date" else "timestamp" def reject_partition_field_column(self, columns: List[Any]) -> List[str]: return [c for c in columns if not c.name.upper() == self.field.upper()] @@ -87,12 +95,28 @@ def reject_partition_field_column(self, columns: List[Any]) -> List[str]: def data_type_should_be_truncated(self): """Return true if the data type should be truncated instead of cast to the data type.""" return not ( - self.data_type.lower() == "int64" - or (self.data_type.lower() == "date" and self.granularity.lower() == "day") + self.data_type == "int64" or (self.data_type == "date" and self.granularity == "day") ) + def time_partitioning_field(self) -> str: + """Return the time partitioning field name based on the data type. + The default is _PARTITIONTIME, but for date it is _PARTITIONDATE; + otherwise statements fail with a type mismatch.""" + if self.data_type == "date": + return self.PARTITION_DATE + else: + return self.PARTITION_TIME + + def insertable_time_partitioning_field(self) -> str: + """Return the insertable time partitioning field name based on the data type. + Practically, only _PARTITIONTIME works so far. 
+ The function is meant to keep the call sites consistent as it might evolve.""" + return self.PARTITION_TIME + def render(self, alias: Optional[str] = None): - column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME" + column: str = ( + self.field if not self.time_ingestion_partitioning else self.time_partitioning_field() + ) if alias: column = f"{alias}.{column}" @@ -107,6 +131,9 @@ def render_wrapped(self, alias: Optional[str] = None): if ( self.data_type in ("date", "timestamp", "datetime") and not self.data_type_should_be_truncated() + and not ( + self.time_ingestion_partitioning and self.data_type == "date" + ) # _PARTITIONDATE is already a date ): return f"{self.data_type}({self.render(alias)})" else: @@ -118,7 +145,12 @@ def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]: return None try: cls.validate(raw_partition_by) - return cls.from_dict(raw_partition_by) + return cls.from_dict( + { + key: (value.lower() if isinstance(value, str) else value) + for key, value in raw_partition_by.items() + } + ) except ValidationError as exc: raise dbt.exceptions.DbtValidationError("Could not parse partition config") from exc except TypeError: @@ -273,9 +305,16 @@ def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryCo return [] @available.parse(lambda *a, **k: []) - def add_time_ingestion_partition_column(self, columns) -> List[BigQueryColumn]: - "Add time ingestion partition column to columns list" - columns.append(self.Column("_PARTITIONTIME", "TIMESTAMP", None, "NULLABLE")) + def add_time_ingestion_partition_column(self, partition_by, columns) -> List[BigQueryColumn]: + """Add time ingestion partition column to columns list""" + columns.append( + self.Column( + partition_by.insertable_time_partitioning_field(), + partition_by.data_type, + None, + "NULLABLE", + ) + ) return columns def expand_column_types(self, goal: BigQueryRelation, current: BigQueryRelation) -> None: # type: ignore[override] @@ 
-564,18 +603,15 @@ def _partitions_match(table, conf_partition: Optional[PartitionConfig]) -> bool: if not is_partitioned and not conf_partition: return True elif conf_partition and table.time_partitioning is not None: - partitioning_field = table.time_partitioning.field or "_PARTITIONTIME" - table_field = partitioning_field.lower() - table_granularity = table.partitioning_type.lower() - conf_table_field = ( - conf_partition.field - if not conf_partition.time_ingestion_partitioning - else "_PARTITIONTIME" + table_field = ( + table.time_partitioning.field.lower() if table.time_partitioning.field else None ) + table_granularity = table.partitioning_type + conf_table_field = conf_partition.field return ( - table_field == conf_table_field.lower() - and table_granularity == conf_partition.granularity.lower() - ) + table_field == conf_table_field + or (conf_partition.time_ingestion_partitioning and table_field is not None) + ) and table_granularity == conf_partition.granularity elif conf_partition and table.range_partitioning is not None: dest_part = table.range_partitioning conf_part = conf_partition.range or {} diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index ed9359bee..23a3f3bf6 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -2,6 +2,8 @@ {% macro partition_by(partition_config) -%} {%- if partition_config is none -%} {% do return('') %} + {%- elif partition_config.time_ingestion_partitioning -%} + partition by {{ partition_config.render_wrapped() }} {%- elif partition_config.data_type | lower in ('date','timestamp','datetime') -%} partition by {{ partition_config.render() }} {%- elif partition_config.data_type | lower in ('int64') -%} @@ -48,6 +50,11 @@ {%- set sql_header = config.get('sql_header', none) -%} {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + {%- if partition_config.time_ingestion_partitioning -%} + {%- set columns = 
get_columns_with_types_in_query_sql(sql) -%} + {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%} + {%- set columns = '(' ~ table_dest_columns_csv ~ ')' -%} + {%- endif -%} {{ sql_header if sql_header is not none }} @@ -57,14 +64,23 @@ {{ get_assert_columns_equivalent(compiled_code) }} {{ get_table_columns_and_constraints() }} {%- set compiled_code = get_select_subquery(compiled_code) %} + {% else %} + {#-- cannot do contracts at the same time as time ingestion partitioning -#} + {{ columns }} {% endif %} {{ partition_by(partition_config) }} {{ cluster_by(raw_cluster_by) }} {{ bigquery_table_options(config, model, temporary) }} + + {#-- PARTITION BY cannot be used with the AS query_statement clause. + https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#partition_expression + -#} + {%- if not partition_config.time_ingestion_partitioning %} as ( {{ compiled_code }} ); + {%- endif %} {%- elif language == 'python' -%} {#-- N.B. 
Python models _can_ write to temp views HOWEVER they use a different session diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index 9f1479749..2cbb14d9b 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -16,24 +16,25 @@ {% macro source_sql_with_partition(partition_by, source_sql) %} {%- if partition_by.time_ingestion_partitioning %} - {{ return(wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by.field), source_sql, False)) }} + {{ return(wrap_with_time_ingestion_partitioning_sql(partition_by, source_sql, False)) }} {% else %} {{ return(source_sql) }} {%- endif -%} {% endmacro %} -{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %} - {% if is_time_ingestion_partitioning and language == 'python' %} + +{% macro bq_create_table_as(partition_by, temporary, relation, compiled_code, language='sql') %} + {%- set _dbt_max_partition = declare_dbt_max_partition(this, partition_by, compiled_code, language) -%} + {% if partition_by.time_ingestion_partitioning and language == 'python' %} {% do exceptions.raise_compiler_error( "Python models do not support ingestion time partitioning" ) %} - {% endif %} - {% if is_time_ingestion_partitioning and language == 'sql' %} + {% elif partition_by.time_ingestion_partitioning and language == 'sql' %} {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#} - {% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, compiled_code)) %} - {{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }} + {% do run_query(create_table_as(temporary, relation, compiled_code)) %} + {{ return(_dbt_max_partition + 
bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }} {% else %} - {{ return(create_table_as(temporary, relation, compiled_code, language)) }} + {{ return(_dbt_max_partition + create_table_as(temporary, relation, compiled_code, language)) }} {% endif %} {% endmacro %} @@ -93,14 +94,14 @@ {% elif existing_relation is none %} {%- call statement('main', language=language) -%} - {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }} + {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }} {%- endcall -%} {% elif existing_relation.is_view %} {#-- There's no way to atomically replace a view with a table on BQ --#} {{ adapter.drop_relation(existing_relation) }} {%- call statement('main', language=language) -%} - {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }} + {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }} {%- endcall -%} {% elif full_refresh_mode %} @@ -110,7 +111,7 @@ {{ adapter.drop_relation(existing_relation) }} {% endif %} {%- call statement('main', language=language) -%} - {{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }} + {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }} {%- endcall -%} {% else %} @@ -127,9 +128,7 @@ {#-- Check first, since otherwise we may not build a temp table --#} {#-- Python always needs to create a temp table --#} {%- call statement('create_tmp_relation', language=language) -%} - {{ declare_dbt_max_partition(this, partition_by, compiled_code, language) + - bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code, language) - }} + {{ bq_create_table_as(partition_by, True, tmp_relation, compiled_code, language) }} {%- endcall -%} {% set tmp_relation_exists = true %} {#-- 
Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} @@ -139,9 +138,11 @@ {% if not dest_columns %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} {% endif %} + {#-- Add time ingestion pseudo column to destination column as not part of the 'schema' but still need it for actual data insertion --#} {% if partition_by.time_ingestion_partitioning %} - {% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %} + {% set dest_columns = adapter.add_time_ingestion_partition_column(partition_by, dest_columns) %} {% endif %} + {% set build_sql = bq_generate_incremental_build_sql( strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates ) %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql index b9f8560d9..9d71ba7c0 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql @@ -1,12 +1,3 @@ -{% macro build_partition_time_exp(partition_by) %} - {% if partition_by.data_type == 'timestamp' %} - {% set partition_value = partition_by.field %} - {% else %} - {% set partition_value = 'timestamp(' + partition_by.field + ')' %} - {% endif %} - {{ return({'value': partition_value, 'field': partition_by.field}) }} -{% endmacro %} - {% macro declare_dbt_max_partition(relation, partition_by, compiled_code, language='sql') %} {#-- TODO: revisit partitioning with python models --#} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 3153f49d0..41c11c15d 100644 --- 
a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -58,7 +58,7 @@ {%- set source_sql -%} ( {%- if partition_by.time_ingestion_partitioning -%} - {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, True) }} + {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }} {%- else -%} {{sql}} {%- endif -%} @@ -85,8 +85,7 @@ ) %} {# We run temp table creation in a separated script to move to partitions copy #} {%- call statement('create_tmp_relation_for_copy', language='sql') -%} - {{ declare_dbt_max_partition(this, partition_by, sql, 'sql') + - bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql, 'sql') + {{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql') }} {%- endcall %} {%- set partitions_sql -%} @@ -112,7 +111,7 @@ ( select {% if partition_by.time_ingestion_partitioning -%} - _PARTITIONTIME, + {{ partition_by.insertable_time_partitioning_field() }}, {%- endif -%} * from {{ tmp_relation }} ) @@ -123,19 +122,18 @@ {# have we already created the temp table to check for schema changes? #} {% if not tmp_relation_exists %} - {{ declare_dbt_max_partition(this, partition_by, sql) }} - -- 1. create a temp table with model data - {{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql, 'sql') }} + {{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql') }} {% else %} -- 1. temp table already exists, we used it to check for schema changes {% endif %} + {%- set partition_field = partition_by.time_partitioning_field() if partition_by.time_ingestion_partitioning else partition_by.render_wrapped() -%} -- 2. 
define partitions to update set (dbt_partitions_for_replacement) = ( select as struct -- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null - array_agg(distinct {{ partition_by.render_wrapped() }} IGNORE NULLS) + array_agg(distinct {{ partition_field }} IGNORE NULLS) from {{ tmp_relation }} ); diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql index 57c88dbc8..90af66f52 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql @@ -6,14 +6,14 @@ ( select {% if partition_by.time_ingestion_partitioning -%} - _PARTITIONTIME, + {{ partition_by.insertable_time_partitioning_field() }}, {%- endif -%} * from {{ tmp_relation }} ) {%- else -%} {#-- wrap sql in parens to make it a subquery --#} ( {%- if partition_by.time_ingestion_partitioning -%} - {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), sql, True) }} + {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }} {%- else -%} {{sql}} {%- endif %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql index 79d6a74eb..0a118dab6 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/time_ingestion_tables.sql @@ -1,35 +1,11 @@ -{% macro wrap_with_time_ingestion_partitioning_sql(partition_time_exp, sql, is_nested) %} +{% macro wrap_with_time_ingestion_partitioning_sql(partition_by, sql, is_nested) %} - select {{ partition_time_exp['value'] }} as _partitiontime, * EXCEPT({{ partition_time_exp['field'] }}) from ( + select TIMESTAMP({{ partition_by.field }}) as 
{{ partition_by.insertable_time_partitioning_field() }}, * EXCEPT({{ partition_by.field }}) from ( {{ sql }} ){%- if not is_nested -%};{%- endif -%} {% endmacro %} -{% macro create_ingestion_time_partitioned_table_as_sql(temporary, relation, sql) -%} - {%- set raw_partition_by = config.get('partition_by', none) -%} - {%- set raw_cluster_by = config.get('cluster_by', none) -%} - {%- set sql_header = config.get('sql_header', none) -%} - - {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} - - {%- set columns = get_columns_with_types_in_query_sql(sql) -%} - {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%} - - {{ sql_header if sql_header is not none }} - - {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %} - {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %} - - {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%} - - create or replace table {{ relation }} ({{table_dest_columns_csv}}) - {{ partition_by(ingestion_time_partition_config) }} - {{ cluster_by(raw_cluster_by) }} - {{ bigquery_table_options(config, model, temporary) }} - -{%- endmacro -%} - {% macro get_quoted_with_types_csv(columns) %} {% set quoted = [] %} {% for col in columns -%} @@ -48,12 +24,13 @@ {%- endmacro -%} {% macro bq_insert_into_ingestion_time_partitioned_table_sql(target_relation, sql) -%} - {%- set partition_by = config.get('partition_by', none) -%} + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} - insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }}) - {{ wrap_with_time_ingestion_partitioning_sql(build_partition_time_exp(partition_by), 
sql, False) }} + insert into {{ target_relation }} ({{ partition_by.insertable_time_partitioning_field() }}, {{ dest_columns_csv }}) + {{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, False) }} {%- endmacro -%} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index b3d45ae80..6bb429833 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -54,7 +54,7 @@ cluster_by="id", partition_by={ "field": "date_time", - "data_type": "datetime" + "data_type": "dateTime" } ) }} @@ -220,6 +220,50 @@ {% endif %} """.lstrip() +overwrite_day_with_time_partition_datetime_sql = """ +{{ + config( + materialized="incremental", + incremental_strategy='insert_overwrite', + cluster_by="id", + partition_by={ + "field": "date_day", + "data_type": "date", + "time_ingestion_partitioning": true + } + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as date) as date_day union all + select 2 as id, cast('2020-01-01' as date) as date_day union all + select 3 as id, cast('2020-01-01' as date) as date_day union all + select 4 as id, cast('2020-01-01' as date) as date_day + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as date) as date_day union all + select 20 as id, cast('2020-01-01' as date) as date_day union all + select 30 as id, cast('2020-01-02' as date) as date_day union all + select 40 as id, cast('2020-01-02' as date) as date_day + + {% endif %} + +) + +select * from data + +{% if is_incremental() %} +where date_day >= '2020-01-01' +{% endif %} +""".lstrip() + overwrite_partitions_sql = """ {{ config( diff --git 
a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index a49872310..8a90b98ab 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -24,6 +24,7 @@ overwrite_range_sql, overwrite_time_sql, overwrite_day_with_time_ingestion_sql, + overwrite_day_with_time_partition_datetime_sql, ) @@ -43,7 +44,8 @@ def models(self): "incremental_overwrite_partitions.sql": overwrite_partitions_sql, "incremental_overwrite_range.sql": overwrite_range_sql, "incremental_overwrite_time.sql": overwrite_time_sql, - "incremental_overwrite_day_with_time_ingestion.sql": overwrite_day_with_time_ingestion_sql, + "incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql, + "incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql, } @pytest.fixture(scope="class") @@ -61,10 +63,10 @@ def seeds(self): def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(self, project): run_dbt(["seed"]) results = run_dbt() - assert len(results) == 9 + assert len(results) == 10 results = run_dbt() - assert len(results) == 9 + assert len(results) == 10 incremental_strategies = [ ("incremental_merge_range", "merge_expected"), ("incremental_merge_time", "merge_expected"), @@ -73,6 +75,10 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se ("incremental_overwrite_partitions", "incremental_overwrite_date_expected"), ("incremental_overwrite_day", "incremental_overwrite_day_expected"), ("incremental_overwrite_range", "incremental_overwrite_range_expected"), + ( + "incremental_overwrite_day_with_time_partition_datetime", + "incremental_overwrite_day_with_time_partition_expected", + ), ] db_with_schema = f"{project.database}.{project.test_schema}" for incremental_strategy in incremental_strategies: 
@@ -87,3 +93,11 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se ) expected = get_relation_columns(project.adapter, "incremental_overwrite_day_expected") assert created == expected + + created = get_relation_columns( + project.adapter, "incremental_overwrite_day_with_time_partition" + ) + expected = get_relation_columns( + project.adapter, "incremental_overwrite_day_with_time_partition_expected" + ) + assert created == expected diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index 4dece85a6..56b8e07d7 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -752,7 +752,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "date", - "granularity": "MONTH", + "granularity": "month", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -765,7 +765,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "date", - "granularity": "YEAR", + "granularity": "year", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -778,7 +778,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "timestamp", - "granularity": "HOUR", + "granularity": "hour", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -791,7 +791,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "timestamp", - "granularity": "MONTH", + "granularity": "month", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -804,7 +804,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "timestamp", - "granularity": "YEAR", + "granularity": "year", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -817,7 +817,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "datetime", - "granularity": "HOUR", + "granularity": "hour", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -830,7 +830,7 @@ def 
test_parse_partition_by(self): { "field": "ts", "data_type": "datetime", - "granularity": "MONTH", + "granularity": "month", "time_ingestion_partitioning": False, "copy_partitions": False, }, @@ -843,7 +843,7 @@ def test_parse_partition_by(self): { "field": "ts", "data_type": "datetime", - "granularity": "YEAR", + "granularity": "year", "time_ingestion_partitioning": False, "copy_partitions": False, },