From 386a43c2fb483f5de48629ca425cc436a502db98 Mon Sep 17 00:00:00 2001
From: Lukas Gust
Date: Thu, 31 Aug 2023 17:36:54 -0600
Subject: [PATCH 1/5] Added null check for partition batches

---
 .../models/helpers/get_partition_batches.sql | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql
index 20773841..5780d283 100644
--- a/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql
+++ b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql
@@ -18,8 +18,13 @@
       {%- set single_partition = [] -%}
       {%- for col in row -%}
+
         {%- set column_type = adapter.convert_type(table, loop.index0) -%}
-        {%- if column_type == 'integer' -%}
+        {%- set comp_func = '=' -%}
+        {%- if col is none -%}
+          {%- set value = 'null' -%}
+          {%- set comp_func = ' is ' -%}
+        {%- elif column_type == 'integer' -%}
           {%- set value = col | string -%}
         {%- elif column_type == 'string' -%}
           {%- set value = "'" + col + "'" -%}
@@ -31,7 +36,7 @@
         {%- else -%}
           {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
         {%- endif -%}
        {%- set partition_key = adapter.format_one_partition_key(partitioned_by[loop.index0]) -%}
-        {%- do single_partition.append(partition_key + '=' + value) -%}
+        {%- do single_partition.append(partition_key + comp_func + value) -%}
       {%- endfor -%}
       {%- set single_partition_expression = single_partition | join(' and ') -%}

From 66fabd974832749a4afbd1228fcaac90f73bfecb Mon Sep 17 00:00:00 2001
From: Lukas Gust
Date: Fri, 1 Sep 2023 11:48:43 -0600
Subject: [PATCH 2/5] Functional tests first pass

---
 tests/functional/adapter/test_partitions.py | 56 +++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/tests/functional/adapter/test_partitions.py b/tests/functional/adapter/test_partitions.py
index 68e639e6..af7301ac 100644
--- a/tests/functional/adapter/test_partitions.py
+++ b/tests/functional/adapter/test_partitions.py
@@ -17,6 +17,30 @@
     cross join unnest(date_array) as t2(date_column)
 """
 
+test_null_valued_partitions_model_sql = """
+with date_sequence as (
+    select
+        case
+            when random() < 0.1 then null
+            else from_iso8601_date('2023-01-01') + interval '1' day * cast(random() * 212 as integer)
+        end as date_column
+    from
+        unnest(sequence(1, 212)) -- adjust 212 to the number of days you want
+)
+
+, data as (
+    select
+        random() as rnd,
+        cast(date_column as date) as date_column,
+        doy(date_column) as doy,
+        case when random() < 0.1 then null else cast(uuid() as varchar) end as group_guid
+    from date_sequence
+)
+
+select *
+from data
+"""
+
 
 class TestHiveTablePartitions:
     @pytest.fixture(scope="class")
@@ -125,3 +149,35 @@ def test__check_incremental_run_with_partitions(self, project):
         incremental_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
 
         assert incremental_records_count == 212
+
+
+class TestHiveNullValuedPartitions:
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+table_type": "hive",
+                "+materialized": "table",
+                "+partitioned_by": ["date_column", "doy", "group_guid"],
+            }
+        }
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "test_hive_partitions_null_values.sql": test_null_valued_partitions_model_sql,
+        }
+
+    def test__check_run_with_partitions(self, project):
+        relation_name = "test_hive_partitions_null_values"
+        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
+
+        first_model_run = run_dbt(["run", "--select", relation_name])
+        first_model_run_result = first_model_run.results[0]
+
+        # check that the model ran successfully
+        assert first_model_run_result.status == RunStatus.Success
+
+        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
+
+        assert records_count_first_run == 212
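A note on what [PATCH 1/5] changes in practice: get_partition_batches renders one comparison per partition column and joins them with ' and ' into a single batch filter. With the null check in place, a NULL partition value now renders as '<partition_key> is null' (via the new comp_func branch and the literal value 'null') instead of an '=' comparison that breaks on nulls. Below is a rough sketch of a filter the patched macro could now produce for the test model above; the partition values are invented for illustration, and test_schema stands in for whatever project.test_schema resolves to:

    -- Illustrative only: one batch predicate for a table partitioned by
    -- (date_column, doy, group_guid), roughly as the macro would render it:
    --   date_column=DATE'2023-01-15' and doy=15 and group_guid is null
    select count(*)
    from test_schema.test_hive_partitions_null_values
    where date_column = DATE '2023-01-15'
      and doy = 15
      and group_guid is null  -- before the patch, a null here broke the rendered predicate
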
From 1294a4a6d60331aefe5addf682940519dcbb56e2 Mon Sep 17 00:00:00 2001
From: nicor88 <6278547+nicor88@users.noreply.github.com>
Date: Sat, 2 Sep 2023 12:23:34 +0200
Subject: [PATCH 3/5] Update test_partitions.py

---
 tests/functional/adapter/test_partitions.py | 28 ++++++++------------
 1 file changed, 11 insertions(+), 17 deletions(-)

diff --git a/tests/functional/adapter/test_partitions.py b/tests/functional/adapter/test_partitions.py
index af7301ac..72a34dc6 100644
--- a/tests/functional/adapter/test_partitions.py
+++ b/tests/functional/adapter/test_partitions.py
@@ -18,27 +18,21 @@
 """
 
 test_null_valued_partitions_model_sql = """
-with date_sequence as (
+with data as (
     select
-        case
-            when random() < 0.1 then null
-            else from_iso8601_date('2023-01-01') + interval '1' day * cast(random() * 212 as integer)
-        end as date_column
+        random() as col_1,
+        row_number() over() as id
     from
-        unnest(sequence(1, 212)) -- adjust 212 to the number of days you want
+        unnest(sequence(1, 200))
 )
 
-, data as (
-    select
-        random() as rnd,
-        cast(date_column as date) as date_column,
-        doy(date_column) as doy,
-        case when random() < 0.1 then null else cast(uuid() as varchar) end as group_guid
-    from date_sequence
-)
-
-select *
+select
+    col_1, id
 from data
+union all
+select random() as col_1, NULL as id
+union all
+select random() as col_1, NULL as id
 """
 
 
@@ -180,4 +174,4 @@ def test__check_run_with_partitions(self, project):
 
         records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
 
-        assert records_count_first_run == 212
+        assert records_count_first_run == 202

From 7afccfcc3739879f9b759093dbb58a544c7b42e5 Mon Sep 17 00:00:00 2001
From: nicor88 <6278547+nicor88@users.noreply.github.com>
Date: Sat, 2 Sep 2023 12:33:49 +0200
Subject: [PATCH 4/5] Update test_partitions.py

---
 tests/functional/adapter/test_partitions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/functional/adapter/test_partitions.py b/tests/functional/adapter/test_partitions.py
index 72a34dc6..9bd91de9 100644
--- a/tests/functional/adapter/test_partitions.py
+++ b/tests/functional/adapter/test_partitions.py
@@ -152,7 +152,7 @@ def project_config_update(self):
             "models": {
                 "+table_type": "hive",
                 "+materialized": "table",
-                "+partitioned_by": ["date_column", "doy", "group_guid"],
+                "+partitioned_by": ["id"],
             }
         }
 
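With [PATCH 3/5] and [PATCH 4/5] applied, the expected counts are easy to derive: the data CTE emits exactly 200 rows with non-null ids, and each of the two union all branches adds one null-id row, which is where the asserted 202 comes from. Partitioning by id alone should therefore yield 201 Hive partitions, one of them the null partition. A quick manual cross-check against the built table (test_schema is again a stand-in for project.test_schema):

    select
        count(*) as records,              -- expected 202 = 200 sequence rows + 2 union'd rows
        count_if(id is null) as null_ids  -- expected 2; these land in the null partition
    from test_schema.test_hive_partitions_null_values
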
From db38042e86180ddaf8ede79529f2d9475da88d65 Mon Sep 17 00:00:00 2001
From: Lukas Gust
Date: Tue, 5 Sep 2023 11:02:21 -0600
Subject: [PATCH 5/5] Added additional test

---
 tests/functional/adapter/test_partitions.py | 95 ++++++++++++++++++++-
 1 file changed, 92 insertions(+), 3 deletions(-)

diff --git a/tests/functional/adapter/test_partitions.py b/tests/functional/adapter/test_partitions.py
index 9bd91de9..f5f1e6d3 100644
--- a/tests/functional/adapter/test_partitions.py
+++ b/tests/functional/adapter/test_partitions.py
@@ -17,7 +17,7 @@
     cross join unnest(date_array) as t2(date_column)
 """
 
-test_null_valued_partitions_model_sql = """
+test_single_nullable_partition_model_sql = """
 with data as (
     select
         random() as col_1,
@@ -35,6 +35,49 @@
 select random() as col_1, NULL as id
 """
 
+test_nullable_partitions_model_sql = """
+{{ config(
+    materialized='table',
+    format='parquet',
+    s3_data_naming='table',
+    partitioned_by=['id', 'date_column']
+) }}
+
+with data as (
+    select
+        random() as rnd,
+        row_number() over() as id,
+        cast(date_column as date) as date_column
+    from (
+        values (
+            sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
+        )
+    ) as t1(date_array)
+    cross join unnest(date_array) as t2(date_column)
+)
+
+select
+    rnd,
+    case when id <= 50 then null else id end as id,
+    date_column
+from data
+union all
+select
+    random() as rnd,
+    NULL as id,
+    NULL as date_column
+union all
+select
+    random() as rnd,
+    NULL as id,
+    cast('2023-09-02' as date) as date_column
+union all
+select
+    random() as rnd,
+    40 as id,
+    NULL as date_column
+"""
+
 
 class TestHiveTablePartitions:
     @pytest.fixture(scope="class")
@@ -146,6 +189,52 @@ def test__check_incremental_run_with_partitions(self, project):
 
 
 class TestHiveNullValuedPartitions:
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+table_type": "hive",
+                "+materialized": "table",
+                "+partitioned_by": ["id", "date_column"],
+            }
+        }
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "test_nullable_partitions_model.sql": test_nullable_partitions_model_sql,
+        }
+
+    def test__check_run_with_partitions(self, project):
+        relation_name = "test_nullable_partitions_model"
+        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
+        model_run_result_null_id_count_query = (
+            f"select count(*) as records from {project.test_schema}.{relation_name} where id is null"
+        )
+        model_run_result_null_date_count_query = (
+            f"select count(*) as records from {project.test_schema}.{relation_name} where date_column is null"
+        )
+
+        first_model_run = run_dbt(["run", "--select", relation_name])
+        first_model_run_result = first_model_run.results[0]
+
+        # check that the model ran successfully
+        assert first_model_run_result.status == RunStatus.Success
+
+        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
+
+        assert records_count_first_run == 215
+
+        null_id_count_first_run = project.run_sql(model_run_result_null_id_count_query, fetch="all")[0][0]
+
+        assert null_id_count_first_run == 52
+
+        null_date_count_first_run = project.run_sql(model_run_result_null_date_count_query, fetch="all")[0][0]
+
+        assert null_date_count_first_run == 2
+
+
+class TestHiveSingleNullValuedPartition:
     @pytest.fixture(scope="class")
     def project_config_update(self):
         return {
@@ -159,11 +248,11 @@ def project_config_update(self):
     @pytest.fixture(scope="class")
     def models(self):
         return {
-            "test_hive_partitions_null_values.sql": test_null_valued_partitions_model_sql,
+            "test_single_nullable_partition_model.sql": test_single_nullable_partition_model_sql,
         }
 
     def test__check_run_with_partitions(self, project):
-        relation_name = "test_hive_partitions_null_values"
+        relation_name = "test_single_nullable_partition_model"
         model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
 
         first_model_run = run_dbt(["run", "--select", relation_name])
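For anyone auditing the assertions in [PATCH 5/5]: the date sequence from 2023-01-01 to 2023-07-31 yields 212 rows (31 + 28 + 31 + 30 + 31 + 30 + 31), the case expression nulls out ids 1 through 50, and the three union all branches add one row each. That gives 215 total records, 52 null ids (50 plus the 2 union rows with NULL id), and 2 null dates, matching the asserted values. A quick cross-check in the same spirit (the schema name is again a stand-in for project.test_schema):

    select
        count(*) as records,                          -- expected 215 = 212 + 3
        count_if(id is null) as null_ids,             -- expected 52 = 50 + 2
        count_if(date_column is null) as null_dates   -- expected 2
    from test_schema.test_nullable_partitions_model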