Skip to content

Commit

Permalink
Fixed manual retries failing with transactions
Browse files Browse the repository at this point in the history
This is similar to the manual-retry bugs in SnapshotProducer, but here the transaction was caching a TableMetadata that points to files deleted during cleanup.
This change forces the transaction to re-apply its updates, ensuring that a new, valid TableMetadata is used.
  • Loading branch information
jasonf20 committed Dec 14, 2023
1 parent 6ecf7aa commit 83fe828
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
13 changes: 12 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum TransactionType {
private TransactionType type;
private TableMetadata base;
private TableMetadata current;
private boolean forceReApply;
private boolean hasLastOpCommitted;
private final MetricsReporter reporter;

Expand All @@ -97,6 +98,7 @@ enum TransactionType {
this.type = type;
this.hasLastOpCommitted = true;
this.reporter = reporter;
this.forceReApply = false;
}

@Override
Expand Down Expand Up @@ -394,6 +396,10 @@ private void commitReplaceTransaction(boolean orCreate) {
}

private void commitSimpleTransaction() {
if (forceReApply) {
applyUpdates(ops);
}

// if there were no changes, don't try to commit
if (base == current) {
return;
Expand Down Expand Up @@ -463,6 +469,10 @@ private void commitSimpleTransaction() {
}

private void cleanUpOnCommitFailure() {
// After this cleanup we will delete manifests that the current metadata is using. So we must
// re-apply updates
this.forceReApply = true;

// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();

Expand All @@ -485,7 +495,7 @@ private void cleanAllUpdates() {
}

private void applyUpdates(TableOperations underlyingOps) {
if (base != underlyingOps.refresh()) {
if (base != underlyingOps.refresh() || forceReApply) {
// use the refreshed metadata
this.base = underlyingOps.current();
this.current = underlyingOps.current();
Expand All @@ -499,6 +509,7 @@ private void applyUpdates(TableOperations underlyingOps) {
throw new PendingUpdateFailedException(e);
}
}
forceReApply = false;
}
}

Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,11 @@ protected void cleanAll() {
}

// Deletes the file at the given path by passing it to deleteFunc.
// A RuntimeIOException thrown by the call inside the try block is intentionally ignored so
// that the failure of one delete does not stop other deletes from running.
// NOTE(review): this page renders a diff hunk ("5 additions & 1 deletion"), so the bare
// deleteFunc.accept(path) call before the try appears to be the removed pre-change line shown
// next to its replacement rather than a second delete -- confirm against the actual file.
protected void deleteFile(String path) {
deleteFunc.accept(path);
try {
deleteFunc.accept(path);
} catch (RuntimeIOException ignored) {
// Allow other deletes to run even if this one fails
}
}

protected OutputFile manifestListPath() {
Expand Down
37 changes: 37 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,4 +760,41 @@ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOExc
Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
}

// Verifies that a transaction whose commit failed (and whose manifests were cleaned up) can be
// retried manually by calling commitTransaction() again, and that the retry commits a newly
// written manifest instead of the one deleted during cleanup.
@Test
public void testManuallRetryWithTransaction() {
TestTables.TestTableOperations ops = table.ops();
// Inject commit failures so the first commitTransaction() call below fails.
// NOTE(review): presumably 5 is enough to exhaust the transaction's internal retries -- confirm.
ops.failCommits(5);

Transaction transaction = table.newTransaction();
AppendFiles append = transaction.newFastAppend().appendFile(FILE_B);

// Capture the manifest written by the first apply() so its deletion can be checked later.
Snapshot pending = append.apply();
ManifestFile originalManifest = pending.allManifests(FILE_IO).get(0);
append.commit();

// First attempt: the injected failure must surface as a CommitFailedException.
Assertions.assertThatThrownBy(transaction::commitTransaction)
.isInstanceOf(CommitFailedException.class)
.hasMessage("Injected failure");
Assert.assertFalse(
"Original manifest should be deleted because commit failed",
new File(originalManifest.path()).exists());

// Nothing may have reached the table after the failed commit.
TableMetadata metadata = readMetadata();
Assert.assertNull("No snapshot is committed", metadata.currentSnapshot());

// Re-applying the append yields the manifest that the retried commit is expected to use.
ManifestFile newManifest = append.apply().allManifests(FILE_IO).get(0);

// Deliberately not calling append.commit() again: retrying commitTransaction() on its own is
// the scenario under test.
transaction.commitTransaction();

// The retry must have committed a snapshot containing FILE_B backed by the new manifest.
metadata = readMetadata();
validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
Assert.assertTrue(
"Should commit the new manifest created after retrying the transaction",
metadata.currentSnapshot().allManifests(FILE_IO).contains(newManifest));
Assert.assertTrue(
"New manifest recreated after cleanup should exist", new File(newManifest.path()).exists());
}

}

0 comments on commit 83fe828

Please sign in to comment.