From c75d8b18fe6f7cb250146fd9d00a15ff51df5c0c Mon Sep 17 00:00:00 2001 From: Jason Date: Thu, 25 Jul 2024 15:03:33 +0300 Subject: [PATCH] Table Operations: Added streaming update operation This operation allows adding multiple consecutive updates in a single commit without equality deletes from prior updates affecting inserts that occurred after them. Before this commit you would need to do something like this: ``` for batch in batches: delta = transaction.newRowDelta() delta.add(batch.deletes) delta.add(batch.inserts) delta.commit() transaction.commit() ``` Which produces many manifest files and is very IO intensive. This operation allows: ``` update = table.newStreamingUpdate() for batch in batches: update.newBatch() update.addFile(batch.deleteFiles) update.addFile(batch.dataFiles) update.commit() ``` --- .../org/apache/iceberg/StreamingUpdate.java | 129 +++++++++++ .../main/java/org/apache/iceberg/Table.java | 11 + .../java/org/apache/iceberg/Transaction.java | 11 + .../org/apache/iceberg/BaseReadOnlyTable.java | 5 + .../apache/iceberg/BaseStreamingUpdate.java | 205 ++++++++++++++++++ .../java/org/apache/iceberg/BaseTable.java | 5 + .../org/apache/iceberg/BaseTransaction.java | 15 ++ .../iceberg/MergingSnapshotProducer.java | 10 + .../org/apache/iceberg/SerializableTable.java | 5 + .../org/apache/iceberg/SnapshotProducer.java | 6 +- .../java/org/apache/iceberg/TestBase.java | 16 +- .../apache/iceberg/TestStreamingUpdate.java | 198 +++++++++++++++++ 12 files changed, 614 insertions(+), 2 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/StreamingUpdate.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java create mode 100644 core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java diff --git a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java new file mode 100644 index 000000000000..7b774e637d72 --- /dev/null +++ 
b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; + +/** + * API for appending sequential updates to a table + * + *
<p>
This API accumulates batches of file additions and deletions by order, produces a new {@link + * Snapshot} of the changes where each batch is added to a new data sequence number, and commits + * that snapshot as the current. + * + *
<p>
When committing, these changes will be applied to the latest table snapshot. Commit conflicts + * will be resolved by applying the changes to the new latest snapshot and reattempting the commit. + * If any of the deleted files are no longer in the latest snapshot when reattempting, the commit + * will throw a {@link ValidationException}. + */ +public interface StreamingUpdate extends SnapshotUpdate { + + /** + * Start a new batch of changes. The changes in this batch will have a sequence number larger than + * the changes in the previous batches. + * + * @return this for method chaining + */ + default StreamingUpdate newBatch() { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement newBatch"); + } + + /** + * Add a new data file to the current batch. All files in this batch will receive the same data + * sequence number. + * + * @param dataFile a new data file + * @return this for method chaining + */ + default StreamingUpdate addFile(DataFile dataFile) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement addFile"); + } + + /** + * Add a new delete file to the current batch. All files in this batch will receive the same data + * sequence number. + * + * @param deleteFile a new delete file + * @return this for method chaining + */ + default StreamingUpdate addFile(DeleteFile deleteFile) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement addFile"); + } + + /** + * Set the snapshot ID used in any reads for this operation. + * + *
<p>
Validations will check changes after this snapshot ID. If the from snapshot is not set, all + * ancestor snapshots through the table's initial snapshot are validated. + * + * @param snapshotId a snapshot ID + * @return this for method chaining + */ + StreamingUpdate validateFromSnapshot(long snapshotId); + + /** + * Sets a conflict detection filter used to validate concurrently added data and delete files. + * + *
<p>
If not called, a true literal will be used as the conflict detection filter. + * + * @param newConflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + StreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter); + + /** + * Enables validation that data files added concurrently do not conflict with this commit's + * operation. + * + *
<p>
This method should be called when the table is queried to determine which files to + * delete/append. If a concurrent operation commits a new file after the data was read and that + * file might contain rows matching the specified conflict detection filter, this operation will + * detect this during retries and fail. + * + *
<p>
Calling this method is required to maintain serializable isolation for update/delete + * operations. Otherwise, the isolation level will be snapshot isolation. + * + *
<p>
Validation uses the conflict detection filter passed to {@link + * #conflictDetectionFilter(Expression)} and applies to operations that happened after the + * snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + StreamingUpdate validateNoConflictingDataFiles(); + + /** + * Enables validation that delete files added concurrently do not conflict with this commit's + * operation. + * + *
<p>
This method must be called when the table is queried to produce a row delta for UPDATE and + * MERGE operations independently of the isolation level. Calling this method isn't required for + * DELETE operations as it is OK to delete a record that is also deleted concurrently. + * + *
<p>
Validation uses the conflict detection filter passed to {@link + * #conflictDetectionFilter(Expression)} and applies to operations that happened after the + * snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + StreamingUpdate validateNoConflictingDeleteFiles(); +} diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 97ea9ba76526..cf05c7efb896 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -234,6 +234,17 @@ default AppendFiles newFastAppend() { */ RewriteFiles newRewrite(); + /** + * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the + * table. + * + * @return a new {@link StreamingUpdate} + */ + default StreamingUpdate newStreamingUpdate() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement newStreamingUpdate()"); + } + /** * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table * and commit. diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java index fd84a974013d..4abf78ea2ba5 100644 --- a/api/src/main/java/org/apache/iceberg/Transaction.java +++ b/api/src/main/java/org/apache/iceberg/Transaction.java @@ -95,6 +95,17 @@ default AppendFiles newFastAppend() { */ RewriteFiles newRewrite(); + /** + * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the + * table. + * + * @return a new {@link StreamingUpdate} + */ + default StreamingUpdate newStreamingUpdate() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement newStreamingUpdate()"); + } + /** * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this * table. 
diff --git a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java index 243c292ebc25..618fcfe900bb 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java @@ -66,6 +66,11 @@ public RewriteFiles newRewrite() { throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table"); } + @Override + public StreamingUpdate newStreamingUpdate() { + throw new UnsupportedOperationException("Cannot update a " + descriptor + " table"); + } + @Override public RewriteManifests rewriteManifests() { throw new UnsupportedOperationException( diff --git a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java new file mode 100644 index 000000000000..6e9ddb143611 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Set; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.SnapshotUtil; + +class BaseStreamingUpdate extends MergingSnapshotProducer + implements StreamingUpdate { + private final List batches = Lists.newArrayList(); + + private Long appliedOnSequenceNumber = -1L; + private Long startingSnapshotId = null; // check all versions by default + + private Expression conflictDetectionFilter = Expressions.alwaysTrue(); + private boolean validateNewDataFiles; + private boolean validateNewDeleteFiles; + + BaseStreamingUpdate(String tableName, TableOperations ops) { + super(tableName, ops); + } + + @Override + protected BaseStreamingUpdate self() { + return this; + } + + @Override + protected String operation() { + return DataOperations.OVERWRITE; + } + + @Override + protected long nextSnapshotSequenceNumber(TableMetadata base) { + if (batches.isEmpty()) { + return super.nextSnapshotSequenceNumber(base); + } + // Each batch will advance the data sequence number by one, so we should advance the snapshot by + // the same amount. + // Otherwise, we will end up with data files with a sequence number larger than the snapshot + // sequence number. 
+ return super.nextSnapshotSequenceNumber(base) + batches.size() - 1; + } + + @Override + public List apply(TableMetadata base, Snapshot snapshot) { + if (!batches.isEmpty()) { + long startingSequenceNumber = base.nextSequenceNumber(); + if (appliedOnSequenceNumber != startingSequenceNumber) { + // The base sequence number has changed, so we need to re-apply to re-calculate the sequence + // numbers + super.cleanUncommitted(EMPTY_SET); // Clean the old manifest files + super.clearFiles(); // Reset the files since they will be added again with new sequence + // numbers + int batchIndex = 0; + while (batchIndex < batches.size()) { + Batch batch = batches.get(batchIndex); + long dataSequenceNumber = startingSequenceNumber + batchIndex; + batch.getNewDataFiles().forEach(f -> add(f, dataSequenceNumber)); + batch.getNewDeleteFiles().forEach(f -> add(f, dataSequenceNumber)); + batchIndex += 1; + } + appliedOnSequenceNumber = startingSequenceNumber; + } + } + return super.apply(base, snapshot); + } + + @Override + public StreamingUpdate newBatch() { + if (batches.isEmpty() || !batches.get(batches.size() - 1).isEmpty()) { + // Only add a new batch if there isn't one yet or the latest one is not empty. + // Otherwise, we will have empty batches. 
+ batches.add(new Batch()); + } + return this; + } + + @Override + public StreamingUpdate addFile(DataFile dataFile) { + getBatch().add(dataFile); + return this; + } + + @Override + public StreamingUpdate addFile(DeleteFile deleteFile) { + getBatch().add(deleteFile); + return this; + } + + private Batch getBatch() { + if (batches.isEmpty()) { + newBatch(); + } + return batches.get(batches.size() - 1); + } + + @Override + public BaseStreamingUpdate toBranch(String branch) { + targetBranch(branch); + return this; + } + + @Override + protected void cleanUncommitted(Set committed) { + // This is called when the commit fails and the caches are cleared, reset the state here so + // calling apply again will re-add the files + appliedOnSequenceNumber = -1L; + super.cleanUncommitted(committed); + } + + @Override + public StreamingUpdate validateFromSnapshot(long snapshotId) { + this.startingSnapshotId = snapshotId; + return this; + } + + @Override + public BaseStreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter) { + this.conflictDetectionFilter = newConflictDetectionFilter; + return this; + } + + @Override + public BaseStreamingUpdate validateNoConflictingDataFiles() { + this.validateNewDataFiles = true; + return this; + } + + @Override + public BaseStreamingUpdate validateNoConflictingDeleteFiles() { + this.validateNewDeleteFiles = true; + return this; + } + + @Override + protected void validate(TableMetadata base, Snapshot parent) { + if (parent != null) { + if (startingSnapshotId != null) { + Preconditions.checkArgument( + SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot), + "Snapshot %s is not an ancestor of %s", + startingSnapshotId, + parent.snapshotId()); + } + + if (validateNewDataFiles) { + validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent); + } + + if (validateNewDeleteFiles) { + validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent); + } + } + } 
+ + private static class Batch { + private final List newDataFiles = Lists.newArrayList(); + private final List newDeleteFiles = Lists.newArrayList(); + + /** Creates a new set of updates to a specific batch */ + Batch() {} + + void add(DataFile dataFile) { + newDataFiles.add(dataFile); + } + + void add(DeleteFile deleteFile) { + newDeleteFiles.add(deleteFile); + } + + List getNewDataFiles() { + return newDataFiles; + } + + List getNewDeleteFiles() { + return newDeleteFiles; + } + + boolean isEmpty() { + return newDataFiles.isEmpty() && newDeleteFiles.isEmpty(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index a44adc4e9035..8962ce3fa30b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -190,6 +190,11 @@ public RewriteFiles newRewrite() { return new BaseRewriteFiles(name, ops).reportWith(reporter); } + @Override + public StreamingUpdate newStreamingUpdate() { + return new BaseStreamingUpdate(name, ops).reportWith(reporter); + } + @Override public RewriteManifests rewriteManifests() { return new BaseRewriteManifests(ops).reportWith(reporter); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index eb8dbd2538e6..3b69fd592ef0 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -193,6 +193,16 @@ public RewriteFiles newRewrite() { return rewrite; } + @Override + public StreamingUpdate newStreamingUpdate() { + checkLastOperationCommitted("StreamingUpdate"); + StreamingUpdate streamingUpdate = + new BaseStreamingUpdate(tableName, transactionOps).reportWith(reporter); + streamingUpdate.deleteWith(enqueueDelete); + updates.add(streamingUpdate); + return streamingUpdate; + } + @Override public RewriteManifests rewriteManifests() { 
checkLastOperationCommitted("RewriteManifests"); @@ -712,6 +722,11 @@ public RewriteFiles newRewrite() { return BaseTransaction.this.newRewrite(); } + @Override + public StreamingUpdate newStreamingUpdate() { + return BaseTransaction.this.newStreamingUpdate(); + } + @Override public RewriteManifests rewriteManifests() { return BaseTransaction.this.rewriteManifests(); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 2ea7a4fd1ff1..b458ae3f3276 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -285,6 +285,16 @@ private void addDelete(FileHolder fileHolder) { } } + protected void clearFiles() { + newDataFilePaths.clear(); + newDataFilesBySpec.clear(); + newDeleteFilePaths.clear(); + newDeleteFilesBySpec.clear(); + addedFilesSummary.clear(); + hasNewDataFiles = false; + hasNewDeleteFiles = false; + } + /** Add all files in a manifest to the new snapshot. 
*/ protected void add(ManifestFile manifest) { Preconditions.checkArgument( diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 082e50b840dc..7042356d11b3 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -348,6 +348,11 @@ public RewriteFiles newRewrite() { throw new UnsupportedOperationException(errorMsg("newRewrite")); } + @Override + public StreamingUpdate newStreamingUpdate() { + throw new UnsupportedOperationException(errorMsg("newStreamingUpdate")); + } + @Override public RewriteManifests rewriteManifests() { throw new UnsupportedOperationException(errorMsg("rewriteManifests")); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 9f4bcbc6bba9..46e9a21ad692 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -224,12 +224,16 @@ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {} */ protected abstract List apply(TableMetadata metadataToUpdate, Snapshot snapshot); + protected long nextSnapshotSequenceNumber(TableMetadata metadataToUpdate) { + return metadataToUpdate.nextSequenceNumber(); + } + @Override public Snapshot apply() { refresh(); Snapshot parentSnapshot = SnapshotUtil.latestSnapshot(base, targetBranch); - long sequenceNumber = base.nextSequenceNumber(); + long sequenceNumber = nextSnapshotSequenceNumber(base); Long parentSnapshotId = parentSnapshot == null ? 
null : parentSnapshot.snapshotId(); validate(base, parentSnapshot); diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 2322062dad85..e95ef624cb95 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -677,9 +677,18 @@ static void validateManifestEntries( Iterator ids, Iterator expectedFiles, Iterator expectedStatuses) { + validateManifestEntries(manifest, ids, expectedFiles, expectedStatuses, null); + } + + static void validateManifestEntries( + ManifestFile manifest, + Iterator ids, + Iterator expectedFiles, + Iterator expectedStatuses, + Iterator expectedSequenceNumbers) { for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); - DataFile expected = expectedFiles.next(); + ContentFile expected = expectedFiles.next(); final ManifestEntry.Status expectedStatus = expectedStatuses.next(); assertThat(file.path().toString()) .as("Path should match expected") @@ -688,6 +697,11 @@ static void validateManifestEntries( .as("Snapshot ID should match expected ID") .isEqualTo(ids.next()); assertThat(entry.status()).as("Status should match expected").isEqualTo(expectedStatus); + if (expectedSequenceNumbers != null) { + assertThat(entry.dataSequenceNumber()) + .as("Data sequence number should match expected") + .isEqualTo(expectedSequenceNumbers.next()); + } } assertThat(expectedFiles).as("Should find all files in the manifest").isExhausted(); diff --git a/core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java b/core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java new file mode 100644 index 000000000000..63f034a75274 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.ManifestEntry.Status.ADDED; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.Assert; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestStreamingUpdate extends TestBase { + + @Parameter(index = 1) + private String branch; + + @Parameters(name = "formatVersion = {0}, branch = {1}") + protected static List parameters() { + return Arrays.asList(new Object[] {2, "main"}, new Object[] {2, "testBranch"}); + } + + @TestTemplate + public void testAddBatches() { + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + StreamingUpdate streamingUpdate = + table + .newStreamingUpdate() + .addFile(FILE_A) + .addFile(FILE_A_DELETES) + .addFile(FILE_A2) + .newBatch() + .newBatch() // Extra call to new batch shouldn't mess things up + .addFile(FILE_B) + .newBatch() + .addFile(FILE_C) + 
.addFile(FILE_C2_DELETES); + + commit(table, streamingUpdate, branch); + + TableMetadata base = readMetadata(); + Snapshot snapshot = latestSnapshot(base, branch); + long snapshotId = snapshot.snapshotId(); + long snapshotSequenceNumber = snapshot.sequenceNumber(); + Assert.assertEquals( + "Should create only 2 manifests (1 write 1 delete)", + 2, + snapshot.allManifests(table.io()).size()); + + ManifestFile dataManifest = snapshot.allManifests(table.io()).get(0); + validateManifestEntries( + dataManifest, + ids(snapshotId, snapshotId, snapshotId, snapshotId), + files(FILE_A, FILE_A2, FILE_B, FILE_C), + statuses(ADDED, ADDED, ADDED, ADDED), + dataSeqs(1L, 1L, 2L, 3L)); + + ManifestFile deleteManifest = snapshot.allManifests(table.io()).get(1); + validateDeleteManifest( + deleteManifest, + dataSeqs(1L, 3L), + fileSeqs(snapshotSequenceNumber, snapshotSequenceNumber), + ids(snapshotId, snapshotId), + files(FILE_A_DELETES, FILE_C2_DELETES), + statuses(ADDED, ADDED)); + } + + @TestTemplate + public void testCommitConflict() { + assertThat(listManifestFiles().size()).as("Table should start empty").isEqualTo(0); + + StreamingUpdate streamingUpdate = + table.newStreamingUpdate().addFile(FILE_A).newBatch().addFile(FILE_B); + + Snapshot pending = apply(streamingUpdate, branch); + + validateManifestEntries( + pending.allManifests(table.io()).get(0), + ids(pending.snapshotId(), pending.snapshotId()), + files(FILE_A, FILE_B), + statuses(ADDED, ADDED, ADDED, ADDED), + dataSeqs(1L, 2L)); // Data sequence numbers should start at 1 + + Snapshot fastAppendSnapshot = commit(table, table.newFastAppend().appendFile(FILE_C), branch); + validateManifestEntries( + fastAppendSnapshot.allManifests(table.io()).get(0), + ids(fastAppendSnapshot.snapshotId()), + files(FILE_C), + statuses(ADDED), + dataSeqs(1L)); + + Snapshot snapshot = commit(table, streamingUpdate, branch); + + long snapshotId = snapshot.snapshotId(); + long snapshotSequenceNumber = snapshot.sequenceNumber(); + + 
Assert.assertEquals( + "Should update snapshot sequence number to be after both batches", + 3L, + snapshotSequenceNumber); + ManifestFile dataManifest = snapshot.allManifests(table.io()).get(0); + validateManifestEntries( + dataManifest, + ids(snapshotId, snapshotId, snapshotId, snapshotId), + files(FILE_A, FILE_B), + statuses(ADDED, ADDED, ADDED, ADDED), + dataSeqs(2L, 3L)); // Due to the conflict with the append we now start at 2 + } + + @TestTemplate + public void testFailureCleanup() { + + table.ops().failCommits(5); + + StreamingUpdate streamingUpdate = table.newStreamingUpdate(); + streamingUpdate.addFile(FILE_A); + streamingUpdate.addFile(FILE_A_DELETES); + + Snapshot pending = apply(streamingUpdate, branch); + + Assert.assertEquals("Should produce 2 manifests", 2, pending.allManifests(table.io()).size()); + ManifestFile manifest1 = pending.allManifests(table.io()).get(0); + ManifestFile manifest2 = pending.allManifests(table.io()).get(1); + + assertThatThrownBy(() -> commit(table, streamingUpdate, branch)) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + + Assert.assertFalse("Should clean up new manifest", new File(manifest1.path()).exists()); + Assert.assertFalse("Should clean up new manifest", new File(manifest2.path()).exists()); + + // As commit failed all the manifests added with streaming update should be cleaned up + Assert.assertEquals("No manifests should remain", 0, listManifestFiles().size()); + } + + @TestTemplate + public void testRecovery() { + + table.ops().failCommits(3); + + StreamingUpdate streamingUpdate = + table.newStreamingUpdate().addFile(FILE_A).addFile(FILE_A_DELETES); + Snapshot pending = apply(streamingUpdate, branch); + + Assert.assertEquals("Should produce 2 manifests", 2, pending.allManifests(table.io()).size()); + ManifestFile manifest1 = pending.allManifests(table.io()).get(0); + ManifestFile manifest2 = pending.allManifests(table.io()).get(1); + + validateManifestEntries(manifest1, 
ids(pending.snapshotId()), files(FILE_A), statuses(ADDED)); + validateDeleteManifest( + manifest2, + dataSeqs(pending.sequenceNumber()), + fileSeqs(pending.sequenceNumber()), + ids(pending.snapshotId()), + files(FILE_A_DELETES), + statuses(ADDED)); + + commit(table, streamingUpdate, branch); + + Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1.path()).exists()); + Assert.assertTrue( + "Should reuse the manifest with deletes", new File(manifest2.path()).exists()); + + TableMetadata metadata = readMetadata(); + Assert.assertTrue( + "Should commit the manifest for append", + latestSnapshot(metadata, branch).allManifests(table.io()).contains(manifest1)); + Assert.assertTrue( + "Should commit the manifest for delete", + latestSnapshot(metadata, branch).allManifests(table.io()).contains(manifest2)); + + // 1 for data file 1 for delete file + Assert.assertEquals("Only 2 manifests should exist", 2, listManifestFiles().size()); + } +}