diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java new file mode 100644 index 000000000000..b28c90df1753 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +public class CharSequenceUtil { + + private CharSequenceUtil() {} + + public static boolean unequalPaths(CharSequence s1, CharSequence s2) { + if (s1 == s2) { + return false; + } + + int s1Length = s1.length(); + int s2Length = s2.length(); + + if (s1Length != s2Length) { + return true; + } + + for (int index = s1Length - 1; index >= 0; index--) { + if (s1.charAt(index) != s2.charAt(index)) { + return true; + } + } + + return false; + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 84188c0f7891..2267ba03fd7b 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.Set; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; public class TableProperties { @@ -334,6 +335,9 @@ private TableProperties() {} public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms"; public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE; + public static final String DELETE_GRANULARITY = "write.delete.granularity"; + public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString(); + public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level"; public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable"; diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java new file mode 100644 index 000000000000..c225192fa121 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * An enum that represents the granularity of deletes. + * + *
<p>
Under partition granularity, delete writers are directed to group deletes for different data + * files into one delete file. This strategy tends to reduce the total number of delete files in the + * table. However, a scan for a single data file will require reading delete information for + * multiple data files even if those other files are not required for the scan. All irrelevant + * deletes will be discarded by readers but reading this extra information will cause overhead. The + * overhead can potentially be mitigated via delete file caching. + * + *
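As a rough illustration of the partition-granularity behavior described above, here is a hedged sketch (not part of the patch) that mirrors the writer tests further down in this diff; writerFactory, fileFactory, spec, targetFileSize, and the positionDelete(...) helper are assumed to be set up as in those tests, with InternalRow as the row type:

// With PARTITION granularity, deletes targeting different data files in the same
// partition are grouped together, so a single delete file is typically produced
// (assuming the target file size is not exceeded).
ClusteredPositionDeleteWriter<InternalRow> writer =
    new ClusteredPositionDeleteWriter<>(
        writerFactory, fileFactory, table.io(), targetFileSize, DeleteGranularity.PARTITION);
writer.write(positionDelete(dataFileA.path(), 0L, null), spec, null);
writer.write(positionDelete(dataFileB.path(), 0L, null), spec, null);
writer.close();
// writer.result().deleteFiles() is expected to hold one delete file referencing both data files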
<p>
Under file granularity, delete writers always organize deletes by their target data file, + * creating separate delete files for each referenced data file. This strategy ensures the job + * planning does not assign irrelevant deletes to data files and readers only load necessary delete + * information. However, it also increases the total number of delete files in the table and may + * require a more aggressive approach for delete file compaction. + * + *
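Continuing the same hedged sketch, switching the last constructor argument to DeleteGranularity.FILE is expected to yield one delete file per referenced data file, matching the test expectations later in this diff:

// FILE granularity: the clustered writer delegates to FileScopedPositionDeleteWriter,
// so each referenced data file gets its own delete file.
ClusteredPositionDeleteWriter<InternalRow> fileScopedWriter =
    new ClusteredPositionDeleteWriter<>(
        writerFactory, fileFactory, table.io(), targetFileSize, DeleteGranularity.FILE);
fileScopedWriter.write(positionDelete(dataFileA.path(), 0L, null), spec, null);
fileScopedWriter.write(positionDelete(dataFileB.path(), 0L, null), spec, null);
fileScopedWriter.close();
// fileScopedWriter.result().deleteFiles() is expected to hold two delete files, one per data file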
<p>
Currently, this configuration is only applicable to position deletes. + * + *
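For context, the granularity is selected through the write.delete.granularity table property added in TableProperties earlier in this diff (or through the Spark write option introduced below); a minimal hedged sketch of switching a table to file-scoped position deletes:

// Equality delete writers are untouched by this patch; only position delete
// writers consult the granularity setting.
table
    .updateProperties()
    .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
    .commit();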
<p>
Each granularity has its own benefits and drawbacks and should be picked based on a use case. + * Regular delete compaction is still required regardless of which granularity is chosen. It is also + * possible to use one granularity for ingestion and another one for table maintenance. + */ +public enum DeleteGranularity { + FILE, + PARTITION; + + @Override + public String toString() { + switch (this) { + case FILE: + return "file"; + case PARTITION: + return "partition"; + default: + throw new IllegalArgumentException("Unknown delete granularity: " + this); + } + } + + public static DeleteGranularity fromString(String valueAsString) { + Preconditions.checkArgument(valueAsString != null, "Value is null"); + if (FILE.toString().equalsIgnoreCase(valueAsString)) { + return FILE; + } else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) { + return PARTITION; + } else { + throw new IllegalArgumentException("Unknown delete granularity: " + valueAsString); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java new file mode 100644 index 000000000000..d85a5645fb37 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.function.Supplier; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.CharSequenceUtil; + +/** + * A position delete writer that produces a separate delete file for each referenced data file. + * + *
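A minimal sketch of how this writer is wired up, mirroring ClusteredPositionDeleteWriter.newWriter further down in this diff; newRollingWriter, spec, and partition stand in for that class's private helper and fields and are assumptions here:

// Each new data file path seen in the incoming (ordered) deletes closes the current
// delegate and opens a fresh one from the supplier, producing one delete file per data file.
FileWriter<PositionDelete<InternalRow>, DeleteWriteResult> writer =
    new FileScopedPositionDeleteWriter<>(() -> newRollingWriter(spec, partition));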
<p>
This writer does not keep track of seen deletes and assumes all incoming records are ordered + * by file and position as required by the spec. If there is no external process to order the + * records, consider using {@link SortingPositionOnlyDeleteWriter} instead. + */ +public class FileScopedPositionDeleteWriter + implements FileWriter, DeleteWriteResult> { + + private final Supplier, DeleteWriteResult>> writers; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + private FileWriter, DeleteWriteResult> currentWriter = null; + private CharSequence currentPath = null; + private boolean closed = false; + + public FileScopedPositionDeleteWriter( + Supplier, DeleteWriteResult>> writers) { + this.writers = writers; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + public void write(PositionDelete positionDelete) { + writer(positionDelete.path()).write(positionDelete); + } + + private FileWriter, DeleteWriteResult> writer(CharSequence path) { + if (currentWriter == null) { + openCurrentWriter(path); + } else if (CharSequenceUtil.unequalPaths(currentPath, path)) { + closeCurrentWriter(); + openCurrentWriter(path); + } + + return currentWriter; + } + + @Override + public long length() { + throw new UnsupportedOperationException(getClass().getName() + " does not implement length"); + } + + @Override + public DeleteWriteResult result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } + + @Override + public void close() throws IOException { + if (!closed) { + closeCurrentWriter(); + this.closed = true; + } + } + + private void openCurrentWriter(CharSequence path) { + Preconditions.checkState(!closed, "Writer has already been closed"); + this.currentWriter = writers.get(); + this.currentPath = path; + } + + private void closeCurrentWriter() { + if (currentWriter != null) { + try { + currentWriter.close(); + DeleteWriteResult result = currentWriter.result(); + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + this.currentWriter = null; + this.currentPath = null; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close current writer", e); + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java index 8819640e9f82..3fc6c5eec9d3 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java @@ -19,12 +19,17 @@ package org.apache.iceberg.deletes; import java.io.IOException; +import java.util.Collection; import java.util.List; +import java.util.function.Supplier; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceMap; +import org.apache.iceberg.util.CharSequenceSet; import org.roaringbitmap.longlong.PeekableLongIterator; import org.roaringbitmap.longlong.Roaring64Bitmap; @@ -41,12 +46,20 @@ public class SortingPositionOnlyDeleteWriter implements FileWriter, 
DeleteWriteResult> { - private final FileWriter, DeleteWriteResult> writer; + private final Supplier, DeleteWriteResult>> writers; + private final DeleteGranularity granularity; private final CharSequenceMap positionsByPath; private DeleteWriteResult result = null; public SortingPositionOnlyDeleteWriter(FileWriter, DeleteWriteResult> writer) { - this.writer = writer; + this(() -> writer, DeleteGranularity.PARTITION); + } + + public SortingPositionOnlyDeleteWriter( + Supplier, DeleteWriteResult>> writers, + DeleteGranularity granularity) { + this.writers = writers; + this.granularity = granularity; this.positionsByPath = CharSequenceMap.create(); } @@ -60,7 +73,7 @@ public void write(PositionDelete positionDelete) { @Override public long length() { - return writer.length(); + throw new UnsupportedOperationException(getClass().getName() + " does not implement length"); } @Override @@ -71,14 +84,44 @@ public DeleteWriteResult result() { @Override public void close() throws IOException { if (result == null) { - this.result = writeDeletes(); + switch (granularity) { + case FILE: + this.result = writeFileDeletes(); + return; + case PARTITION: + this.result = writePartitionDeletes(); + return; + default: + throw new UnsupportedOperationException("Unsupported delete granularity: " + granularity); + } + } + } + + // write deletes for all data files together + private DeleteWriteResult writePartitionDeletes() throws IOException { + return writeDeletes(positionsByPath.keySet()); + } + + // write deletes for different data files into distinct delete files + private DeleteWriteResult writeFileDeletes() throws IOException { + List deleteFiles = Lists.newArrayList(); + CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + + for (CharSequence path : positionsByPath.keySet()) { + DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path)); + deleteFiles.addAll(writeResult.deleteFiles()); + referencedDataFiles.addAll(writeResult.referencedDataFiles()); } + + return new DeleteWriteResult(deleteFiles, referencedDataFiles); } - private DeleteWriteResult writeDeletes() throws IOException { + private DeleteWriteResult writeDeletes(Collection paths) throws IOException { + FileWriter, DeleteWriteResult> writer = writers.get(); + try { PositionDelete positionDelete = PositionDelete.create(); - for (CharSequence path : sortedPaths()) { + for (CharSequence path : sort(paths)) { // the iterator provides values in ascending sorted order PeekableLongIterator positions = positionsByPath.get(path).getLongIterator(); while (positions.hasNext()) { @@ -93,9 +136,13 @@ private DeleteWriteResult writeDeletes() throws IOException { return writer.result(); } - private List sortedPaths() { - List paths = Lists.newArrayList(positionsByPath.keySet()); - paths.sort(Comparators.charSequences()); - return paths; + private Collection sort(Collection paths) { + if (paths.size() <= 1) { + return paths; + } else { + List sortedPaths = Lists.newArrayList(paths); + sortedPaths.sort(Comparators.charSequences()); + return sortedPaths; + } } } diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java index c0c26c2b9086..c9d911894c77 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java @@ -22,6 +22,8 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; import 
org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.DeleteGranularity; +import org.apache.iceberg.deletes.FileScopedPositionDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; @@ -38,6 +40,7 @@ public class ClusteredPositionDeleteWriter private final OutputFileFactory fileFactory; private final FileIO io; private final long targetFileSizeInBytes; + private final DeleteGranularity granularity; private final List deleteFiles; private final CharSequenceSet referencedDataFiles; @@ -46,10 +49,20 @@ public ClusteredPositionDeleteWriter( OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes) { + this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION); + } + + public ClusteredPositionDeleteWriter( + FileWriterFactory writerFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSizeInBytes, + DeleteGranularity granularity) { this.writerFactory = writerFactory; this.fileFactory = fileFactory; this.io = io; this.targetFileSizeInBytes = targetFileSizeInBytes; + this.granularity = granularity; this.deleteFiles = Lists.newArrayList(); this.referencedDataFiles = CharSequenceSet.empty(); } @@ -57,6 +70,18 @@ public ClusteredPositionDeleteWriter( @Override protected FileWriter, DeleteWriteResult> newWriter( PartitionSpec spec, StructLike partition) { + switch (granularity) { + case FILE: + return new FileScopedPositionDeleteWriter<>(() -> newRollingWriter(spec, partition)); + case PARTITION: + return newRollingWriter(spec, partition); + default: + throw new UnsupportedOperationException("Unsupported delete granularity: " + granularity); + } + } + + private RollingPositionDeleteWriter newRollingWriter( + PartitionSpec spec, StructLike partition) { return new RollingPositionDeleteWriter<>( writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); } diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java index 15c1cc6bb4ab..c6a55064b756 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java @@ -22,6 +22,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -41,6 +42,7 @@ public class FanoutPositionOnlyDeleteWriter private final OutputFileFactory fileFactory; private final FileIO io; private final long targetFileSizeInBytes; + private final DeleteGranularity granularity; private final List deleteFiles; private final CharSequenceSet referencedDataFiles; @@ -49,10 +51,20 @@ public FanoutPositionOnlyDeleteWriter( OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes) { + this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION); + } + + public FanoutPositionOnlyDeleteWriter( + FileWriterFactory writerFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSizeInBytes, + DeleteGranularity granularity) { this.writerFactory = writerFactory; this.fileFactory = fileFactory; this.io = io; 
this.targetFileSizeInBytes = targetFileSizeInBytes; + this.granularity = granularity; this.deleteFiles = Lists.newArrayList(); this.referencedDataFiles = CharSequenceSet.empty(); } @@ -60,10 +72,11 @@ public FanoutPositionOnlyDeleteWriter( @Override protected FileWriter, DeleteWriteResult> newWriter( PartitionSpec spec, StructLike partition) { - FileWriter, DeleteWriteResult> delegate = - new RollingPositionDeleteWriter<>( - writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); - return new SortingPositionOnlyDeleteWriter<>(delegate); + return new SortingPositionOnlyDeleteWriter<>( + () -> + new RollingPositionDeleteWriter<>( + writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition), + granularity); } @Override diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 32a8c8d2cc80..d76774326272 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.io; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.util.List; @@ -26,6 +28,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -278,11 +281,21 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro } @Test - public void testClusteredPositionDeleteWriterNoRecords() throws IOException { + public void testClusteredPositionDeleteWriterNoRecordsPartitionGranularity() throws IOException { + checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.PARTITION); + } + + @Test + public void testClusteredPositionDeleteWriterNoRecordsFileGranularity() throws IOException { + checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.FILE); + } + + private void checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity deleteGranularity) + throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); writer.close(); Assert.assertEquals(0, writer.result().deleteFiles().size()); @@ -296,7 +309,18 @@ public void testClusteredPositionDeleteWriterNoRecords() throws IOException { } @Test - public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException { + public void testClusteredPositionDeleteWriterMultipleSpecsPartitionGranularity() + throws IOException { + checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.PARTITION); + } + + @Test + public void testClusteredPositionDeleteWriterMultipleSpecsFileGranularity() throws IOException { + checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.FILE); + } + + private void checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity deleteGranularity) + throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); // add an unpartitioned data file @@ -330,7 +354,7 @@ public void 
testClusteredPositionDeleteWriterMultipleSpecs() throws IOException ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); PartitionSpec unpartitionedSpec = table.specs().get(0); PartitionSpec bucketSpec = table.specs().get(1); @@ -364,7 +388,19 @@ public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException } @Test - public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException { + public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsPartitionGranularity() + throws IOException { + checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.PARTITION); + } + + @Test + public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsFileGranularity() + throws IOException { + checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.FILE); + } + + private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions( + DeleteGranularity deleteGranularity) throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); table.updateSpec().addField(Expressions.bucket("data", 16)).commit(); @@ -377,7 +413,7 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); PartitionSpec unpartitionedSpec = table.specs().get(0); PartitionSpec bucketSpec = table.specs().get(1); @@ -419,6 +455,61 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro writer.close(); } + @Test + public void testClusteredPositionDeleteWriterPartitionGranularity() throws IOException { + checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.PARTITION); + } + + @Test + public void testClusteredPositionDeleteWriterFileGranularity() throws IOException { + checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.FILE); + } + + private void checkClusteredPositionDeleteWriterGranularity(DeleteGranularity deleteGranularity) + throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + // add the first data file + List rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa")); + DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null); + table.newFastAppend().appendFile(dataFile1).commit(); + + // add the second data file + List rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), toRow(12, "aaa")); + DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), null); + table.newFastAppend().appendFile(dataFile2).commit(); + + // init the delete writer + ClusteredPositionDeleteWriter writer = + new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); + + PartitionSpec spec = table.spec(); + + // write deletes for both data files + writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null); + writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null); + writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null); + writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null); + writer.close(); + + // verify the writer result + 
DeleteWriteResult result = writer.result(); + int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ? 2 : 1; + assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles); + assertThat(result.referencedDataFiles()).hasSize(2); + assertThat(result.referencesDataFiles()).isTrue(); + + // commit the deletes + RowDelta rowDelta = table.newRowDelta(); + result.deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.commit(); + + // verify correctness + List expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); + } + @Test public void testFanoutDataWriterNoRecords() throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); @@ -464,11 +555,21 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException { } @Test - public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException { + public void testFanoutPositionOnlyDeleteWriterNoRecordsPartitionGranularity() throws IOException { + checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.PARTITION); + } + + @Test + public void testFanoutPositionOnlyDeleteWriterNoRecordsFileGranularity() throws IOException { + checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.FILE); + } + + private void checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity deleteGranularity) + throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); FanoutPositionOnlyDeleteWriter writer = new FanoutPositionOnlyDeleteWriter<>( - writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); writer.close(); Assert.assertEquals(0, writer.result().deleteFiles().size()); @@ -482,7 +583,19 @@ public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException { } @Test - public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOException { + public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsPartitionGranularity() + throws IOException { + checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.PARTITION); + } + + @Test + public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsFileGranularity() + throws IOException { + checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.FILE); + } + + private void checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords( + DeleteGranularity deleteGranularity) throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); // add an unpartitioned data file @@ -516,7 +629,7 @@ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOExcep FanoutPositionOnlyDeleteWriter writer = new FanoutPositionOnlyDeleteWriter<>( - writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); PartitionSpec unpartitionedSpec = table.specs().get(0); PartitionSpec bucketSpec = table.specs().get(1); @@ -557,4 +670,59 @@ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws IOExcep List expectedRows = ImmutableList.of(toRow(12, "bbb")); Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); } + + @Test + public void testFanoutPositionOnlyDeleteWriterPartitionGranularity() throws IOException { + checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.PARTITION); + } + + @Test + public void 
testFanoutPositionOnlyDeleteWriterFileGranularity() throws IOException { + checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.FILE); + } + + private void checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity deleteGranularity) + throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + // add the first data file + List rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa")); + DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null); + table.newFastAppend().appendFile(dataFile1).commit(); + + // add the second data file + List rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), toRow(12, "aaa")); + DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), null); + table.newFastAppend().appendFile(dataFile2).commit(); + + // init the delete writer + FanoutPositionOnlyDeleteWriter writer = + new FanoutPositionOnlyDeleteWriter<>( + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, deleteGranularity); + + PartitionSpec spec = table.spec(); + + // write deletes for both data files (the order of records is mixed) + writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null); + writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null); + writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null); + writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null); + writer.close(); + + // verify the writer result + DeleteWriteResult result = writer.result(); + int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ? 2 : 1; + assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles); + assertThat(result.referencedDataFiles()).hasSize(2); + assertThat(result.referencesDataFiles()).isTrue(); + + // commit the deletes + RowDelta rowDelta = table.newRowDelta(); + result.deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.commit(); + + // verify correctness + List expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); + } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java index 9c0e8235f842..01f24c4dfe04 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -28,14 +29,18 @@ import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowDelta; import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.source.SparkTable; import 
org.apache.iceberg.spark.source.TestSparkCatalog; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -80,6 +85,46 @@ public static void clearTestSparkCatalogCache() { TestSparkCatalog.clearTables(); } + @Test + public void testDeleteFileGranularity() throws NoSuchTableException { + checkDeleteFileGranularity(DeleteGranularity.FILE); + } + + @Test + public void testDeletePartitionGranularity() throws NoSuchTableException { + checkDeleteFileGranularity(DeleteGranularity.PARTITION); + } + + private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity) + throws NoSuchTableException { + createAndInitPartitionedTable(); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity); + + append(tableName, new Employee(1, "hr"), new Employee(2, "hr")); + append(tableName, new Employee(3, "hr"), new Employee(4, "hr")); + append(tableName, new Employee(1, "hardware"), new Employee(2, "hardware")); + append(tableName, new Employee(3, "hardware"), new Employee(4, "hardware")); + + createBranchIfNeeded(); + + sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget()); + + Table table = validationCatalog.loadTable(tableIdent); + assertThat(table.snapshots()).hasSize(5); + + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2"; + validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null); + + assertEquals( + "Should have expected rows", + ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), row(4, "hr")), + sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget())); + } + @Test public void testCommitUnknownException() { createAndInitTable("id INT, dep STRING, category STRING"); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java index e743b32b45db..f9c13d828cd3 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java @@ -18,11 +18,20 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Map; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.DeleteGranularity; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Encoders; +import org.junit.Test; public class TestMergeOnReadMerge extends TestMerge { @@ -56,4 +65,52 @@ protected Map extraTableProperties() { TableProperties.MERGE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName()); } + + @Test + public void testMergeDeleteFileGranularity() { + checkMergeDeleteGranularity(DeleteGranularity.FILE); + } + + @Test + public void testMergeDeletePartitionGranularity() { + 
checkMergeDeleteGranularity(DeleteGranularity.PARTITION); + } + + private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) { + createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity); + + append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }"); + + createBranchIfNeeded(); + + createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT()); + + sql( + "MERGE INTO %s AS t USING source AS s " + + "ON t.id == s.value " + + "WHEN MATCHED THEN " + + " DELETE " + + "WHEN NOT MATCHED THEN " + + " INSERT (id, dep) VALUES (-1, 'other')", + commitTarget()); + + Table table = validationCatalog.loadTable(tableIdent); + assertThat(table.snapshots()).hasSize(5); + + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2"; + validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1"); + + assertEquals( + "Should have expected rows", + ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, "hr"), row(4, "it")), + sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget())); + } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java index 0207d4ce4d51..45ef343b2dfe 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java @@ -18,11 +18,19 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.Map; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.DeleteGranularity; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.SnapshotUtil; +import org.junit.Test; public class TestMergeOnReadUpdate extends TestUpdate { @@ -56,4 +64,51 @@ protected Map extraTableProperties() { TableProperties.UPDATE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName()); } + + @Test + public void testUpdateFileGranularity() { + checkUpdateFileGranularity(DeleteGranularity.FILE); + } + + @Test + public void testUpdatePartitionGranularity() { + checkUpdateFileGranularity(DeleteGranularity.PARTITION); + } + + private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) { + createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity); + + append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }"); + 
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }"); + append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }"); + append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }"); + + createBranchIfNeeded(); + + sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget()); + + Table table = validationCatalog.loadTable(tableIdent); + assertThat(table.snapshots()).hasSize(5); + + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2"; + validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2"); + + assertEquals( + "Should have expected rows", + ImmutableList.of( + row(0, "hr"), + row(2, "hr"), + row(2, "hr"), + row(4, "hr"), + row(0, "it"), + row(2, "it"), + row(2, "it"), + row(4, "it")), + sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget())); + } } diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java index 094fd5844398..71813c5a63a6 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java @@ -33,6 +33,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; @@ -364,8 +365,20 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) @Benchmark @Threads(1) - public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) + public void writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity( + Blackhole blackhole) throws IOException { + writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.PARTITION); + } + + @Benchmark + @Threads(1) + public void writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole blackhole) throws IOException { + writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.FILE); + } + + private void writeUnpartitionedClusteredPositionDeleteWriter( + Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException { FileIO io = table().io(); OutputFileFactory fileFactory = newFileFactory(); @@ -374,7 +387,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( - writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES); + writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity); PositionDelete positionDelete = PositionDelete.create(); try (ClusteredPositionDeleteWriter closeableWriter = writer) { @@ -391,7 +404,20 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) @Benchmark @Threads(1) - public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) throws IOException { + public void writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole blackhole) + throws IOException { + writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.PARTITION); + } + + @Benchmark + @Threads(1) + public void 
writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole) + throws IOException { + writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.FILE); + } + + private void writeUnpartitionedFanoutPositionDeleteWriterPartition( + Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException { FileIO io = table().io(); OutputFileFactory fileFactory = newFileFactory(); @@ -400,7 +426,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th FanoutPositionOnlyDeleteWriter writer = new FanoutPositionOnlyDeleteWriter<>( - writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES); + writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity); PositionDelete positionDelete = PositionDelete.create(); try (FanoutPositionOnlyDeleteWriter closeableWriter = writer) { @@ -417,8 +443,20 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th @Benchmark @Threads(1) - public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole blackhole) - throws IOException { + public void writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity( + Blackhole blackhole) throws IOException { + writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.PARTITION); + } + + @Benchmark + @Threads(1) + public void writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity( + Blackhole blackhole) throws IOException { + writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.FILE); + } + + private void writeUnpartitionedFanoutPositionDeleteWriterShuffled( + Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException { FileIO io = table().io(); @@ -428,7 +466,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black FanoutPositionOnlyDeleteWriter writer = new FanoutPositionOnlyDeleteWriter<>( - writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES); + writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity); PositionDelete positionDelete = PositionDelete.create(); try (FanoutPositionOnlyDeleteWriter closeableWriter = writer) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 1454fc534e7d..07393a67fe31 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -708,4 +709,15 @@ private long sparkAdvisoryPartitionSize() { private double shuffleCompressionRatio(FileFormat outputFileFormat, String outputCodec) { return SparkCompressionUtil.shuffleCompressionRatio(spark, outputFileFormat, outputCodec); } + + public DeleteGranularity deleteGranularity() { + String valueAsString = + confParser + .stringConf() + .option(SparkWriteOptions.DELETE_GRANULARITY) + .tableProperty(TableProperties.DELETE_GRANULARITY) + .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT) + .parse(); + return 
DeleteGranularity.fromString(valueAsString); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 48dfa44c9122..d9c4f66b192b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -83,4 +83,7 @@ private SparkWriteOptions() {} // Overrides the advisory partition size public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size"; + + // Overrides the delete granularity + public static final String DELETE_GRANULARITY = "delete-granularity"; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index a397a069ee1d..d91779475845 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.ClusteredPositionDeleteWriter; import org.apache.iceberg.io.FileIO; @@ -70,6 +71,7 @@ public class SparkPositionDeletesRewrite implements Write { private final String queryId; private final FileFormat format; private final long targetFileSize; + private final DeleteGranularity deleteGranularity; private final Schema writeSchema; private final StructType dsSchema; private final String fileSetId; @@ -103,6 +105,7 @@ public class SparkPositionDeletesRewrite implements Write { this.queryId = writeInfo.queryId(); this.format = writeConf.deleteFileFormat(); this.targetFileSize = writeConf.targetDeleteFileSize(); + this.deleteGranularity = writeConf.deleteGranularity(); this.writeSchema = writeSchema; this.dsSchema = dsSchema; this.fileSetId = writeConf.rewrittenFileSetId(); @@ -129,6 +132,7 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { queryId, format, targetFileSize, + deleteGranularity, writeSchema, dsSchema, specId, @@ -179,6 +183,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final String queryId; private final FileFormat format; private final Long targetFileSize; + private final DeleteGranularity deleteGranularity; private final Schema writeSchema; private final StructType dsSchema; private final int specId; @@ -190,6 +195,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { String queryId, FileFormat format, long targetFileSize, + DeleteGranularity deleteGranularity, Schema writeSchema, StructType dsSchema, int specId, @@ -199,6 +205,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.queryId = queryId; this.format = format; this.targetFileSize = targetFileSize; + this.deleteGranularity = deleteGranularity; this.writeSchema = writeSchema; this.dsSchema = dsSchema; this.specId = specId; @@ -241,6 +248,7 @@ public DataWriter createWriter(int partitionId, long taskId) { writerFactoryWithoutRow, deleteFileFactory, targetFileSize, + deleteGranularity, dsSchema, specId, partition); @@ -289,6 +297,7 @@ private static class DeleteWriter implements DataWriter { private final 
SparkFileWriterFactory writerFactoryWithoutRow; private final OutputFileFactory deleteFileFactory; private final long targetFileSize; + private final DeleteGranularity deleteGranularity; private final PositionDelete positionDelete; private final FileIO io; private final PartitionSpec spec; @@ -322,11 +331,13 @@ private static class DeleteWriter implements DataWriter { SparkFileWriterFactory writerFactoryWithoutRow, OutputFileFactory deleteFileFactory, long targetFileSize, + DeleteGranularity deleteGranularity, StructType dsSchema, int specId, StructLike partition) { this.deleteFileFactory = deleteFileFactory; this.targetFileSize = targetFileSize; + this.deleteGranularity = deleteGranularity; this.writerFactoryWithRow = writerFactoryWithRow; this.writerFactoryWithoutRow = writerFactoryWithoutRow; this.positionDelete = PositionDelete.create(); @@ -387,7 +398,7 @@ private ClusteredPositionDeleteWriter lazyWriterWithRow() { if (writerWithRow == null) { this.writerWithRow = new ClusteredPositionDeleteWriter<>( - writerFactoryWithRow, deleteFileFactory, io, targetFileSize); + writerFactoryWithRow, deleteFileFactory, io, targetFileSize, deleteGranularity); } return writerWithRow; } @@ -396,7 +407,7 @@ private ClusteredPositionDeleteWriter lazyWriterWithoutRow() { if (writerWithoutRow == null) { this.writerWithoutRow = new ClusteredPositionDeleteWriter<>( - writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize); + writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize, deleteGranularity); } return writerWithoutRow; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index dcc949b290af..022283631fd8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -42,6 +42,7 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.expressions.Expression; @@ -441,11 +442,14 @@ protected PartitioningWriter, DeleteWriteResult> new FileIO io = table.io(); boolean inputOrdered = context.inputOrdered(); long targetFileSize = context.targetDeleteFileSize(); + DeleteGranularity deleteGranularity = context.deleteGranularity(); if (inputOrdered) { - return new ClusteredPositionDeleteWriter<>(writers, files, io, targetFileSize); + return new ClusteredPositionDeleteWriter<>( + writers, files, io, targetFileSize, deleteGranularity); } else { - return new FanoutPositionOnlyDeleteWriter<>(writers, files, io, targetFileSize); + return new FanoutPositionOnlyDeleteWriter<>( + writers, files, io, targetFileSize, deleteGranularity); } } } @@ -684,6 +688,7 @@ private static class Context implements Serializable { private final StructType metadataSparkType; private final FileFormat deleteFileFormat; private final long targetDeleteFileSize; + private final DeleteGranularity deleteGranularity; private final String queryId; private final boolean useFanoutWriter; private final boolean inputOrdered; @@ -700,6 +705,7 @@ private static class Context implements Serializable { this.deleteSparkType = info.rowIdSchema().get(); this.deleteFileFormat = 
writeConf.deleteFileFormat(); this.targetDeleteFileSize = writeConf.targetDeleteFileSize(); + this.deleteGranularity = writeConf.deleteGranularity(); this.metadataSparkType = info.metadataSchema().get(); this.queryId = info.queryId(); this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements); @@ -734,6 +740,10 @@ long targetDeleteFileSize() { return targetDeleteFileSize; } + DeleteGranularity deleteGranularity() { + return deleteGranularity; + } + String queryId() { return queryId; } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index abf40ebd953d..7c3ecac676ef 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -45,6 +45,7 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Map; @@ -52,6 +53,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.spark.sql.internal.SQLConf; @@ -76,6 +78,61 @@ public void after() { sql("DROP TABLE IF EXISTS %s", tableName); } + @Test + public void testDeleteGranularityDefault() { + Table table = validationCatalog.loadTable(tableIdent); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + DeleteGranularity value = writeConf.deleteGranularity(); + assertThat(value).isEqualTo(DeleteGranularity.PARTITION); + } + + @Test + public void testDeleteGranularityTableProperty() { + Table table = validationCatalog.loadTable(tableIdent); + + table + .updateProperties() + .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()) + .commit(); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + DeleteGranularity value = writeConf.deleteGranularity(); + assertThat(value).isEqualTo(DeleteGranularity.FILE); + } + + @Test + public void testDeleteGranularityWriteOption() { + Table table = validationCatalog.loadTable(tableIdent); + + table + .updateProperties() + .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString()) + .commit(); + + Map options = + ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, options); + + DeleteGranularity value = writeConf.deleteGranularity(); + assertThat(value).isEqualTo(DeleteGranularity.FILE); + } + + @Test + public void testDeleteGranularityInvalidValue() { + Table table = validationCatalog.loadTable(tableIdent); + + table.updateProperties().set(TableProperties.DELETE_GRANULARITY, "invalid").commit(); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + assertThatThrownBy(writeConf::deleteGranularity) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown delete granularity"); + } + @Test public void testAdvisoryPartitionSize() { Table table = 
validationCatalog.loadTable(tableIdent); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index d1e33950ebe9..7c55ff82df1e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -57,6 +57,7 @@ import org.apache.iceberg.actions.SizeBasedFileRewriter; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -131,6 +132,42 @@ public void testEmptyTable() { assertThat(result.addedDeleteFilesCount()).as("No added delete files").isZero(); } + @TestTemplate + public void testFileGranularity() throws Exception { + checkDeleteGranularity(DeleteGranularity.FILE); + } + + @TestTemplate + public void testPartitionGranularity() throws Exception { + checkDeleteGranularity(DeleteGranularity.PARTITION); + } + + private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws Exception { + Table table = createTableUnpartitioned(2, SCALE); + + table + .updateProperties() + .set(TableProperties.DELETE_GRANULARITY, deleteGranularity.toString()) + .commit(); + + List dataFiles = TestHelpers.dataFiles(table); + assertThat(dataFiles).hasSize(2); + + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + + List deleteFiles = deleteFiles(table); + assertThat(deleteFiles).hasSize(2); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + + int expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? 2 : 1; + assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedDeleteFilesCount); + } + @TestTemplate public void testUnpartitioned() throws Exception { Table table = createTableUnpartitioned(2, SCALE);
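For reference, SparkWriteConf.deleteGranularity() shown above resolves the value as write option first, then table property, then the partition default; a hedged sketch of the override path exercised by testDeleteGranularityWriteOption:

// The delete-granularity write option wins over the table property.
Map<String, String> options =
    ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString());
SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
DeleteGranularity granularity = writeConf.deleteGranularity(); // DeleteGranularity.FILE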