Skip to content

Commit

Permalink
Core: Add a validation API to DeleteFiles which validates files exist
Browse files Browse the repository at this point in the history
prior to attempting to deletion.

Simplify/improve the validation check

Use failMissingDeletePaths, more simplification
  • Loading branch information
amogh-jahagirdar committed Sep 20, 2023
1 parent 768e516 commit ac725f3
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,10 @@ acceptedBreaks:
- code: "java.class.removed"
old: "interface org.apache.iceberg.view.SQLViewRepresentation"
justification: "Moving from iceberg-api to iceberg-core"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.DeleteFiles org.apache.iceberg.DeleteFiles::validateFilesExist()"
justification: "Adding validateFilesExist API for validating files exist when\
\ commiting DeleteFiles"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.catalog.Namespace org.apache.iceberg.view.ViewVersion::defaultNamespace()"
justification: "Acceptable break due to updating View APIs and the View Spec"
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,12 @@ default DeleteFiles deleteFile(DataFile file) {
* @return this for method chaining
*/
DeleteFiles caseSensitive(boolean caseSensitive);

/**
* Enables validation that any files that are part of the deletion still exist when committing the
* operation.
*
* @return this for method chaining
*/
DeleteFiles validateFilesExist();
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ void caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
}

CharSequenceSet deletedPaths() {
return deletePaths;
}

/** Add a specific path to be deleted in the new snapshot. */
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/StreamingDelete.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
* CommitFailedException}.
*/
public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements DeleteFiles {
private boolean validateFilesToDeleteExist = false;

protected StreamingDelete(String tableName, TableOperations ops) {
super(tableName, ops);
}
Expand Down Expand Up @@ -60,9 +62,22 @@ public StreamingDelete deleteFromRowFilter(Expression expr) {
return this;
}

@Override
public DeleteFiles validateFilesExist() {
this.validateFilesToDeleteExist = true;
return this;
}

@Override
public StreamingDelete toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
protected void validate(TableMetadata base, Snapshot parent) {
if (validateFilesToDeleteExist) {
failMissingDeletePaths();
}
}
}
18 changes: 18 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,24 @@ public void testDeleteWithCollision() {
afterDeletePartitions);
}

@Test
public void testDeleteValidateFileExistence() {
commit(table, table.newFastAppend().appendFile(FILE_B), branch);
commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch);
Assertions.assertThatThrownBy(
() -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch))
.isInstanceOf(ValidationException.class);
Assertions.assertThat(table.io().newInputFile(FILE_B.path().toString()).exists()).isFalse();
}

@Test
public void testDeleteFilesNoValidation() {
commit(table, table.newFastAppend().appendFile(FILE_B), branch);
commit(table, table.newDelete().deleteFile(FILE_B), branch);
commit(table, table.newDelete().deleteFile(FILE_B), branch);
Assertions.assertThat(table.io().newInputFile(FILE_B.path().toString()).exists()).isFalse();
}

private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
}
Expand Down

0 comments on commit ac725f3

Please sign in to comment.