From 6ecf7aaeb25969355eeb996441a9019afd3e4faa Mon Sep 17 00:00:00 2001 From: Jason Fine Date: Thu, 14 Dec 2023 23:16:22 +0200 Subject: [PATCH] restored fast append fix --- .../java/org/apache/iceberg/FastAppend.java | 18 ++++++++------- .../org/apache/iceberg/TestFastAppend.java | 22 +++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 50cad294c570..85d81424646c 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -49,7 +49,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final List newFiles = Lists.newArrayList(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); - private List newManifests = null; + private List newManifests = Lists.newLinkedList(); private boolean hasNewFiles = false; FastAppend(String tableName, TableOperations ops) { @@ -147,7 +147,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { try { List newWrittenManifests = writeNewManifests(); - if (newWrittenManifests != null) { + if (!newWrittenManifests.isEmpty()) { manifests.addAll(newWrittenManifests); } } catch (IOException e) { @@ -178,16 +178,18 @@ public Object updateEvent() { @Override protected void cleanUncommitted(Set committed) { - if (newManifests != null) { + if (!newManifests.isEmpty()) { List committedNewManifests = Lists.newArrayList(); - for (ManifestFile manifest : newManifests) { + java.util.Iterator newManifestsIterator = newManifests.iterator(); + while (newManifestsIterator.hasNext()) { + ManifestFile manifest = newManifestsIterator.next(); if (committed.contains(manifest)) { committedNewManifests.add(manifest); } else { deleteFile(manifest.path()); + newManifestsIterator.remove(); } } - this.newManifests = committedNewManifests; } // clean up only rewrittenAppendManifests as they are always owned by the table @@ -200,12 +202,12 @@ protected void cleanUncommitted(Set committed) { } private List writeNewManifests() throws IOException { - if (hasNewFiles && newManifests != null) { + if (hasNewFiles && !newManifests.isEmpty()) { newManifests.forEach(file -> deleteFile(file.path())); - newManifests = null; + newManifests.clear(); } - if (newManifests == null && !newFiles.isEmpty()) { + if (newManifests.isEmpty() && !newFiles.isEmpty()) { RollingManifestWriter writer = newRollingManifestWriter(spec); try { newFiles.forEach(writer::add); diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index d39e0e4a1aad..95e6baf9f32f 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -314,34 +314,34 @@ public void testRecoveryWithoutManifestList() { } @Test - public void testRecoveryWithTransaction() { + public void testRecoveryWithManualReCommit() { TestTables.TestTableOperations ops = table.ops(); ops.failCommits(5); - Transaction transaction = table.newTransaction(); - AppendFiles append = transaction.newFastAppend().appendFile(FILE_B); + AppendFiles append = table.newFastAppend().appendFile(FILE_B); Snapshot pending = append.apply(); ManifestFile originalManifest = pending.allManifests(FILE_IO).get(0); - append.commit(); - Assertions.assertThatThrownBy(transaction::commitTransaction) + Assertions.assertThatThrownBy(append::commit) .isInstanceOf(CommitFailedException.class) .hasMessage("Injected failure"); TableMetadata metadata = readMetadata(); Assert.assertNull("No snapshot is committed", metadata.currentSnapshot()); + Assert.assertFalse( + "Original manifest from before failure should be deleted", + new File(originalManifest.path()).exists()); - transaction.commitTransaction(); + ManifestFile newManifest = append.apply().allManifests(FILE_IO).get(0); + append.commit(); metadata = readMetadata(); validateSnapshot(null, metadata.currentSnapshot(), FILE_B); + Assert.assertTrue("New manifest file should exist", new File(newManifest.path()).exists()); Assert.assertTrue( - "Original manifest from before retry should exist", - new File(originalManifest.path()).exists()); - Assert.assertTrue( - "Should commit the original manifest created before retrying the transaction", - metadata.currentSnapshot().allManifests(FILE_IO).contains(originalManifest)); + "Should commit the a new manifest created for retrying", + metadata.currentSnapshot().allManifests(FILE_IO).contains(newManifest)); } @Test