diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
new file mode 100644
index 000000000000..b28c90df1753
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+public class CharSequenceUtil {
+
+ private CharSequenceUtil() {}
+
+ public static boolean unequalPaths(CharSequence s1, CharSequence s2) {
+ if (s1 == s2) {
+ return false;
+ }
+
+ int s1Length = s1.length();
+ int s2Length = s2.length();
+
+ if (s1Length != s2Length) {
+ return true;
+ }
+
+ for (int index = s1Length - 1; index >= 0; index--) {
+ if (s1.charAt(index) != s2.charAt(index)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
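For illustration only (not part of the patch), a minimal sketch of how the helper above behaves: unequalPaths compares two paths by length and then character by character, so it returns false for equal content even across different CharSequence implementations. The file-scoped delete writer added later in this change uses it to detect when the incoming delete stream switches to a new data file.

import org.apache.iceberg.util.CharSequenceUtil;

public class CharSequenceUtilExample {
  public static void main(String[] args) {
    CharSequence a = "s3://bucket/db/table/data/file-a.parquet";
    CharSequence b = new StringBuilder("s3://bucket/db/table/data/file-a.parquet");
    CharSequence c = "s3://bucket/db/table/data/file-b.parquet";

    System.out.println(CharSequenceUtil.unequalPaths(a, b)); // false: same characters, different classes
    System.out.println(CharSequenceUtil.unequalPaths(a, c)); // true: contents differ
  }
}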
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 84188c0f7891..2267ba03fd7b 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
import java.util.Set;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
public class TableProperties {
@@ -334,6 +335,9 @@ private TableProperties() {}
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;
+ public static final String DELETE_GRANULARITY = "write.delete.granularity";
+ public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();
+
public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";
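As a usage sketch under assumed setup (a configured Spark session with an Iceberg catalog; the table name db.tbl is a placeholder), the new property can be toggled per table the same way the extension tests later in this change do:

import org.apache.spark.sql.SparkSession;

public class SetDeleteGranularity {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Accepted values are "file" and "partition"; "partition" remains the default.
    spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('write.delete.granularity' 'file')");
  }
}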
diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
new file mode 100644
index 000000000000..c225192fa121
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * An enum that represents the granularity of deletes.
+ *
+ * <p>Under partition granularity, delete writers are directed to group deletes for different data
+ * files into one delete file. This strategy tends to reduce the total number of delete files in the
+ * table. However, a scan for a single data file will require reading delete information for
+ * multiple data files even if those other files are not required for the scan. All irrelevant
+ * deletes will be discarded by readers but reading this extra information will cause overhead. The
+ * overhead can potentially be mitigated via delete file caching.
+ *
+ * <p>Under file granularity, delete writers always organize deletes by their target data file,
+ * creating separate delete files for each referenced data file. This strategy ensures the job
+ * planning does not assign irrelevant deletes to data files and readers only load necessary delete
+ * information. However, it also increases the total number of delete files in the table and may
+ * require a more aggressive approach for delete file compaction.
+ *
+ * <p>Currently, this configuration is only applicable to position deletes.
+ *
+ * <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case.
+ * Regular delete compaction is still required regardless of which granularity is chosen. It is also
+ * possible to use one granularity for ingestion and another one for table maintenance.
+ */
+public enum DeleteGranularity {
+ FILE,
+ PARTITION;
+
+ @Override
+ public String toString() {
+ switch (this) {
+ case FILE:
+ return "file";
+ case PARTITION:
+ return "partition";
+ default:
+ throw new IllegalArgumentException("Unknown delete granularity: " + this);
+ }
+ }
+
+ public static DeleteGranularity fromString(String valueAsString) {
+ Preconditions.checkArgument(valueAsString != null, "Value is null");
+ if (FILE.toString().equalsIgnoreCase(valueAsString)) {
+ return FILE;
+ } else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) {
+ return PARTITION;
+ } else {
+ throw new IllegalArgumentException("Unknown delete granularity: " + valueAsString);
+ }
+ }
+}
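A small sketch (illustrative only) of the string round trip the enum exposes, matching the toString/fromString pair above:

import org.apache.iceberg.deletes.DeleteGranularity;

public class DeleteGranularityExample {
  public static void main(String[] args) {
    // Parsing is case-insensitive and mirrors toString(): "file" <-> FILE, "partition" <-> PARTITION.
    System.out.println(DeleteGranularity.fromString("FILE"));      // prints "file"
    System.out.println(DeleteGranularity.fromString("partition")); // prints "partition"

    // Anything else fails fast.
    try {
      DeleteGranularity.fromString("row");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Unknown delete granularity: row"
    }
  }
}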
diff --git a/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
new file mode 100644
index 000000000000..d85a5645fb37
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.CharSequenceUtil;
+
+/**
+ * A position delete writer that produces a separate delete file for each referenced data file.
+ *
+ * <p>This writer does not keep track of seen deletes and assumes all incoming records are ordered
+ * by file and position as required by the spec. If there is no external process to order the
+ * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
+ */
+public class FileScopedPositionDeleteWriter<T>
+ implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+
+ private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
+ private final List<DeleteFile> deleteFiles;
+ private final CharSequenceSet referencedDataFiles;
+
+ private FileWriter<PositionDelete<T>, DeleteWriteResult> currentWriter = null;
+ private CharSequence currentPath = null;
+ private boolean closed = false;
+
+ public FileScopedPositionDeleteWriter(
+ Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers) {
+ this.writers = writers;
+ this.deleteFiles = Lists.newArrayList();
+ this.referencedDataFiles = CharSequenceSet.empty();
+ }
+
+ @Override
+ public void write(PositionDelete<T> positionDelete) {
+ writer(positionDelete.path()).write(positionDelete);
+ }
+
+ private FileWriter<PositionDelete<T>, DeleteWriteResult> writer(CharSequence path) {
+ if (currentWriter == null) {
+ openCurrentWriter(path);
+ } else if (CharSequenceUtil.unequalPaths(currentPath, path)) {
+ closeCurrentWriter();
+ openCurrentWriter(path);
+ }
+
+ return currentWriter;
+ }
+
+ @Override
+ public long length() {
+ throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
+ }
+
+ @Override
+ public DeleteWriteResult result() {
+ Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closeCurrentWriter();
+ this.closed = true;
+ }
+ }
+
+ private void openCurrentWriter(CharSequence path) {
+ Preconditions.checkState(!closed, "Writer has already been closed");
+ this.currentWriter = writers.get();
+ this.currentPath = path;
+ }
+
+ private void closeCurrentWriter() {
+ if (currentWriter != null) {
+ try {
+ currentWriter.close();
+ DeleteWriteResult result = currentWriter.result();
+ deleteFiles.addAll(result.deleteFiles());
+ referencedDataFiles.addAll(result.referencedDataFiles());
+ this.currentWriter = null;
+ this.currentPath = null;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close current writer", e);
+ }
+ }
+ }
+}
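To show how this writer is meant to be wired up, here is a hedged sketch that mirrors what ClusteredPositionDeleteWriter does later in this patch; the factory, file IO, target size, spec and partition are assumed to come from the caller:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.FileScopedPositionDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.RollingPositionDeleteWriter;

public class FileScopedWriterWiring {
  <T> FileWriter<PositionDelete<T>, DeleteWriteResult> newFileScopedWriter(
      FileWriterFactory<T> writerFactory,
      OutputFileFactory fileFactory,
      FileIO io,
      long targetFileSizeInBytes,
      PartitionSpec spec,
      StructLike partition) {
    // Each time the incoming (ordered) delete stream moves to a new data file, the file-scoped
    // writer pulls a fresh delegate from this supplier, so every data file gets its own delete files.
    return new FileScopedPositionDeleteWriter<>(
        () ->
            new RollingPositionDeleteWriter<>(
                writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition));
  }
}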
diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
index 8819640e9f82..3fc6c5eec9d3 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.deletes;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.CharSequenceSet;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;
@@ -41,12 +46,20 @@
public class SortingPositionOnlyDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
- private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
+ private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
+ private final DeleteGranularity granularity;
private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
private DeleteWriteResult result = null;
public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWriteResult> writer) {
- this.writer = writer;
+ this(() -> writer, DeleteGranularity.PARTITION);
+ }
+
+ public SortingPositionOnlyDeleteWriter(
+ Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+ DeleteGranularity granularity) {
+ this.writers = writers;
+ this.granularity = granularity;
this.positionsByPath = CharSequenceMap.create();
}
@@ -60,7 +73,7 @@ public void write(PositionDelete positionDelete) {
@Override
public long length() {
- return writer.length();
+ throw new UnsupportedOperationException(getClass().getName() + " does not implement length");
}
@Override
@@ -71,14 +84,44 @@ public DeleteWriteResult result() {
@Override
public void close() throws IOException {
if (result == null) {
- this.result = writeDeletes();
+ switch (granularity) {
+ case FILE:
+ this.result = writeFileDeletes();
+ return;
+ case PARTITION:
+ this.result = writePartitionDeletes();
+ return;
+ default:
+ throw new UnsupportedOperationException("Unsupported delete granularity: " + granularity);
+ }
+ }
+ }
+
+ // write deletes for all data files together
+ private DeleteWriteResult writePartitionDeletes() throws IOException {
+ return writeDeletes(positionsByPath.keySet());
+ }
+
+ // write deletes for different data files into distinct delete files
+ private DeleteWriteResult writeFileDeletes() throws IOException {
+ List<DeleteFile> deleteFiles = Lists.newArrayList();
+ CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+
+ for (CharSequence path : positionsByPath.keySet()) {
+ DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
+ deleteFiles.addAll(writeResult.deleteFiles());
+ referencedDataFiles.addAll(writeResult.referencedDataFiles());
}
+
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles);
}
- private DeleteWriteResult writeDeletes() throws IOException {
+ private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IOException {
+ FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();
+
try {
PositionDelete<T> positionDelete = PositionDelete.create();
- for (CharSequence path : sortedPaths()) {
+ for (CharSequence path : sort(paths)) {
// the iterator provides values in ascending sorted order
PeekableLongIterator positions = positionsByPath.get(path).getLongIterator();
while (positions.hasNext()) {
@@ -93,9 +136,13 @@ private DeleteWriteResult writeDeletes() throws IOException {
return writer.result();
}
- private List<CharSequence> sortedPaths() {
- List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet());
- paths.sort(Comparators.charSequences());
- return paths;
+ private Collection<CharSequence> sort(Collection<CharSequence> paths) {
+ if (paths.size() <= 1) {
+ return paths;
+ } else {
+ List<CharSequence> sortedPaths = Lists.newArrayList(paths);
+ sortedPaths.sort(Comparators.charSequences());
+ return sortedPaths;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
index c0c26c2b9086..c9d911894c77 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -22,6 +22,8 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.deletes.FileScopedPositionDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
@@ -38,6 +40,7 @@ public class ClusteredPositionDeleteWriter
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSizeInBytes;
+ private final DeleteGranularity granularity;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
@@ -46,10 +49,20 @@ public ClusteredPositionDeleteWriter(
OutputFileFactory fileFactory,
FileIO io,
long targetFileSizeInBytes) {
+ this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION);
+ }
+
+ public ClusteredPositionDeleteWriter(
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSizeInBytes,
+ DeleteGranularity granularity) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.granularity = granularity;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
}
@@ -57,6 +70,18 @@ public ClusteredPositionDeleteWriter(
@Override
protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
PartitionSpec spec, StructLike partition) {
+ switch (granularity) {
+ case FILE:
+ return new FileScopedPositionDeleteWriter<>(() -> newRollingWriter(spec, partition));
+ case PARTITION:
+ return newRollingWriter(spec, partition);
+ default:
+ throw new UnsupportedOperationException("Unsupported delete granularity: " + granularity);
+ }
+ }
+
+ private RollingPositionDeleteWriter<T> newRollingWriter(
+ PartitionSpec spec, StructLike partition) {
return new RollingPositionDeleteWriter<>(
writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
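A usage sketch modeled on the new tests further down in this patch (the factories, table and data files are assumed to already exist, and the table is assumed to be unpartitioned):

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;

public class ClusteredDeleteExample {
  <T> DeleteWriteResult writeDeletes(
      Table table,
      FileWriterFactory<T> writerFactory,
      OutputFileFactory fileFactory,
      DataFile dataFile1,
      DataFile dataFile2)
      throws IOException {
    long targetFileSize = 128L * 1024 * 1024; // illustrative target size
    ClusteredPositionDeleteWriter<T> writer =
        new ClusteredPositionDeleteWriter<>(
            writerFactory, fileFactory, table.io(), targetFileSize, DeleteGranularity.FILE);

    PartitionSpec spec = table.spec();
    try {
      // Clustered writers expect incoming deletes ordered by partition, path and position.
      writer.write(PositionDelete.<T>create().set(dataFile1.path(), 0L, null), spec, null);
      writer.write(PositionDelete.<T>create().set(dataFile2.path(), 0L, null), spec, null);
    } finally {
      writer.close();
    }

    // With FILE granularity this produces one delete file per referenced data file.
    return writer.result();
  }
}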
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
index 15c1cc6bb4ab..c6a55064b756 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -22,6 +22,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -41,6 +42,7 @@ public class FanoutPositionOnlyDeleteWriter
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSizeInBytes;
+ private final DeleteGranularity granularity;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
@@ -49,10 +51,20 @@ public FanoutPositionOnlyDeleteWriter(
OutputFileFactory fileFactory,
FileIO io,
long targetFileSizeInBytes) {
+ this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION);
+ }
+
+ public FanoutPositionOnlyDeleteWriter(
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSizeInBytes,
+ DeleteGranularity granularity) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.granularity = granularity;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
}
@@ -60,10 +72,11 @@ public FanoutPositionOnlyDeleteWriter(
@Override
protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
PartitionSpec spec, StructLike partition) {
- FileWriter<PositionDelete<T>, DeleteWriteResult> delegate =
- new RollingPositionDeleteWriter<>(
- writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
- return new SortingPositionOnlyDeleteWriter<>(delegate);
+ return new SortingPositionOnlyDeleteWriter<>(
+ () ->
+ new RollingPositionDeleteWriter<>(
+ writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition),
+ granularity);
}
@Override
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 32a8c8d2cc80..d76774326272 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.io;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -26,6 +28,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -278,11 +281,21 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro
}
@Test
- public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
+ public void testClusteredPositionDeleteWriterNoRecordsPartitionGranularity() throws IOException {
+ checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterNoRecordsFileGranularity() throws IOException {
+ checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity deleteGranularity)
+ throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
writer.close();
Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -296,7 +309,18 @@ public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
}
@Test
- public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException {
+ public void testClusteredPositionDeleteWriterMultipleSpecsPartitionGranularity()
+ throws IOException {
+ checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterMultipleSpecsFileGranularity() throws IOException {
+ checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity deleteGranularity)
+ throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
// add an unpartitioned data file
@@ -330,7 +354,7 @@ public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -364,7 +388,19 @@ public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException
}
@Test
- public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException {
+ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsPartitionGranularity()
+ throws IOException {
+ checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsFileGranularity()
+ throws IOException {
+ checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(
+ DeleteGranularity deleteGranularity) throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
@@ -377,7 +413,7 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -419,6 +455,61 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
writer.close();
}
+ @Test
+ public void testClusteredPositionDeleteWriterPartitionGranularity() throws IOException {
+ checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterFileGranularity() throws IOException {
+ checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.FILE);
+ }
+
+ private void checkClusteredPositionDeleteWriterGranularity(DeleteGranularity deleteGranularity)
+ throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add the first data file
+ List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa"));
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+ table.newFastAppend().appendFile(dataFile1).commit();
+
+ // add the second data file
+ List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), toRow(12, "aaa"));
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), null);
+ table.newFastAppend().appendFile(dataFile2).commit();
+
+ // init the delete writer
+ ClusteredPositionDeleteWriter<T> writer =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
+
+ PartitionSpec spec = table.spec();
+
+ // write deletes for both data files
+ writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null);
+ writer.close();
+
+ // verify the writer result
+ DeleteWriteResult result = writer.result();
+ int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ? 2 : 1;
+ assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles);
+ assertThat(result.referencedDataFiles()).hasSize(2);
+ assertThat(result.referencesDataFiles()).isTrue();
+
+ // commit the deletes
+ RowDelta rowDelta = table.newRowDelta();
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ // verify correctness
+ List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa"));
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+ }
+
@Test
public void testFanoutDataWriterNoRecords() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
@@ -464,11 +555,21 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException {
}
@Test
- public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException {
+ public void testFanoutPositionOnlyDeleteWriterNoRecordsPartitionGranularity() throws IOException {
+ checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterNoRecordsFileGranularity() throws IOException {
+ checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.FILE);
+ }
+
+ private void checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity deleteGranularity)
+ throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
FanoutPositionOnlyDeleteWriter<T> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
writer.close();
Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -482,7 +583,19 @@ public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException {
}
@Test
- public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOException {
+ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsPartitionGranularity()
+ throws IOException {
+ checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsFileGranularity()
+ throws IOException {
+ checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.FILE);
+ }
+
+ private void checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(
+ DeleteGranularity deleteGranularity) throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
// add an unpartitioned data file
@@ -516,7 +629,7 @@ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOExcep
FanoutPositionOnlyDeleteWriter<T> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -557,4 +670,59 @@ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOExcep
List<T> expectedRows = ImmutableList.of(toRow(12, "bbb"));
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterPartitionGranularity() throws IOException {
+ checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testFanoutPositionOnlyDeleteWriterFileGranularity() throws IOException {
+ checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.FILE);
+ }
+
+ private void checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity deleteGranularity)
+ throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add the first data file
+ List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa"));
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+ table.newFastAppend().appendFile(dataFile1).commit();
+
+ // add the second data file
+ List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), toRow(12, "aaa"));
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), null);
+ table.newFastAppend().appendFile(dataFile2).commit();
+
+ // init the delete writer
+ FanoutPositionOnlyDeleteWriter<T> writer =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity);
+
+ PartitionSpec spec = table.spec();
+
+ // write deletes for both data files (the order of records is mixed)
+ writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null);
+ writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null);
+ writer.close();
+
+ // verify the writer result
+ DeleteWriteResult result = writer.result();
+ int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ? 2 : 1;
+ assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles);
+ assertThat(result.referencedDataFiles()).hasSize(2);
+ assertThat(result.referencesDataFiles()).isTrue();
+
+ // commit the deletes
+ RowDelta rowDelta = table.newRowDelta();
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ // verify correctness
+ List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa"));
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+ }
}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 9c0e8235f842..01f24c4dfe04 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
@@ -80,6 +85,46 @@ public static void clearTestSparkCatalogCache() {
TestSparkCatalog.clearTables();
}
+ @Test
+ public void testDeleteFileGranularity() throws NoSuchTableException {
+ checkDeleteFileGranularity(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testDeletePartitionGranularity() throws NoSuchTableException {
+ checkDeleteFileGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
+ throws NoSuchTableException {
+ createAndInitPartitionedTable();
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+ append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
+ append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
+ append(tableName, new Employee(1, "hardware"), new Employee(2, "hardware"));
+ append(tableName, new Employee(3, "hardware"), new Employee(4, "hardware"));
+
+ createBranchIfNeeded();
+
+ sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(5);
+
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
+ validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), row(4, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
+ }
+
@Test
public void testCommitUnknownException() {
createAndInitTable("id INT, dep STRING, category STRING");
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index e743b32b45db..f9c13d828cd3 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -18,11 +18,20 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Encoders;
+import org.junit.Test;
public class TestMergeOnReadMerge extends TestMerge {
@@ -56,4 +65,52 @@ protected Map extraTableProperties() {
TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}
+
+ @Test
+ public void testMergeDeleteFileGranularity() {
+ checkMergeDeleteGranularity(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testMergeDeletePartitionGranularity() {
+ checkMergeDeleteGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
+ createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+
+ createBranchIfNeeded();
+
+ createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT());
+
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (-1, 'other')",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(5);
+
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
+ validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1");
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, "hr"), row(4, "it")),
+ sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
+ }
}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 0207d4ce4d51..45ef343b2dfe 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -18,11 +18,19 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.Test;
public class TestMergeOnReadUpdate extends TestUpdate {
@@ -56,4 +64,51 @@ protected Map extraTableProperties() {
TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}
+
+ @Test
+ public void testUpdateFileGranularity() {
+ checkUpdateFileGranularity(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testUpdatePartitionGranularity() {
+ checkUpdateFileGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
+ createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
+
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
+ append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
+ append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+
+ createBranchIfNeeded();
+
+ sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.snapshots()).hasSize(5);
+
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+ String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
+ validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(0, "hr"),
+ row(2, "hr"),
+ row(2, "hr"),
+ row(4, "hr"),
+ row(0, "it"),
+ row(2, "it"),
+ row(2, "it"),
+ row(4, "it")),
+ sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+ }
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 094fd5844398..71813c5a63a6 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -364,8 +365,20 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole)
@Benchmark
@Threads(1)
- public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
+ public void writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity(
+ Blackhole blackhole) throws IOException {
+ writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.PARTITION);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
+ writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.FILE);
+ }
+
+ private void writeUnpartitionedClusteredPositionDeleteWriter(
+ Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();
OutputFileFactory fileFactory = newFileFactory();
@@ -374,7 +387,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
ClusteredPositionDeleteWriter<InternalRow> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);
PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -391,7 +404,20 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) throws IOException {
+ public void writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole blackhole)
+ throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.PARTITION);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole)
+ throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.FILE);
+ }
+
+ private void writeUnpartitionedFanoutPositionDeleteWriterPartition(
+ Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();
OutputFileFactory fileFactory = newFileFactory();
@@ -400,7 +426,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th
FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);
PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -417,8 +443,20 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th
@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole blackhole)
- throws IOException {
+ public void writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
+ Blackhole blackhole) throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.PARTITION);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
+ Blackhole blackhole) throws IOException {
+ writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.FILE);
+ }
+
+ private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
+ Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();
@@ -428,7 +466,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black
FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);
PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 1454fc534e7d..07393a67fe31 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -43,6 +43,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -708,4 +709,15 @@ private long sparkAdvisoryPartitionSize() {
private double shuffleCompressionRatio(FileFormat outputFileFormat, String outputCodec) {
return SparkCompressionUtil.shuffleCompressionRatio(spark, outputFileFormat, outputCodec);
}
+
+ public DeleteGranularity deleteGranularity() {
+ String valueAsString =
+ confParser
+ .stringConf()
+ .option(SparkWriteOptions.DELETE_GRANULARITY)
+ .tableProperty(TableProperties.DELETE_GRANULARITY)
+ .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
+ .parse();
+ return DeleteGranularity.fromString(valueAsString);
+ }
}
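A hedged sketch of how the new conf resolves, following the precedence the new TestSparkWriteConf cases below exercise: a per-write option wins over the table property, which wins over the partition-granularity default. The Spark session and table are assumed to exist.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.SparkSession;

public class DeleteGranularityConfExample {
  DeleteGranularity resolve(SparkSession spark, Table table) {
    // Table property: applies to all writes against the table.
    table
        .updateProperties()
        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
        .commit();

    // Write option: overrides the table property for this particular write.
    SparkWriteConf writeConf =
        new SparkWriteConf(
            spark,
            table,
            ImmutableMap.of(
                SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()));

    return writeConf.deleteGranularity(); // FILE for this write
  }
}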
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 48dfa44c9122..d9c4f66b192b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -83,4 +83,7 @@ private SparkWriteOptions() {}
// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size";
+
+ // Overrides the delete granularity
+ public static final String DELETE_GRANULARITY = "delete-granularity";
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index a397a069ee1d..d91779475845 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
@@ -70,6 +71,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final String queryId;
private final FileFormat format;
private final long targetFileSize;
+ private final DeleteGranularity deleteGranularity;
private final Schema writeSchema;
private final StructType dsSchema;
private final String fileSetId;
@@ -103,6 +105,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.queryId = writeInfo.queryId();
this.format = writeConf.deleteFileFormat();
this.targetFileSize = writeConf.targetDeleteFileSize();
+ this.deleteGranularity = writeConf.deleteGranularity();
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.fileSetId = writeConf.rewrittenFileSetId();
@@ -129,6 +132,7 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
queryId,
format,
targetFileSize,
+ deleteGranularity,
writeSchema,
dsSchema,
specId,
@@ -179,6 +183,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
private final String queryId;
private final FileFormat format;
private final Long targetFileSize;
+ private final DeleteGranularity deleteGranularity;
private final Schema writeSchema;
private final StructType dsSchema;
private final int specId;
@@ -190,6 +195,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
String queryId,
FileFormat format,
long targetFileSize,
+ DeleteGranularity deleteGranularity,
Schema writeSchema,
StructType dsSchema,
int specId,
@@ -199,6 +205,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
this.queryId = queryId;
this.format = format;
this.targetFileSize = targetFileSize;
+ this.deleteGranularity = deleteGranularity;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.specId = specId;
@@ -241,6 +248,7 @@ public DataWriter createWriter(int partitionId, long taskId) {
writerFactoryWithoutRow,
deleteFileFactory,
targetFileSize,
+ deleteGranularity,
dsSchema,
specId,
partition);
@@ -289,6 +297,7 @@ private static class DeleteWriter implements DataWriter {
private final SparkFileWriterFactory writerFactoryWithoutRow;
private final OutputFileFactory deleteFileFactory;
private final long targetFileSize;
+ private final DeleteGranularity deleteGranularity;
private final PositionDelete<InternalRow> positionDelete;
private final FileIO io;
private final PartitionSpec spec;
@@ -322,11 +331,13 @@ private static class DeleteWriter implements DataWriter {
SparkFileWriterFactory writerFactoryWithoutRow,
OutputFileFactory deleteFileFactory,
long targetFileSize,
+ DeleteGranularity deleteGranularity,
StructType dsSchema,
int specId,
StructLike partition) {
this.deleteFileFactory = deleteFileFactory;
this.targetFileSize = targetFileSize;
+ this.deleteGranularity = deleteGranularity;
this.writerFactoryWithRow = writerFactoryWithRow;
this.writerFactoryWithoutRow = writerFactoryWithoutRow;
this.positionDelete = PositionDelete.create();
@@ -387,7 +398,7 @@ private ClusteredPositionDeleteWriter lazyWriterWithRow() {
if (writerWithRow == null) {
this.writerWithRow =
new ClusteredPositionDeleteWriter<>(
- writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+ writerFactoryWithRow, deleteFileFactory, io, targetFileSize, deleteGranularity);
}
return writerWithRow;
}
@@ -396,7 +407,7 @@ private ClusteredPositionDeleteWriter lazyWriterWithoutRow() {
if (writerWithoutRow == null) {
this.writerWithoutRow =
new ClusteredPositionDeleteWriter<>(
- writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize);
+ writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize, deleteGranularity);
}
return writerWithoutRow;
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index dcc949b290af..022283631fd8 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -42,6 +42,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
@@ -441,11 +442,14 @@ protected PartitioningWriter, DeleteWriteResult> new
FileIO io = table.io();
boolean inputOrdered = context.inputOrdered();
long targetFileSize = context.targetDeleteFileSize();
+ DeleteGranularity deleteGranularity = context.deleteGranularity();
if (inputOrdered) {
- return new ClusteredPositionDeleteWriter<>(writers, files, io, targetFileSize);
+ return new ClusteredPositionDeleteWriter<>(
+ writers, files, io, targetFileSize, deleteGranularity);
} else {
- return new FanoutPositionOnlyDeleteWriter<>(writers, files, io, targetFileSize);
+ return new FanoutPositionOnlyDeleteWriter<>(
+ writers, files, io, targetFileSize, deleteGranularity);
}
}
}
@@ -684,6 +688,7 @@ private static class Context implements Serializable {
private final StructType metadataSparkType;
private final FileFormat deleteFileFormat;
private final long targetDeleteFileSize;
+ private final DeleteGranularity deleteGranularity;
private final String queryId;
private final boolean useFanoutWriter;
private final boolean inputOrdered;
@@ -700,6 +705,7 @@ private static class Context implements Serializable {
this.deleteSparkType = info.rowIdSchema().get();
this.deleteFileFormat = writeConf.deleteFileFormat();
this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
+ this.deleteGranularity = writeConf.deleteGranularity();
this.metadataSparkType = info.metadataSchema().get();
this.queryId = info.queryId();
this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
@@ -734,6 +740,10 @@ long targetDeleteFileSize() {
return targetDeleteFileSize;
}
+ DeleteGranularity deleteGranularity() {
+ return deleteGranularity;
+ }
+
String queryId() {
return queryId;
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index abf40ebd953d..7c3ecac676ef 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@
import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
import java.util.Map;
@@ -52,6 +53,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.internal.SQLConf;
@@ -76,6 +78,61 @@ public void after() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @Test
+ public void testDeleteGranularityDefault() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+ DeleteGranularity value = writeConf.deleteGranularity();
+ assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+ }
+
+ @Test
+ public void testDeleteGranularityTableProperty() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table
+ .updateProperties()
+ .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
+ .commit();
+
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+ DeleteGranularity value = writeConf.deleteGranularity();
+ assertThat(value).isEqualTo(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testDeleteGranularityWriteOption() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table
+ .updateProperties()
+ .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
+ .commit();
+
+ Map<String, String> options =
+ ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString());
+
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
+
+ DeleteGranularity value = writeConf.deleteGranularity();
+ assertThat(value).isEqualTo(DeleteGranularity.FILE);
+ }
+
+ @Test
+ public void testDeleteGranularityInvalidValue() {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties().set(TableProperties.DELETE_GRANULARITY, "invalid").commit();
+
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+ assertThatThrownBy(writeConf::deleteGranularity)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unknown delete granularity");
+ }
+
@Test
public void testAdvisoryPartitionSize() {
Table table = validationCatalog.loadTable(tableIdent);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index d1e33950ebe9..7c55ff82df1e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -57,6 +57,7 @@
import org.apache.iceberg.actions.SizeBasedFileRewriter;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -131,6 +132,42 @@ public void testEmptyTable() {
assertThat(result.addedDeleteFilesCount()).as("No added delete files").isZero();
}
+ @TestTemplate
+ public void testFileGranularity() throws Exception {
+ checkDeleteGranularity(DeleteGranularity.FILE);
+ }
+
+ @TestTemplate
+ public void testPartitionGranularity() throws Exception {
+ checkDeleteGranularity(DeleteGranularity.PARTITION);
+ }
+
+ private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws Exception {
+ Table table = createTableUnpartitioned(2, SCALE);
+
+ table
+ .updateProperties()
+ .set(TableProperties.DELETE_GRANULARITY, deleteGranularity.toString())
+ .commit();
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ assertThat(dataFiles).hasSize(2);
+
+ writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ assertThat(deleteFiles).hasSize(2);
+
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .execute();
+
+ int expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? 2 : 1;
+ assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedDeleteFilesCount);
+ }
+
@TestTemplate
public void testUnpartitioned() throws Exception {
Table table = createTableUnpartitioned(2, SCALE);