diff --git a/docs/changelog/next_release/319.bugfix.rst b/docs/changelog/next_release/319.bugfix.rst new file mode 100644 index 000000000..82a6eebc4 --- /dev/null +++ b/docs/changelog/next_release/319.bugfix.rst @@ -0,0 +1 @@ +Fix ``DBReader(conn=oracle, options={"partitioning_mode": "hash"})`` leading to data skew in the last partition due to wrong ``ora_hash`` usage. diff --git a/onetl/connection/db_connection/clickhouse/dialect.py b/onetl/connection/db_connection/clickhouse/dialect.py index 394843b80..1ee213d0e 100644 --- a/onetl/connection/db_connection/clickhouse/dialect.py +++ b/onetl/connection/db_connection/clickhouse/dialect.py @@ -10,7 +10,7 @@ class ClickhouseDialect(JDBCDialect): def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str: - return f"modulo(halfMD5({partition_column}), {num_partitions})" + return f"halfMD5({partition_column}) % {num_partitions}" def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str: return f"{partition_column} % {num_partitions}" diff --git a/onetl/connection/db_connection/mssql/dialect.py b/onetl/connection/db_connection/mssql/dialect.py index 6be43c802..3cb809ad2 100644 --- a/onetl/connection/db_connection/mssql/dialect.py +++ b/onetl/connection/db_connection/mssql/dialect.py @@ -10,7 +10,7 @@ class MSSQLDialect(JDBCDialect): # https://docs.microsoft.com/ru-ru/sql/t-sql/functions/hashbytes-transact-sql?view=sql-server-ver16 def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str: - return f"CONVERT(BIGINT, HASHBYTES ( 'SHA' , {partition_column} )) % {num_partitions}" + return f"CONVERT(BIGINT, HASHBYTES ('SHA', {partition_column})) % {num_partitions}" def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str: return f"{partition_column} % {num_partitions}" diff --git a/onetl/connection/db_connection/oracle/dialect.py b/onetl/connection/db_connection/oracle/dialect.py index 2f1218716..c7a739039 100644 --- 
a/onetl/connection/db_connection/oracle/dialect.py +++ b/onetl/connection/db_connection/oracle/dialect.py @@ -43,7 +43,9 @@ def get_sql_query( ) def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str: - return f"ora_hash({partition_column}, {num_partitions})" + # ora_hash returns values from 0 to N including N. + # Balancing N+1 splits to N partitions leads to data skew in last partition. + return f"ora_hash({partition_column}, {num_partitions - 1})" def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str: return f"MOD({partition_column}, {num_partitions})"