Using if_exists=replace_overlapping_partitions is DANGEROUS on non-partitioned tables
dolfinus committed Dec 6, 2024
1 parent 1cc7fec commit e9a4999
Showing 3 changed files with 64 additions and 0 deletions.
5 changes: 5 additions & 0 deletions onetl/connection/db_connection/hive/options.py
@@ -136,6 +136,11 @@ class Config:
Same as Spark's ``df.write.insertInto(table, overwrite=True)`` +
``spark.sql.sources.partitionOverwriteMode=dynamic``.
.. DANGER::

    This mode makes sense **ONLY** if the table is partitioned.
    **IF NOT, YOU'LL LOSE YOUR DATA!**

.. dropdown:: Behavior in detail

* Table does not exist
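For reference, a minimal sketch (not part of this diff) of the safe usage the warning above implies, mirroring the Hive/DBWriter calls in the test added below; the target table and partition column names are hypothetical:

from onetl.connection import Hive
from onetl.db import DBWriter

hive = Hive(cluster="rnd-dwh", spark=spark)  # assumes an existing SparkSession `spark`

writer = DBWriter(
    connection=hive,
    target="schema.partitioned_table",  # hypothetical target; the table MUST actually be partitioned
    options=Hive.WriteOptions(
        if_exists="replace_overlapping_partitions",
        partition_by="business_date",  # hypothetical partition column
    ),
)

# Only partitions overlapping with `df` are rewritten; all other partitions stay intact.
# On a NON-partitioned table, the same call overwrites the entire table.
writer.run(df)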
5 changes: 5 additions & 0 deletions onetl/file/file_df_writer/options.py
@@ -128,6 +128,11 @@ class Config:
Same as Spark's ``df.write.mode("overwrite").save()`` +
``spark.sql.sources.partitionOverwriteMode=dynamic``.
.. DANGER::

    This mode makes sense **ONLY** if the directory is partitioned.
    **IF NOT, YOU'LL LOSE YOUR DATA!**

.. dropdown:: Behavior in detail

* Directory does not exist
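For reference, a minimal sketch of the plain Spark equivalent the docstring names, assuming a dataframe `df` with a hypothetical `business_date` column and a hypothetical output path:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With dynamic partition overwrite, only partitions present in `df` are replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(
    df.write
    .mode("overwrite")
    .partitionBy("business_date")  # without partitioning, "overwrite" wipes the whole directory
    .parquet("/tmp/target_directory")  # hypothetical path
)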
@@ -640,6 +640,60 @@ def test_hive_writer_insert_into_replace_overlapping_partitions_in_partitioned_t
    assert new_ddl == old_ddl


@pytest.mark.parametrize(
    "partition_by",
    [
        pytest.param(None, id="table_partitioned_dataframe_is_not"),
        pytest.param("hwm_int", id="different_partitioning_schema"),
        pytest.param("id_int", id="same_partitioning_schema"),
    ],
)
def test_hive_writer_insert_into_replace_overlapping_partitions_in_partitioned_table_empty_dataframe(
    spark,
    processing,
    get_schema_table,
    partition_by,
):
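    # Writing an empty dataframe with if_exists="replace_overlapping_partitions" must not
    # drop any existing partitions or alter the table DDL, regardless of partition_by.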
    df = processing.create_spark_df(spark=spark)

    df1 = df[df.id_int <= 25]
    df2 = df.where("id_int > 25 AND id_int <= 50")
    empty_df = spark.createDataFrame([], df.schema)

    hive = Hive(cluster="rnd-dwh", spark=spark)

    writer1 = DBWriter(
        connection=hive,
        target=get_schema_table.full_name,
        options=Hive.WriteOptions(partition_by="id_int"),
    )

    # create & fill up the table with some data
    writer1.run(df1.union(df2))
    old_ddl = hive.sql(f"SHOW CREATE TABLE {get_schema_table.full_name}").collect()[0][0]

    # insert empty dataframe with the same schema
    writer2 = DBWriter(
        connection=hive,
        target=get_schema_table.full_name,
        options=Hive.WriteOptions(if_exists="replace_overlapping_partitions", partition_by=partition_by),
    )

    writer2.run(empty_df)
    new_ddl = hive.sql(f"SHOW CREATE TABLE {get_schema_table.full_name}").collect()[0][0]

    # table contains all data from (df1, df2), no partitions deleted
    processing.assert_equal_df(
        schema=get_schema_table.schema,
        table=get_schema_table.table,
        df=df1.union(df2),
        order_by="id_int",
    )

    # table DDL remains the same
    assert new_ddl == old_ddl


@pytest.mark.parametrize("mode", ["append", "replace_overlapping_partitions"])
def test_hive_writer_insert_into_wrong_columns(spark, processing, prepare_schema_table, mode):
    df = processing.create_spark_df(spark=spark)
