diff --git a/tests/integration/iceberg/docker-compose/provision.py b/tests/integration/iceberg/docker-compose/provision.py
index db67b3be27..c8589621a3 100644
--- a/tests/integration/iceberg/docker-compose/provision.py
+++ b/tests/integration/iceberg/docker-compose/provision.py
@@ -21,7 +21,7 @@
 from pyiceberg.schema import Schema
 from pyiceberg.types import FixedType, NestedField, UUIDType
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import current_date, date_add, expr
+from pyspark.sql.functions import col, current_date, date_add, expr, struct
 
 spark = SparkSession.builder.getOrCreate()
 
@@ -357,17 +357,19 @@
 spark.sql("ALTER TABLE default.test_new_column_with_no_data ADD COLUMN name STRING")
 
-spark.sql(
-    """
-    CREATE OR REPLACE TABLE default.test_table_rename
-    USING iceberg
-    AS SELECT
-    1 AS idx, 10 AS data
-    UNION ALL SELECT
-    2 AS idx, 20 AS data
-    UNION ALL SELECT
-    3 AS idx, 30 AS data
-"""
-)
+###
+# Renaming columns test table
+###
+
+renaming_columns_dataframe = (
+    spark.range(1, 2, 3)
+    .withColumnRenamed("id", "idx")
+    .withColumn("data", col("idx") * 10)
+    .withColumn("structcol", struct("idx"))
+    .withColumn("structcol_oldname", struct("idx"))
+)
+renaming_columns_dataframe.writeTo("default.test_table_rename").tableProperty("format-version", "2").createOrReplace()
 
 spark.sql("ALTER TABLE default.test_table_rename RENAME COLUMN idx TO pos")
+spark.sql("ALTER TABLE default.test_table_rename RENAME COLUMN structcol.idx TO pos")
+spark.sql("ALTER TABLE default.test_table_rename RENAME COLUMN structcol_oldname TO structcol_2")
 
diff --git a/tests/integration/iceberg/test_table_load.py b/tests/integration/iceberg/test_table_load.py
index 225f1ecb8d..1d1032d041 100644
--- a/tests/integration/iceberg/test_table_load.py
+++ b/tests/integration/iceberg/test_table_load.py
@@ -63,7 +63,7 @@ def test_daft_iceberg_table_collect_correct(table_name, local_iceberg_catalog):
 
 
 @pytest.mark.integration()
-def test_daft_iceberg_table_filtered_collect_correct(local_iceberg_catalog):
+def test_daft_iceberg_table_renamed_filtered_collect_correct(local_iceberg_catalog):
     tab = local_iceberg_catalog.load_table(f"default.test_table_rename")
     df = daft.read_iceberg(tab)
     df = df.where(df["pos"] <= 1)
@@ -74,7 +74,7 @@
 
 
 @pytest.mark.integration()
-def test_daft_iceberg_table_column_pushdown_collect_correct(local_iceberg_catalog):
+def test_daft_iceberg_table_renamed_column_pushdown_collect_correct(local_iceberg_catalog):
     tab = local_iceberg_catalog.load_table(f"default.test_table_rename")
     df = daft.read_iceberg(tab)
     df = df.select("pos")