From 4170bfda1970e31f7256d7b44188e49b288340b8 Mon Sep 17 00:00:00 2001 From: Jason Fine Date: Fri, 15 Dec 2023 00:20:19 +0200 Subject: [PATCH] Fixed manual retries failing with transactions This is a similar issue to the bugs with manual retries with SnapshotProducer but the transaction was caching the TableMetadata which points to deleted files after cleanup. This forces the transaction to re-apply the changes ensuring a new valid TableMetadata is used --- .../org/apache/iceberg/BaseTransaction.java | 9 ++++- .../org/apache/iceberg/TestTransaction.java | 36 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 018f70eb16fa..fc5883248d14 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -73,6 +73,7 @@ enum TransactionType { private TransactionType type; private TableMetadata base; private TableMetadata current; + private boolean forceReApply; private boolean hasLastOpCommitted; private final MetricsReporter reporter; @@ -97,6 +98,7 @@ enum TransactionType { this.type = type; this.hasLastOpCommitted = true; this.reporter = reporter; + this.forceReApply = false; } @Override @@ -463,6 +465,10 @@ private void commitSimpleTransaction() { } private void cleanUpOnCommitFailure() { + // After this cleanup we will delete manifests that the current metadata is using. So we must + // re-apply updates + this.forceReApply = true; + // the commit failed and no files were committed. clean up each update. cleanAllUpdates(); @@ -485,7 +491,7 @@ private void cleanAllUpdates() { } private void applyUpdates(TableOperations underlyingOps) { - if (base != underlyingOps.refresh()) { + if (base != underlyingOps.refresh() || forceReApply) { // use refreshed the metadata this.base = underlyingOps.current(); this.current = underlyingOps.current(); @@ -499,6 +505,7 @@ private void applyUpdates(TableOperations underlyingOps) { throw new PendingUpdateFailedException(e); } } + forceReApply = false; } } diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index b76974e2440d..1c1646ea7c9c 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -760,4 +760,40 @@ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOExc Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists()); Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir)); } + + @Test + public void testManuallRetryWithTransaction() { + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(5); + + Transaction transaction = table.newTransaction(); + AppendFiles append = transaction.newFastAppend().appendFile(FILE_B); + + Snapshot pending = append.apply(); + ManifestFile originalManifest = pending.allManifests(FILE_IO).get(0); + append.commit(); + + Assertions.assertThatThrownBy(transaction::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + Assert.assertFalse( + "Original manifest should be deleted because commit failed", + new File(originalManifest.path()).exists()); + + TableMetadata metadata = readMetadata(); + Assert.assertNull("No snapshot is committed", metadata.currentSnapshot()); + + ManifestFile newManifest = append.apply().allManifests(FILE_IO).get(0); + + // append.commit(); + transaction.commitTransaction(); + + metadata = readMetadata(); + validateSnapshot(null, metadata.currentSnapshot(), FILE_B); + Assert.assertTrue( + "Should commit the new manifest created after retrying the transaction", + metadata.currentSnapshot().allManifests(FILE_IO).contains(newManifest)); + Assert.assertTrue( + "New manifest recreated after cleanup should exist", new File(newManifest.path()).exists()); + } }