diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index e523814fe3b4..dda5da78e311 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -781,6 +781,10 @@ acceptedBreaks: - code: "java.class.removed" old: "interface org.apache.iceberg.view.SQLViewRepresentation" justification: "Moving from iceberg-api to iceberg-core" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.DeleteFiles org.apache.iceberg.DeleteFiles::validateFilesExist()" + justification: "Adding validateFilesExist API for validating files exist when\ + \ commiting DeleteFiles" - code: "java.method.addedToInterface" new: "method org.apache.iceberg.catalog.Namespace org.apache.iceberg.view.ViewVersion::defaultNamespace()" justification: "Acceptable break due to updating View APIs and the View Spec" diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java b/api/src/main/java/org/apache/iceberg/DeleteFiles.java index 74d31a6dad81..8ee9362e40ba 100644 --- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java +++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java @@ -81,4 +81,12 @@ default DeleteFiles deleteFile(DataFile file) { * @return this for method chaining */ DeleteFiles caseSensitive(boolean caseSensitive); + + /** + * Enables validation that any files that are part of the deletion still exist when committing the + * operation. + * + * @return this for method chaining + */ + DeleteFiles validateFilesExist(); } diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java index 8ff7bb831ec9..df5a11bf31c5 100644 --- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java +++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java @@ -28,6 +28,8 @@ * CommitFailedException}. */ public class StreamingDelete extends MergingSnapshotProducer implements DeleteFiles { + private boolean validateFilesToDeleteExist = false; + protected StreamingDelete(String tableName, TableOperations ops) { super(tableName, ops); } @@ -60,9 +62,22 @@ public StreamingDelete deleteFromRowFilter(Expression expr) { return this; } + @Override + public DeleteFiles validateFilesExist() { + this.validateFilesToDeleteExist = true; + return this; + } + @Override public StreamingDelete toBranch(String branch) { targetBranch(branch); return this; } + + @Override + protected void validate(TableMetadata base, Snapshot parent) { + if (validateFilesToDeleteExist) { + failMissingDeletePaths(); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index 4e4565306c00..7b771de5903e 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -412,6 +412,24 @@ public void testDeleteWithCollision() { afterDeletePartitions); } + @Test + public void testDeleteValidateFileExistence() { + commit(table, table.newFastAppend().appendFile(FILE_B), branch); + commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch); + Assertions.assertThatThrownBy( + () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch)) + .isInstanceOf(ValidationException.class); + Assertions.assertThat(table.io().newInputFile(FILE_B.path().toString()).exists()).isFalse(); + } + + @Test + public void testDeleteFilesNoValidation() { + commit(table, table.newFastAppend().appendFile(FILE_B), branch); + commit(table, table.newDelete().deleteFile(FILE_B), branch); + commit(table, table.newDelete().deleteFile(FILE_B), branch); + Assertions.assertThat(table.io().newInputFile(FILE_B.path().toString()).exists()).isFalse(); + } + private static ByteBuffer longToBuffer(long value) { return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value); }