From 172bab75c8549229b0ab468506feecb504923e34 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 7 Sep 2023 11:06:09 -0700 Subject: [PATCH] Core: Add a validation API to DeleteFiles which validates files exist prior to attempting to deletion. Simplify/improve the validation check Use failMissingDeletePaths, more simplification --- .../java/org/apache/iceberg/DeleteFiles.java | 11 +++++++ .../org/apache/iceberg/StreamingDelete.java | 15 +++++++++ .../org/apache/iceberg/TestDeleteFiles.java | 31 +++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/api/src/main/java/org/apache/iceberg/DeleteFiles.java b/api/src/main/java/org/apache/iceberg/DeleteFiles.java index 74d31a6dad81..8a396920e03b 100644 --- a/api/src/main/java/org/apache/iceberg/DeleteFiles.java +++ b/api/src/main/java/org/apache/iceberg/DeleteFiles.java @@ -81,4 +81,15 @@ default DeleteFiles deleteFile(DataFile file) { * @return this for method chaining */ DeleteFiles caseSensitive(boolean caseSensitive); + + /** + * Enables validation that any files that are part of the deletion still exist when committing the + * operation. + * + * @return this for method chaining + */ + default DeleteFiles validateFilesExist() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement validateFilesExist"); + } } diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java index 8ff7bb831ec9..df5a11bf31c5 100644 --- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java +++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java @@ -28,6 +28,8 @@ * CommitFailedException}. */ public class StreamingDelete extends MergingSnapshotProducer implements DeleteFiles { + private boolean validateFilesToDeleteExist = false; + protected StreamingDelete(String tableName, TableOperations ops) { super(tableName, ops); } @@ -60,9 +62,22 @@ public StreamingDelete deleteFromRowFilter(Expression expr) { return this; } + @Override + public DeleteFiles validateFilesExist() { + this.validateFilesToDeleteExist = true; + return this; + } + @Override public StreamingDelete toBranch(String branch) { targetBranch(branch); return this; } + + @Override + protected void validate(TableMetadata base, Snapshot parent) { + if (validateFilesToDeleteExist) { + failMissingDeletePaths(); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index 4e4565306c00..63fc7010c49c 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -412,6 +412,37 @@ public void testDeleteWithCollision() { afterDeletePartitions); } + @Test + public void testDeleteValidateFileExistence() { + commit(table, table.newFastAppend().appendFile(FILE_B), branch); + Snapshot delete = + commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch); + validateManifestEntries( + Iterables.getOnlyElement(delete.allManifests(FILE_IO)), + ids(delete.snapshotId()), + files(FILE_B), + statuses(Status.DELETED)); + + Assertions.assertThatThrownBy( + () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch)) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testDeleteFilesNoValidation() { + commit(table, table.newFastAppend().appendFile(FILE_B), branch); + Snapshot delete1 = commit(table, table.newDelete().deleteFile(FILE_B), branch); + validateManifestEntries( + Iterables.getOnlyElement(delete1.allManifests(FILE_IO)), + ids(delete1.snapshotId()), + files(FILE_B), + statuses(Status.DELETED)); + + Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch); + Assertions.assertThat(delete2.allManifests(FILE_IO).isEmpty()).isTrue(); + Assertions.assertThat(delete2.removedDataFiles(FILE_IO).iterator().hasNext()).isFalse(); + } + private static ByteBuffer longToBuffer(long value) { return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value); }