diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 3079757392cd..15ff16e39365 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -179,16 +179,16 @@ public Object updateEvent() { @Override protected void cleanUncommitted(Set committed) { if (newManifests != null) { - List committedNewManifests = Lists.newArrayList(); + boolean hasDeletes = false; for (ManifestFile manifest : newManifests) { - if (committed.contains(manifest)) { - committedNewManifests.add(manifest); - } else { + if (!committed.contains(manifest)) { deleteFile(manifest.path()); + hasDeletes = true; } } - - this.newManifests = committedNewManifests; + if (hasDeletes) { + this.newManifests = null; + } } // clean up only rewrittenAppendManifests as they are always owned by the table diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e7b2ccf69020..29cdf71275a1 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -886,16 +886,17 @@ public Object updateEvent() { private void cleanUncommittedAppends(Set committed) { if (cachedNewDataManifests != null) { - List committedNewDataManifests = Lists.newArrayList(); + boolean hasDeletes = false; for (ManifestFile manifest : cachedNewDataManifests) { - if (committed.contains(manifest)) { - committedNewDataManifests.add(manifest); - } else { + if (!committed.contains(manifest)) { deleteFile(manifest.path()); + hasDeletes = true; } } - this.cachedNewDataManifests = committedNewDataManifests; + if (hasDeletes) { + this.cachedNewDataManifests = null; + } } ListIterator deleteManifestsIterator = cachedNewDeleteManifests.listIterator(); diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index b76974e2440d..6aec39e76ee5 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -760,4 +760,51 @@ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOExc Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists()); Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir)); } + + @Test + public void testTransactionRecommit() { + // update table settings to merge when there are 3 manifests + table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit(); + + // create manifests so that the next commit will trigger a merge + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).commit(); + + // start a transaction with appended files that will merge + Transaction transaction = Transactions.newTransaction(table.name(), table.ops()); + + AppendFiles append = transaction.newAppend().appendFile(FILE_D); + Snapshot pending = append.apply(); + + Assert.assertEquals( + "Should produce 1 pending merged manifest", 1, pending.allManifests(table.io()).size()); + + // because a merge happened, the appended manifest is deleted the by append operation + append.commit(); + + // concurrently commit FILE_A without a transaction to cause the previous append to retry + table.newAppend().appendFile(FILE_C).commit(); + Assert.assertEquals( + "Should produce 1 committed merged manifest", + 1, + table.currentSnapshot().allManifests(table.io()).size()); + + transaction.commitTransaction(); + + Set paths = + Sets.newHashSet( + Iterables.transform( + table.newScan().planFiles(), task -> task.file().path().toString())); + Set expectedPaths = + Sets.newHashSet( + FILE_A.path().toString(), + FILE_B.path().toString(), + FILE_C.path().toString(), + FILE_D.path().toString()); + + Assert.assertEquals("Should contain all committed files", expectedPaths, paths); + + Assert.assertEquals( + "Should produce 2 manifests", 2, table.currentSnapshot().allManifests(table.io()).size()); + } }