diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 9c0e8235f842..01f24c4dfe04 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
@@ -80,6 +85,46 @@ public static void clearTestSparkCatalogCache() {
     TestSparkCatalog.clearTables();
   }
 
+  @Test
+  public void testDeleteFileGranularity() throws NoSuchTableException {
+    checkDeleteFileGranularity(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testDeletePartitionGranularity() throws NoSuchTableException {
+    checkDeleteFileGranularity(DeleteGranularity.PARTITION);
+  }
+
+  private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
+      throws NoSuchTableException {
+    createAndInitPartitionedTable();
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
+    append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
+    append(tableName, new Employee(1, "hardware"), new Employee(2, "hardware"));
+    append(tableName, new Employee(3, "hardware"), new Employee(4, "hardware"));
+
+    createBranchIfNeeded();
+
+    sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
"4" : "2"; + validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null); + + assertEquals( + "Should have expected rows", + ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), row(4, "hr")), + sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget())); + } + @Test public void testCommitUnknownException() { createAndInitTable("id INT, dep STRING, category STRING"); diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java index e743b32b45db..f9c13d828cd3 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java @@ -18,11 +18,20 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Map; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.DeleteGranularity; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Encoders; +import org.junit.Test; public class TestMergeOnReadMerge extends TestMerge { @@ -56,4 +65,52 @@ protected Map extraTableProperties() { TableProperties.MERGE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName()); } + + @Test + public void testMergeDeleteFileGranularity() { + checkMergeDeleteGranularity(DeleteGranularity.FILE); + } + + @Test + public void testMergeDeletePartitionGranularity() { + checkMergeDeleteGranularity(DeleteGranularity.PARTITION); + } + + private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) { + createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity); + + append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }"); + + createBranchIfNeeded(); + + createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT()); + + sql( + "MERGE INTO %s AS t USING source AS s " + + "ON t.id == s.value " + + "WHEN MATCHED THEN " + + " DELETE " + + "WHEN NOT MATCHED THEN " + + " INSERT (id, dep) VALUES (-1, 'other')", + commitTarget()); + + Table table = validationCatalog.loadTable(tableIdent); + assertThat(table.snapshots()).hasSize(5); + + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? 
"4" : "2"; + validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1"); + + assertEquals( + "Should have expected rows", + ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, "hr"), row(4, "it")), + sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget())); + } } diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java index 0207d4ce4d51..45ef343b2dfe 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java @@ -18,11 +18,19 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Map; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.DeleteGranularity; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.SnapshotUtil; +import org.junit.Test; public class TestMergeOnReadUpdate extends TestUpdate { @@ -56,4 +64,51 @@ protected Map extraTableProperties() { TableProperties.UPDATE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName()); } + + @Test + public void testUpdateFileGranularity() { + checkUpdateFileGranularity(DeleteGranularity.FILE); + } + + @Test + public void testUpdatePartitionGranularity() { + checkUpdateFileGranularity(DeleteGranularity.PARTITION); + } + + private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) { + createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity); + + append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }"); + + createBranchIfNeeded(); + + sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget()); + + Table table = validationCatalog.loadTable(tableIdent); + assertThat(table.snapshots()).hasSize(5); + + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? 
"4" : "2"; + validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2"); + + assertEquals( + "Should have expected rows", + ImmutableList.of( + row(0, "hr"), + row(2, "hr"), + row(2, "hr"), + row(4, "hr"), + row(0, "it"), + row(2, "it"), + row(2, "it"), + row(4, "it")), + sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget())); + } } diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java index 094fd5844398..71813c5a63a6 100644 --- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java +++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java @@ -33,6 +33,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; @@ -364,8 +365,20 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) @Benchmark @Threads(1) - public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) + public void writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity( + Blackhole blackhole) throws IOException { + writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.PARTITION); + } + + @Benchmark + @Threads(1) + public void writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole blackhole) throws IOException { + writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.FILE); + } + + private void writeUnpartitionedClusteredPositionDeleteWriter( + Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException { FileIO io = table().io(); OutputFileFactory fileFactory = newFileFactory(); @@ -374,7 +387,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES); + writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity); PositionDelete positionDelete = PositionDelete.create(); try (ClusteredPositionDeleteWriter closeableWriter = writer) { @@ -391,7 +404,20 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) @Benchmark @Threads(1) - public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) throws IOException { + public void writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole blackhole) + throws IOException { + writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.PARTITION); + } + + @Benchmark + @Threads(1) + public void writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole) + throws IOException { + writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.FILE); + } + + private void writeUnpartitionedFanoutPositionDeleteWriterPartition( + Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException { FileIO io = table().io(); OutputFileFactory fileFactory = newFileFactory(); @@ -400,7 +426,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th FanoutPositionOnlyDeleteWriter writer = new FanoutPositionOnlyDeleteWriter<>( - writerFactory, fileFactory, io, 
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);
 
     PositionDelete<InternalRow> positionDelete = PositionDelete.create();
     try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -417,8 +443,20 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black
 
   @Benchmark
   @Threads(1)
-  public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole blackhole)
-      throws IOException {
+  public void writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
+      Blackhole blackhole) throws IOException {
+    writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.PARTITION);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
+      Blackhole blackhole) throws IOException {
+    writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.FILE);
+  }
+
+  private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
+      Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
 
     FileIO io = table().io();
 
@@ -428,7 +466,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black
 
     FanoutPositionOnlyDeleteWriter<InternalRow> writer =
         new FanoutPositionOnlyDeleteWriter<>(
-            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);
 
     PositionDelete<InternalRow> positionDelete = PositionDelete.create();
     try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index df3e2051f771..824e3aca9a86 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -644,4 +645,15 @@ private String deleteOrcCompressionStrategy() {
         .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
         .parseOptional();
   }
+
+  public DeleteGranularity deleteGranularity() {
+    String valueAsString =
+        confParser
+            .stringConf()
+            .option(SparkWriteOptions.DELETE_GRANULARITY)
+            .tableProperty(TableProperties.DELETE_GRANULARITY)
+            .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
+            .parse();
+    return DeleteGranularity.fromString(valueAsString);
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index f8cb2c5a1942..391cb6bae3bf 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -85,4 +85,7 @@ private SparkWriteOptions() {}
   public static final String COMPRESSION_CODEC = "compression-codec";
   public static final String COMPRESSION_LEVEL = "compression-level";
   public static final String COMPRESSION_STRATEGY = "compression-strategy";
+
+  // Overrides the delete granularity
+  public static final String DELETE_GRANULARITY = "delete-granularity";
= "delete-granularity"; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index a397a069ee1d..08c06e85b576 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.ClusteredPositionDeleteWriter; import org.apache.iceberg.io.FileIO; @@ -70,6 +71,7 @@ public class SparkPositionDeletesRewrite implements Write { private final String queryId; private final FileFormat format; private final long targetFileSize; + private final DeleteGranularity deleteGranularity; private final Schema writeSchema; private final StructType dsSchema; private final String fileSetId; @@ -103,6 +105,7 @@ public class SparkPositionDeletesRewrite implements Write { this.queryId = writeInfo.queryId(); this.format = writeConf.deleteFileFormat(); this.targetFileSize = writeConf.targetDeleteFileSize(); + this.deleteGranularity = writeConf.deleteGranularity(); this.writeSchema = writeSchema; this.dsSchema = dsSchema; this.fileSetId = writeConf.rewrittenFileSetId(); @@ -129,6 +132,7 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { queryId, format, targetFileSize, + deleteGranularity, writeSchema, dsSchema, specId, @@ -179,6 +183,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final String queryId; private final FileFormat format; private final Long targetFileSize; + private final DeleteGranularity deleteGranularity; private final Schema writeSchema; private final StructType dsSchema; private final int specId; @@ -190,6 +195,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { String queryId, FileFormat format, long targetFileSize, + DeleteGranularity deleteGranularity, Schema writeSchema, StructType dsSchema, int specId, @@ -199,6 +205,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.queryId = queryId; this.format = format; this.targetFileSize = targetFileSize; + this.deleteGranularity = deleteGranularity; this.writeSchema = writeSchema; this.dsSchema = dsSchema; this.specId = specId; @@ -241,6 +248,7 @@ public DataWriter createWriter(int partitionId, long taskId) { writerFactoryWithoutRow, deleteFileFactory, targetFileSize, + deleteGranularity, dsSchema, specId, partition); @@ -289,6 +297,7 @@ private static class DeleteWriter implements DataWriter { private final SparkFileWriterFactory writerFactoryWithoutRow; private final OutputFileFactory deleteFileFactory; private final long targetFileSize; + private final DeleteGranularity deleteGranularity; private final PositionDelete positionDelete; private final FileIO io; private final PartitionSpec spec; @@ -310,6 +319,7 @@ private static class DeleteWriter implements DataWriter { * @param writerFactoryWithoutRow writer factory for deletes with null 'row' * @param deleteFileFactory delete file factory * @param targetFileSize target file size + * @param deleteGranularity delete granularity * @param dsSchema schema of incoming dataset of position deletes * @param specId partition spec id of incoming 
      * @param specId partition spec id of incoming position deletes. All incoming partition deletes
      *     are required to have the same spec id.
@@ -322,11 +332,13 @@ private static class DeleteWriter implements DataWriter<InternalRow> {
         SparkFileWriterFactory writerFactoryWithoutRow,
         OutputFileFactory deleteFileFactory,
         long targetFileSize,
+        DeleteGranularity deleteGranularity,
         StructType dsSchema,
         int specId,
         StructLike partition) {
       this.deleteFileFactory = deleteFileFactory;
       this.targetFileSize = targetFileSize;
+      this.deleteGranularity = deleteGranularity;
       this.writerFactoryWithRow = writerFactoryWithRow;
       this.writerFactoryWithoutRow = writerFactoryWithoutRow;
       this.positionDelete = PositionDelete.create();
@@ -387,7 +399,7 @@ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithRow() {
       if (writerWithRow == null) {
         this.writerWithRow =
             new ClusteredPositionDeleteWriter<>(
-                writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+                writerFactoryWithRow, deleteFileFactory, io, targetFileSize, deleteGranularity);
       }
       return writerWithRow;
     }
@@ -396,7 +408,7 @@ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithoutRow() {
       if (writerWithoutRow == null) {
         this.writerWithoutRow =
             new ClusteredPositionDeleteWriter<>(
-                writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize);
+                writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize, deleteGranularity);
       }
       return writerWithoutRow;
     }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 791cbd8462a4..5c6243bbb06f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
@@ -431,11 +432,14 @@ protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> new
       FileIO io = table.io();
       boolean inputOrdered = context.inputOrdered();
       long targetFileSize = context.targetDeleteFileSize();
+      DeleteGranularity deleteGranularity = context.deleteGranularity();
 
       if (inputOrdered) {
-        return new ClusteredPositionDeleteWriter<>(writers, files, io, targetFileSize);
+        return new ClusteredPositionDeleteWriter<>(
+            writers, files, io, targetFileSize, deleteGranularity);
       } else {
-        return new FanoutPositionOnlyDeleteWriter<>(writers, files, io, targetFileSize);
+        return new FanoutPositionOnlyDeleteWriter<>(
+            writers, files, io, targetFileSize, deleteGranularity);
       }
     }
   }
@@ -674,6 +678,7 @@ private static class Context implements Serializable {
     private final StructType metadataSparkType;
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
+    private final DeleteGranularity deleteGranularity;
     private final String queryId;
     private final boolean fanoutWriterEnabled;
     private final boolean inputOrdered;
@@ -690,6 +695,7 @@ private static class Context implements Serializable {
       this.deleteSparkType = info.rowIdSchema().get();
       this.deleteFileFormat = writeConf.deleteFileFormat();
      this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
+      this.deleteGranularity = writeConf.deleteGranularity();
       this.metadataSparkType = info.metadataSchema().get();
       this.queryId = info.queryId();
       this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
@@ -724,6 +730,10 @@ long targetDeleteFileSize() {
       return targetDeleteFileSize;
     }
 
+    DeleteGranularity deleteGranularity() {
+      return deleteGranularity;
+    }
+
     String queryId() {
       return queryId;
     }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index b7af797d149c..63d037bd7362 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -44,6 +44,8 @@
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Map;
@@ -51,6 +53,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.After;
@@ -74,6 +77,61 @@ public void after() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testDeleteGranularityDefault() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+    DeleteGranularity value = writeConf.deleteGranularity();
+    assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testDeleteGranularityTableProperty() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table
+        .updateProperties()
+        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
+        .commit();
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+    DeleteGranularity value = writeConf.deleteGranularity();
+    assertThat(value).isEqualTo(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testDeleteGranularityWriteOption() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table
+        .updateProperties()
+        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
+        .commit();
+
+    Map<String, String> options =
+        ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString());
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
+
+    DeleteGranularity value = writeConf.deleteGranularity();
+    assertThat(value).isEqualTo(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testDeleteGranularityInvalidValue() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties().set(TableProperties.DELETE_GRANULARITY, "invalid").commit();
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+    assertThatThrownBy(writeConf::deleteGranularity)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Unknown delete granularity");
+  }
+
   @Test
   public void testSparkWriteConfDistributionDefault() {
     Table table = validationCatalog.loadTable(tableIdent);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 9149bb7652dc..2c5981dc7e45 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -51,6 +51,7 @@
 import org.apache.iceberg.actions.SizeBasedFileRewriter;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -135,6 +136,42 @@ public void testEmptyTable() {
     Assert.assertEquals("No added delete files", 0, result.addedDeleteFilesCount());
   }
 
+  @Test
+  public void testFileGranularity() throws Exception {
+    checkDeleteGranularity(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testPartitionGranularity() throws Exception {
+    checkDeleteGranularity(DeleteGranularity.PARTITION);
+  }
+
+  private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws Exception {
+    Table table = createTableUnpartitioned(2, SCALE);
+
+    table
+        .updateProperties()
+        .set(TableProperties.DELETE_GRANULARITY, deleteGranularity.toString())
+        .commit();
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    assertThat(dataFiles).hasSize(2);
+
+    writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2);
+
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .execute();
+
+    int expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? 2 : 1;
+    assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedDeleteFilesCount);
+  }
+
   @Test
   public void testUnpartitioned() throws Exception {
     Table table = createTableUnpartitioned(2, SCALE);