Skip to content

Commit

Permalink
Core: Add a validation API to DeleteFiles which validates files exist
Browse files Browse the repository at this point in the history
prior to attempting deletion.

Simplify/improve the validation check

Use failMissingDeletePaths, more simplification
  • Loading branch information
amogh-jahagirdar committed Sep 21, 2023
1 parent 768e516 commit 172bab7
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,15 @@ default DeleteFiles deleteFile(DataFile file) {
* @return this for method chaining
*/
DeleteFiles caseSensitive(boolean caseSensitive);

/**
 * Turns on a check that every file included in this deletion is still present at the time the
 * operation is committed.
 *
 * <p>This default implementation does not perform any check; it always throws, naming the
 * concrete class that failed to override it.
 *
 * @return this for method chaining
 * @throws UnsupportedOperationException if the implementing class does not override this method
 */
default DeleteFiles validateFilesExist() {
  String implementationName = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement validateFilesExist");
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/StreamingDelete.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
* CommitFailedException}.
*/
public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements DeleteFiles {
private boolean validateFilesToDeleteExist = false;

/**
* Creates a streaming delete by passing both arguments, unchanged, to the {@code
* MergingSnapshotProducer} constructor.
*
* @param tableName forwarded to the superclass (presumably the name of the table being modified)
* @param ops forwarded to the superclass
*/
protected StreamingDelete(String tableName, TableOperations ops) {
super(tableName, ops);
}
Expand Down Expand Up @@ -60,9 +62,22 @@ public StreamingDelete deleteFromRowFilter(Expression expr) {
return this;
}

/**
 * Requests validation that the files being deleted still exist when this operation is committed.
 *
 * <p>This method only records the request by setting {@code validateFilesToDeleteExist}; the flag
 * is read later by {@code validate(TableMetadata, Snapshot)}.
 *
 * <p>The return type is narrowed to {@code StreamingDelete} so that it matches the other fluent
 * methods of this class; callers holding a {@code DeleteFiles} reference are unaffected.
 *
 * @return this for method chaining
 */
@Override
public StreamingDelete validateFilesExist() {
  this.validateFilesToDeleteExist = true;
  return this;
}

/**
* Passes {@code branch} to {@code targetBranch} and returns this instance.
*
* @param branch value handed to {@code targetBranch} (presumably the name of the branch this
*     delete should commit to; confirm against the superclass)
* @return this for method chaining
*/
@Override
public StreamingDelete toBranch(String branch) {
targetBranch(branch);
return this;
}

/**
 * Applies the optional file-existence validation for this delete.
 *
 * <p>When {@code validateFilesToDeleteExist} is not set this method does nothing. When it is set,
 * the method calls {@code failMissingDeletePaths()}. Neither argument is read here.
 *
 * @param base not used by this implementation
 * @param parent not used by this implementation
 */
@Override
protected void validate(TableMetadata base, Snapshot parent) {
  if (!validateFilesToDeleteExist) {
    // Validation was never requested; nothing to configure.
    return;
  }

  failMissingDeletePaths();
}
}
31 changes: 31 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,37 @@ public void testDeleteWithCollision() {
afterDeletePartitions);
}

/**
 * Checks that a delete with {@code validateFilesExist()} commits while the file is present, and
 * that repeating the same delete afterwards fails with a {@code ValidationException}.
 */
@Test
public void testDeleteValidateFileExistence() {
  commit(table, table.newFastAppend().appendFile(FILE_B), branch);

  // FILE_B was just appended, so the validated delete is expected to commit.
  DeleteFiles validatedDelete = table.newDelete().deleteFile(FILE_B).validateFilesExist();
  Snapshot deleteSnapshot = commit(table, validatedDelete, branch);
  long deleteSnapshotId = deleteSnapshot.snapshotId();

  validateManifestEntries(
      Iterables.getOnlyElement(deleteSnapshot.allManifests(FILE_IO)),
      ids(deleteSnapshotId),
      files(FILE_B),
      statuses(Status.DELETED));

  // FILE_B has already been deleted, so the same validated delete must now be rejected.
  Assertions.assertThatThrownBy(
          () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch))
      .isInstanceOf(ValidationException.class);
}

/**
 * Checks that, without {@code validateFilesExist()}, deleting a file a second time still commits
 * and the resulting snapshot has no manifests and no removed data files.
 */
@Test
public void testDeleteFilesNoValidation() {
  commit(table, table.newFastAppend().appendFile(FILE_B), branch);
  Snapshot delete1 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
  validateManifestEntries(
      Iterables.getOnlyElement(delete1.allManifests(FILE_IO)),
      ids(delete1.snapshotId()),
      files(FILE_B),
      statuses(Status.DELETED));

  Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
  // Assert on the collections themselves rather than on a boolean, so that a failure reports
  // the unexpected manifests/files instead of just "expected true but was false".
  Assertions.assertThat(delete2.allManifests(FILE_IO)).isEmpty();
  Assertions.assertThat(delete2.removedDataFiles(FILE_IO)).isEmpty();
}

/**
 * Encodes {@code value} as 8 little-endian bytes in a newly allocated buffer.
 *
 * <p>The value is written with an absolute put at index 0, so the returned buffer's position is
 * left at 0 and all 8 bytes remain readable.
 *
 * @param value the long to encode
 * @return a new little-endian {@code ByteBuffer} of capacity 8 holding {@code value}
 */
private static ByteBuffer longToBuffer(long value) {
  ByteBuffer encoded = ByteBuffer.allocate(Long.BYTES);
  encoded.order(ByteOrder.LITTLE_ENDIAN);
  encoded.putLong(0, value);
  return encoded;
}
Expand Down

0 comments on commit 172bab7

Please sign in to comment.