diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index ec9825d40bbe..df3e2051f771 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -23,6 +23,12 @@
 import static org.apache.iceberg.DistributionMode.RANGE;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
@@ -425,7 +431,107 @@ public String branch() {
     return branch;
   }
 
-  public String parquetCompressionCodec() {
+  public Map<String, String> writeProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    writeProperties.putAll(dataWriteProperties());
+    writeProperties.putAll(deleteWriteProperties());
+    return writeProperties;
+  }
+
+  private Map<String, String> dataWriteProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    FileFormat dataFormat = dataFileFormat();
+
+    switch (dataFormat) {
+      case PARQUET:
+        writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
+        String parquetCompressionLevel = parquetCompressionLevel();
+        if (parquetCompressionLevel != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
+        }
+        break;
+
+      case AVRO:
+        writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
+        String avroCompressionLevel = avroCompressionLevel();
+        if (avroCompressionLevel != null) {
+          writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
+        }
+        break;
+
+      case ORC:
+        writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
+        writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
+        break;
+
+      default:
+        // skip
+    }
+
+    return writeProperties;
+  }
+
+  private Map<String, String> deleteWriteProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    FileFormat deleteFormat = deleteFileFormat();
+
+    switch (deleteFormat) {
+      case PARQUET:
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_PARQUET_COMPRESSION,
+            deleteParquetCompressionCodec(),
+            parquetCompressionCodec());
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_PARQUET_COMPRESSION_LEVEL,
+            deleteParquetCompressionLevel(),
+            parquetCompressionLevel());
+        break;
+
+      case AVRO:
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_AVRO_COMPRESSION,
+            deleteAvroCompressionCodec(),
+            avroCompressionCodec());
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_AVRO_COMPRESSION_LEVEL,
+            deleteAvroCompressionLevel(),
+            avroCompressionLevel());
+        break;
+
+      case ORC:
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_ORC_COMPRESSION,
+            deleteOrcCompressionCodec(),
+            orcCompressionCodec());
+        setWritePropertyWithFallback(
+            writeProperties,
+            DELETE_ORC_COMPRESSION_STRATEGY,
+            deleteOrcCompressionStrategy(),
+            orcCompressionStrategy());
+        break;
+
+      default:
+        // skip
+    }
+
+    return writeProperties;
+  }
+
+  private void setWritePropertyWithFallback(
+      Map<String, String> writeProperties, String key, String value, String fallbackValue) {
+    if (value != null) {
+      writeProperties.put(key, value);
+    } else if (fallbackValue != null) {
+      writeProperties.put(key, fallbackValue);
+    }
+  }
+
+  private String parquetCompressionCodec() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -435,7 +541,16 @@ public String parquetCompressionCodec() {
         .parse();
   }
 
-  public String parquetCompressionLevel() {
+  private String deleteParquetCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_PARQUET_COMPRESSION)
+        .parseOptional();
+  }
+
+  private String parquetCompressionLevel() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_LEVEL)
@@ -445,7 +560,16 @@ public String parquetCompressionLevel() {
         .parseOptional();
   }
 
-  public String avroCompressionCodec() {
+  private String deleteParquetCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
+        .parseOptional();
+  }
+
+  private String avroCompressionCodec() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -455,7 +579,16 @@ public String avroCompressionCodec() {
         .parse();
   }
 
-  public String avroCompressionLevel() {
+  private String deleteAvroCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_AVRO_COMPRESSION)
+        .parseOptional();
+  }
+
+  private String avroCompressionLevel() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_LEVEL)
@@ -465,7 +598,16 @@ public String avroCompressionLevel() {
         .parseOptional();
   }
 
-  public String orcCompressionCodec() {
+  private String deleteAvroCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
+        .parseOptional();
+  }
+
+  private String orcCompressionCodec() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_CODEC)
@@ -475,7 +617,16 @@ public String orcCompressionCodec() {
         .parse();
   }
 
-  public String orcCompressionStrategy() {
+  private String deleteOrcCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_ORC_COMPRESSION)
+        .parseOptional();
+  }
+
+  private String orcCompressionStrategy() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.COMPRESSION_STRATEGY)
@@ -485,35 +636,12 @@ public String orcCompressionStrategy() {
         .parse();
   }
 
-  public Map<String, String> writeProperties(FileFormat format) {
-    Map<String, String> writeProperties = Maps.newHashMap();
-
-    switch (format) {
-      case PARQUET:
-        writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
-        String parquetCompressionLevel = parquetCompressionLevel();
-        if (parquetCompressionLevel != null) {
-          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
-        }
-        break;
-
-      case AVRO:
-        writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
-        String avroCompressionLevel = avroCompressionLevel();
-        if (avroCompressionLevel != null) {
-          writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel());
-        }
-        break;
-
-      case ORC:
-        writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
-        writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
-        break;
-
-      default:
-        // skip
-    }
-
-    return writeProperties;
+  private String deleteOrcCompressionStrategy() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_STRATEGY)
+        .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
+        .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
+        .parseOptional();
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 152279daaf72..d0769eaa5f4e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
     this.fileSetId = writeConf.rewrittenFileSetId();
     this.specId = specId;
     this.partition = partition;
-    this.writeProperties = writeConf.writeProperties(format);
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
@@ -221,6 +221,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
           .deleteFileFormat(format)
           .positionDeleteRowSchema(positionDeleteRowSchema)
           .positionDeleteSparkType(deleteSparkType)
+          .writeProperties(writeProperties)
           .build();
       SparkFileWriterFactory writerFactoryWithoutRow =
           SparkFileWriterFactory.builderFor(table)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index b80d28368543..9fea33948b3e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.writeRequirements = writeConf.positionDeltaRequirements(command);
     this.context = new Context(dataSchema, writeConf, info, writeRequirements);
-    this.writeProperties = writeConf.writeProperties(context.dataFileFormat);
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d4a4f22bfd49..15881098e7a3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
-    this.writeProperties = writeConf.writeProperties(format);
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 1dcdc1fc6595..a7c22b446813 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -18,21 +18,41 @@
  */
 package org.apache.iceberg.spark;
 
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_RANGE;
+import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_CODEC;
+import static org.apache.iceberg.spark.SparkSQLProperties.COMPRESSION_LEVEL;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -164,6 +184,259 @@ public void testSparkWriteConfDistributionModeWithEverything() {
         });
   }
 
+  @Test
+  public void testSparkConfOverride() {
+    List<List<Map<String, String>>> propertiesSuites =
+        Lists.newArrayList(
+            Lists.newArrayList(
+                ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "3"),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    TableProperties.PARQUET_COMPRESSION,
+                    "gzip",
+                    TableProperties.DELETE_PARQUET_COMPRESSION,
+                    "snappy"),
+                ImmutableMap.of(
+                    DELETE_PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "3",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "3")),
+            Lists.newArrayList(
+                ImmutableMap.of(
+                    COMPRESSION_CODEC,
+                    "zstd",
+                    SparkSQLProperties.COMPRESSION_STRATEGY,
+                    "compression"),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "orc",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "orc",
+                    ORC_COMPRESSION,
+                    "zlib",
+                    DELETE_ORC_COMPRESSION,
+                    "snappy"),
+                ImmutableMap.of(
+                    DELETE_ORC_COMPRESSION,
+                    "zstd",
+                    ORC_COMPRESSION,
+                    "zstd",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "compression",
+                    ORC_COMPRESSION_STRATEGY,
+                    "compression")),
+            Lists.newArrayList(
+                ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "9"),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "avro",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "avro",
+                    AVRO_COMPRESSION,
+                    "gzip",
+                    DELETE_AVRO_COMPRESSION,
+                    "snappy"),
+                ImmutableMap.of(
+                    DELETE_AVRO_COMPRESSION,
+                    "zstd",
+                    AVRO_COMPRESSION,
+                    "zstd",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "9")));
+    for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+      testWriteProperties(propertiesSuite);
+    }
+  }
+
+  @Test
+  public void testDataPropsDefaultsAsDeleteProps() {
+    List<List<Map<String, String>>> propertiesSuites =
+        Lists.newArrayList(
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5"),
+                ImmutableMap.of(
+                    DELETE_PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "5")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "orc",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "orc",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed"),
+                ImmutableMap.of(
+                    DELETE_ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "speed")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "avro",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "avro",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9"),
+                ImmutableMap.of(
+                    DELETE_AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "9")));
+    for (List<Map<String, String>> propertiesSuite : propertiesSuites) {
+      testWriteProperties(propertiesSuite);
+    }
+  }
+
+  @Test
+  public void testDeleteFileWriteConf() {
+    List<List<Map<String, String>>> propertiesSuites =
+        Lists.newArrayList(
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "parquet",
+                    TableProperties.PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "6"),
+                ImmutableMap.of(
+                    DELETE_PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION,
+                    "zstd",
+                    PARQUET_COMPRESSION_LEVEL,
+                    "5",
+                    DELETE_PARQUET_COMPRESSION_LEVEL,
+                    "6")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "orc",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "orc",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed",
+                    DELETE_ORC_COMPRESSION,
+                    "zstd",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "compression"),
+                ImmutableMap.of(
+                    DELETE_ORC_COMPRESSION,
+                    "zstd",
+                    ORC_COMPRESSION,
+                    "snappy",
+                    ORC_COMPRESSION_STRATEGY,
+                    "speed",
+                    DELETE_ORC_COMPRESSION_STRATEGY,
+                    "compression")),
+            Lists.newArrayList(
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                    DEFAULT_FILE_FORMAT,
+                    "avro",
+                    DELETE_DEFAULT_FILE_FORMAT,
+                    "avro",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+                    "9",
+                    DELETE_AVRO_COMPRESSION,
+                    "zstd",
+                    DELETE_AVRO_COMPRESSION_LEVEL,
+                    "16"),
+                ImmutableMap.of(
+                    DELETE_AVRO_COMPRESSION,
+                    "zstd",
+                    AVRO_COMPRESSION,
+                    "snappy",
+                    AVRO_COMPRESSION_LEVEL,
+ "9", + DELETE_AVRO_COMPRESSION_LEVEL, + "16"))); + for (List> propertiesSuite : propertiesSuites) { + testWriteProperties(propertiesSuite); + } + } + + private void testWriteProperties(List> propertiesSuite) { + withSQLConf( + propertiesSuite.get(0), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + Map tableProperties = propertiesSuite.get(1); + UpdateProperties updateProperties = table.updateProperties(); + for (Map.Entry entry : tableProperties.entrySet()) { + updateProperties.set(entry.getKey(), entry.getValue()); + } + + updateProperties.commit(); + + Map writeOptions = ImmutableMap.of(); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + Map writeProperties = writeConf.writeProperties(); + Map expectedProperties = propertiesSuite.get(2); + Assert.assertEquals(expectedProperties.size(), writeConf.writeProperties().size()); + for (Map.Entry entry : writeProperties.entrySet()) { + Assert.assertEquals(entry.getValue(), expectedProperties.get(entry.getKey())); + } + + table.refresh(); + updateProperties = table.updateProperties(); + for (Map.Entry entry : tableProperties.entrySet()) { + updateProperties.remove(entry.getKey()); + } + + updateProperties.commit(); + }); + } + private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf) { Assert.assertEquals(expectedMode, writeConf.distributionMode()); Assert.assertEquals(expectedMode, writeConf.copyOnWriteDistributionMode(DELETE)); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index cc3aa9121b3a..760e735acee9 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -22,8 +22,11 @@ import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DELETE_MODE; +import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; @@ -132,6 +135,9 @@ public void testWriteDataWithDifferentSetting() throws Exception { tableProperties.put(PARQUET_COMPRESSION, "gzip"); tableProperties.put(AVRO_COMPRESSION, "gzip"); tableProperties.put(ORC_COMPRESSION, "zlib"); + tableProperties.put(DELETE_PARQUET_COMPRESSION, "gzip"); + tableProperties.put(DELETE_AVRO_COMPRESSION, "gzip"); + tableProperties.put(DELETE_ORC_COMPRESSION, "zlib"); tableProperties.put(DELETE_MODE, MERGE_ON_READ.modeName()); tableProperties.put(FORMAT_VERSION, "2"); sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, format);