From 3e67c95e3fc9e0b762db003baad9b35052119b9b Mon Sep 17 00:00:00 2001
From: Jason Fine
Date: Mon, 11 Dec 2023 18:59:54 +0200
Subject: [PATCH] Table Operations: Added streaming update operation

This operation allows adding multiple consecutive updates in a single
commit without equality deletes from prior updates affecting inserts
that occurred after them.

Before this commit you would need to do something like this:

```
for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()
```

This produces many manifest files and is very IO intensive.

This operation allows:

```
update = table.newStreamingUpdate()
for batch in batches:
    update.newBatch()
    update.add(batch.deleteFiles)
    update.add(batch.dataFiles)
update.commit()
```
---
 .../org/apache/iceberg/StreamingUpdate.java   | 129 ++++++++++++
 .../main/java/org/apache/iceberg/Table.java   |  11 +
 .../java/org/apache/iceberg/Transaction.java  |  11 +
 .../org/apache/iceberg/BaseReadOnlyTable.java |   5 +
 .../apache/iceberg/BaseStreamingUpdate.java   | 198 ++++++++++++++++++
 .../java/org/apache/iceberg/BaseTable.java    |   5 +
 .../org/apache/iceberg/BaseTransaction.java   |  15 ++
 .../org/apache/iceberg/SerializableTable.java |   5 +
 .../org/apache/iceberg/SnapshotProducer.java  |   6 +-
 .../org/apache/iceberg/TableTestBase.java     |  17 +-
 .../apache/iceberg/TestStreamingUpdate.java   | 161 ++++++++++++++
 docs/java-api.md                              |   1 +
 12 files changed, 562 insertions(+), 2 deletions(-)
 create mode 100644 api/src/main/java/org/apache/iceberg/StreamingUpdate.java
 create mode 100644 core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
 create mode 100644 core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java

diff --git a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java
new file mode 100644
index 000000000000..65b8b07c1ba6
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+
+/**
+ * API for appending sequential updates to a table
+ *
+ * <p>This API accumulates batches of file additions and deletions in order, produces a new
+ * {@link Snapshot} of the changes in which each batch is assigned its own data sequence number,
+ * and commits that snapshot as the table's current snapshot.
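+ *
+ * <p>For example, in the following sketch (the file variables are illustrative, not part of this
+ * API), the files added before the first {@link #newBatch()} call receive one data sequence
+ * number and the file added after it receives the next one:
+ *
+ * <pre>
+ *   table
+ *       .newStreamingUpdate()
+ *       .addFile(dataFile1) // data sequence number N
+ *       .addFile(deleteFile1) // data sequence number N
+ *       .newBatch()
+ *       .addFile(dataFile2) // data sequence number N + 1
+ *       .commit();
+ * </pre>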
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ * If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
+ * will throw a {@link ValidationException}.
+ */
+public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {
+
+  /**
+   * Start a new batch of changes. The changes in this batch will have a sequence number larger
+   * than the changes in the previous batches.
+   *
+   * @return this for method chaining
+   */
+  default StreamingUpdate newBatch() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement newBatch");
+  }
+
+  /**
+   * Add a new data file to the current batch. All files in this batch will receive the same data
+   * sequence number.
+   *
+   * @param dataFile a new data file
+   * @return this for method chaining
+   */
+  default StreamingUpdate addFile(DataFile dataFile) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement addFile");
+  }
+
+  /**
+   * Add a new delete file to the current batch. All files in this batch will receive the same
+   * data sequence number.
+   *
+   * @param deleteFile a new delete file
+   * @return this for method chaining
+   */
+  default StreamingUpdate addFile(DeleteFile deleteFile) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement addFile");
+  }
+
+  /**
+   * Set the snapshot ID used in any reads for this operation.
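+   *
+   * <p>A usage sketch, where the writer pins validation to the snapshot it planned its changes
+   * against (the variable names are illustrative):
+   *
+   * <pre>
+   *   long readSnapshotId = table.currentSnapshot().snapshotId();
+   *   StreamingUpdate update = table.newStreamingUpdate().validateFromSnapshot(readSnapshotId);
+   * </pre>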
+   *
+   * <p>Validations will check changes after this snapshot ID. If the from snapshot is not set,
+   * all ancestor snapshots through the table's initial snapshot are validated.
+   *
+   * @param snapshotId a snapshot ID
+   * @return this for method chaining
+   */
+  StreamingUpdate validateFromSnapshot(long snapshotId);
+
+  /**
+   * Sets a conflict detection filter used to validate concurrently added data and delete files.
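+   *
+   * <p>For example, a sketch that limits conflict detection to a single partition's rows (the
+   * column name and value are illustrative):
+   *
+   * <pre>
+   *   update.conflictDetectionFilter(Expressions.equal("region", "eu"));
+   * </pre>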
+   *
+   * <p>If not called, a true literal will be used as the conflict detection filter.
+   *
+   * @param conflictDetectionFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  StreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter);
+
+  /**
+   * Enables validation that data files added concurrently do not conflict with this commit's
+   * operation.
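+   *
+   * <p>A sketch of a fully validated update (identifiers other than the API methods are
+   * illustrative):
+   *
+   * <pre>
+   *   table
+   *       .newStreamingUpdate()
+   *       .validateFromSnapshot(readSnapshotId)
+   *       .conflictDetectionFilter(rowFilter)
+   *       .validateNoConflictingDataFiles()
+   *       .validateNoConflictingDeleteFiles()
+   *       .addFile(dataFile)
+   *       .addFile(deleteFile)
+   *       .commit();
+   * </pre>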
+   *
+   * <p>This method should be called when the table is queried to determine which files to
+   * delete/append. If a concurrent operation commits a new file after the data was read and that
+   * file might contain rows matching the specified conflict detection filter, this operation will
+   * detect this during retries and fail.
+   *
+   * <p>Calling this method is required to maintain serializable isolation for update/delete
+   * operations. Otherwise, the isolation level will be snapshot isolation.
+   *
+   * <p>Validation uses the conflict detection filter passed to {@link
+   * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
+   * snapshot passed to {@link #validateFromSnapshot(long)}.
+   *
+   * @return this for method chaining
+   */
+  StreamingUpdate validateNoConflictingDataFiles();
+
+  /**
+   * Enables validation that delete files added concurrently do not conflict with this commit's
+   * operation.
+   *
+   * <p>This method must be called when the table is queried to produce a row delta for UPDATE and
+   * MERGE operations independently of the isolation level. Calling this method isn't required for
+   * DELETE operations as it is OK to delete a record that is also deleted concurrently.
+   *
+   * <p>Validation uses the conflict detection filter passed to {@link
+   * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
+   * snapshot passed to {@link #validateFromSnapshot(long)}.
+   *
+   * @return this for method chaining
+   */
+  StreamingUpdate validateNoConflictingDeleteFiles();
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 5ab1def51ca0..7683f1d59d84 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
    */
   RewriteFiles newRewrite();
 
+  /**
+   * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+   * table.
+   *
+   * @return a new {@link StreamingUpdate}
+   */
+  default StreamingUpdate newStreamingUpdate() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+  }
+
   /**
    * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
    * and commit.
diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java
index aeec1f589d06..85d0267bd379 100644
--- a/api/src/main/java/org/apache/iceberg/Transaction.java
+++ b/api/src/main/java/org/apache/iceberg/Transaction.java
@@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
    */
   RewriteFiles newRewrite();
 
+  /**
+   * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+   * table.
+   *
+   * @return a new {@link StreamingUpdate}
+   */
+  default StreamingUpdate newStreamingUpdate() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+  }
+
   /**
    * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
    * table.
diff --git a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
index 4acbf2a16396..f4b015c16053 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
@@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
     throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
   }
 
+  @Override
+  public StreamingUpdate newStreamingUpdate() {
+    throw new UnsupportedOperationException("Cannot update a " + descriptor + " table");
+  }
+
   @Override
   public RewriteManifests rewriteManifests() {
     throw new UnsupportedOperationException(
diff --git a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
new file mode 100644
index 000000000000..2e9bf4dfe970
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.SnapshotUtil;
+
+class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
+    implements StreamingUpdate {
+  private final List<Batch> batches = Lists.newArrayList();
+
+  private boolean requiresApply = true;
+  private Long startingSnapshotId = null; // check all versions by default
+
+  private Expression conflictDetectionFilter = Expressions.alwaysTrue();
+  private boolean validateNewDataFiles;
+  private boolean validateNewDeleteFiles;
+
+  BaseStreamingUpdate(String tableName, TableOperations ops) {
+    super(tableName, ops);
+  }
+
+  @Override
+  protected BaseStreamingUpdate self() {
+    return this;
+  }
+
+  @Override
+  protected String operation() {
+    return DataOperations.OVERWRITE;
+  }
+
+  @Override
+  protected long nextSnapshotSequenceNumber(TableMetadata base) {
+    if (batches.isEmpty()) {
+      return super.nextSnapshotSequenceNumber(base);
+    }
+    // Each batch will advance the data sequence number by one, so we should advance the snapshot
+    // by the same amount. Otherwise, we will end up with data files with a sequence number larger
+    // than the snapshot sequence number.
+    return super.nextSnapshotSequenceNumber(base) + batches.size() - 1;
+  }
+
+  @Override
+  public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
+    if (requiresApply && !batches.isEmpty()) {
+      long startingSequenceNumber = base.nextSequenceNumber();
+      int batchIndex = 0;
+      while (batchIndex < batches.size()) {
+        Batch batch = batches.get(batchIndex);
+        long dataSequenceNumber = startingSequenceNumber + batchIndex;
+        batch.getNewDataFiles().forEach(f -> add(f, dataSequenceNumber));
+        batch.getNewDeleteFiles().forEach(f -> add(f, dataSequenceNumber));
+        batchIndex += 1;
+      }
+      requiresApply = false;
+    }
+    return super.apply(base, snapshot);
+  }
+
+  @Override
+  public StreamingUpdate newBatch() {
+    if (batches.isEmpty() || !batches.get(batches.size() - 1).isEmpty()) {
+      // Only add a new batch if there isn't one yet, or if the last batch is not empty.
+      // Otherwise, we would accumulate empty batches.
+      batches.add(new Batch());
+    }
+    return this;
+  }
+
+  @Override
+  public StreamingUpdate addFile(DataFile dataFile) {
+    getBatch().add(dataFile);
+    return this;
+  }
+
+  @Override
+  public StreamingUpdate addFile(DeleteFile deleteFile) {
+    getBatch().add(deleteFile);
+    return this;
+  }
+
+  private Batch getBatch() {
+    if (batches.isEmpty()) {
+      newBatch();
+    }
+    return batches.get(batches.size() - 1);
+  }
+
+  @Override
+  public BaseStreamingUpdate toBranch(String branch) {
+    targetBranch(branch);
+    return this;
+  }
+
+  @Override
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    // This is called when the commit fails and the caches are cleared; reset the state here so
+    // that calling apply again will re-add the files.
+    requiresApply = true;
+    super.cleanUncommitted(committed);
+  }
+
+  @Override
+  public StreamingUpdate validateFromSnapshot(long snapshotId) {
+    this.startingSnapshotId = snapshotId;
+    return this;
+  }
+
+  @Override
+  public BaseStreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter) {
+    this.conflictDetectionFilter = conflictDetectionFilter;
+    return this;
+  }
+
+  @Override
+  public BaseStreamingUpdate validateNoConflictingDataFiles() {
+    this.validateNewDataFiles = true;
+    return this;
+  }
+
+  @Override
+  public BaseStreamingUpdate validateNoConflictingDeleteFiles() {
+    this.validateNewDeleteFiles = true;
+    return this;
+  }
+
+  @Override
+  protected void validate(TableMetadata base, Snapshot parent) {
+    if (parent != null) {
+      if (startingSnapshotId != null) {
+        Preconditions.checkArgument(
+            SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot),
+            "Snapshot %s is not an ancestor of %s",
+            startingSnapshotId,
+            parent.snapshotId());
+      }
+
+      if (validateNewDataFiles) {
+        validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
+      }
+
+      if (validateNewDeleteFiles) {
+        validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
+      }
+    }
+  }
+
+  private static class Batch {
+    private final List<DataFile> newDataFiles = Lists.newArrayList();
+    private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
+
+    /** Creates a new set of updates to a specific batch */
+    Batch() {}
+
+    void add(DataFile dataFile) {
+      newDataFiles.add(dataFile);
+    }
+
+    void add(DeleteFile deleteFile) {
+      newDeleteFiles.add(deleteFile);
+    }
+
+    List<DataFile> getNewDataFiles() {
+      return newDataFiles;
+    }
+
+    List<DeleteFile> getNewDeleteFiles() {
+      return newDeleteFiles;
+    }
+
+    boolean isEmpty() {
+      return newDataFiles.isEmpty() && newDeleteFiles.isEmpty();
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index d4cf1f4b76f4..ccc018768347 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -190,6 +190,11 @@ public RewriteFiles newRewrite() {
     return new BaseRewriteFiles(name, ops).reportWith(reporter);
   }
 
+  @Override
+  public StreamingUpdate newStreamingUpdate() {
+    return new BaseStreamingUpdate(name, ops).reportWith(reporter);
+  }
+
   @Override
   public RewriteManifests rewriteManifests() {
     return new BaseRewriteManifests(ops).reportWith(reporter);
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 30103fd87fe2..cf263b755002 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -193,6 +193,16 @@ public RewriteFiles newRewrite() {
     return rewrite;
   }
 
+  @Override
+  public StreamingUpdate newStreamingUpdate() {
+    checkLastOperationCommitted("StreamingUpdate");
+    StreamingUpdate streamingUpdate =
+        new BaseStreamingUpdate(tableName, transactionOps).reportWith(reporter);
+    streamingUpdate.deleteWith(enqueueDelete);
+    updates.add(streamingUpdate);
+    return streamingUpdate;
+  }
+
   @Override
   public RewriteManifests rewriteManifests() {
     checkLastOperationCommitted("RewriteManifests");
@@ -703,6 +713,11 @@ public RewriteFiles newRewrite() {
       return BaseTransaction.this.newRewrite();
     }
 
+    @Override
+    public StreamingUpdate newStreamingUpdate() {
+      return BaseTransaction.this.newStreamingUpdate();
+    }
+
     @Override
     public RewriteManifests rewriteManifests() {
       return BaseTransaction.this.rewriteManifests();
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 778a993c5144..b1fbed43b626 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -331,6 +331,11 @@ public RewriteFiles newRewrite() {
     throw new UnsupportedOperationException(errorMsg("newRewrite"));
   }
 
+  @Override
+  public StreamingUpdate newStreamingUpdate() {
+    throw new UnsupportedOperationException(errorMsg("newStreamingUpdate"));
+  }
+
   @Override
   public RewriteManifests rewriteManifests() {
     throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 757d0b78bca7..8302ab07e06b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -222,12 +222,16 @@ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
    */
   protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot);
 
+  protected long nextSnapshotSequenceNumber(TableMetadata metadataToUpdate) {
+    return metadataToUpdate.nextSequenceNumber();
+  }
+
   @Override
   public Snapshot apply() {
     refresh();
     Snapshot parentSnapshot = SnapshotUtil.latestSnapshot(base, targetBranch);
-    long sequenceNumber = base.nextSequenceNumber();
+    long sequenceNumber = nextSnapshotSequenceNumber(base);
 
     Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();
 
     validate(base, parentSnapshot);
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index c3db85910138..d46d7fee3919 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -670,14 +670,29 @@ static void validateManifestEntries(
       Iterator<Long> ids,
       Iterator<DataFile> expectedFiles,
       Iterator<ManifestEntry.Status> expectedStatuses) {
+    validateManifestEntries(manifest, ids, expectedFiles, expectedStatuses, null);
+  }
+
+  static void validateManifestEntries(
+      ManifestFile manifest,
+      Iterator<Long> ids,
+      Iterator<DataFile> expectedFiles,
+      Iterator<ManifestEntry.Status> expectedStatuses,
+      Iterator<Long> expectedSequenceNumbers) {
     for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
-      DataFile expected = expectedFiles.next();
+      ContentFile<?> expected = expectedFiles.next();
       final ManifestEntry.Status expectedStatus = expectedStatuses.next();
       Assert.assertEquals(
           "Path should match expected", expected.path().toString(), file.path().toString());
       Assert.assertEquals("Snapshot ID should match expected ID", ids.next(), entry.snapshotId());
       Assert.assertEquals("Entry status should match expected ID", expectedStatus, entry.status());
+      if (expectedSequenceNumbers != null) {
+        Assert.assertEquals(
+            "Data sequence number should match expected",
+            expectedSequenceNumbers.next(),
+            entry.dataSequenceNumber());
+      }
     }
 
     Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
diff --git a/core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java b/core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java
new file mode 100644
index 000000000000..e36ee3db1abc
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestStreamingUpdate.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.ManifestEntry.Status.ADDED;
+import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot;
+
+import java.io.File;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStreamingUpdate extends TableTestBase {
+
+  private final String branch;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {2, "main"},
+      new Object[] {2, "testBranch"}
+    };
+  }
+
+  public TestStreamingUpdate(int formatVersion, String branch) {
+    super(formatVersion);
+    this.branch = branch;
+  }
+
+  @Test
+  public void testAddBatches() {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    StreamingUpdate streamingUpdate =
+        table
+            .newStreamingUpdate()
+            .addFile(FILE_A)
+            .addFile(FILE_A_DELETES)
+            .addFile(FILE_A2)
+            .newBatch()
+            .newBatch() // An extra call to newBatch() should not add an empty batch
+            .addFile(FILE_B)
+            .newBatch()
+            .addFile(FILE_C)
+            .addFile(FILE_C2_DELETES);
+
+    commit(table, streamingUpdate, branch);
+
+    TableMetadata base = readMetadata();
+    Snapshot snapshot = latestSnapshot(base, branch);
+    long snapshotId = snapshot.snapshotId();
+    long snapshotSequenceNumber = snapshot.sequenceNumber();
+    Assert.assertEquals(
+        "Should create only 2 manifests (1 data, 1 delete)",
+        2,
+        snapshot.allManifests(table.io()).size());
+
+    ManifestFile dataManifest = snapshot.allManifests(table.io()).get(0);
+    validateManifestEntries(
+        dataManifest,
+        ids(snapshotId, snapshotId, snapshotId, snapshotId),
+        files(FILE_A, FILE_A2, FILE_B, FILE_C),
+        statuses(ADDED, ADDED, ADDED, ADDED),
+        dataSeqs(1L, 1L, 2L, 3L));
+
+    ManifestFile deleteManifest = snapshot.allManifests(table.io()).get(1);
+    validateDeleteManifest(
+        deleteManifest,
+        dataSeqs(1L, 3L),
+        fileSeqs(snapshotSequenceNumber, snapshotSequenceNumber),
+        ids(snapshotId, snapshotId),
+        files(FILE_A_DELETES, FILE_C2_DELETES),
+        statuses(ADDED, ADDED));
+  }
+
+  @Test
+  public void testFailureCleanup() {
+    table.ops().failCommits(5);
+
+    StreamingUpdate streamingUpdate = table.newStreamingUpdate();
+    streamingUpdate.addFile(FILE_A);
+    streamingUpdate.addFile(FILE_A_DELETES);
+
+    Snapshot pending = apply(streamingUpdate, branch);
+
+    Assert.assertEquals("Should produce 2 manifests", 2, pending.allManifests(table.io()).size());
+    ManifestFile manifest1 = pending.allManifests(table.io()).get(0);
+    ManifestFile manifest2 = pending.allManifests(table.io()).get(1);
+
+    Assertions.assertThatThrownBy(() -> commit(table, streamingUpdate, branch))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage("Injected failure");
+
+    Assert.assertFalse("Should clean up new manifest", new File(manifest1.path()).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest2.path()).exists());
+
+    // As the commit failed, all the manifests added by the streaming update should be cleaned up
+    Assert.assertEquals("No manifests should remain", 0, listManifestFiles().size());
+  }
+
+  @Test
+  public void testRecovery() {
+    table.ops().failCommits(3);
+
+    StreamingUpdate streamingUpdate =
+        table.newStreamingUpdate().addFile(FILE_A).addFile(FILE_A_DELETES);
+    Snapshot pending = apply(streamingUpdate, branch);
+
+    Assert.assertEquals("Should produce 2 manifests", 2, pending.allManifests(table.io()).size());
+    ManifestFile manifest1 = pending.allManifests(table.io()).get(0);
+    ManifestFile manifest2 = pending.allManifests(table.io()).get(1);
+
+    validateManifestEntries(manifest1, ids(pending.snapshotId()), files(FILE_A), statuses(ADDED));
+    validateDeleteManifest(
+        manifest2,
+        dataSeqs(pending.sequenceNumber()),
+        fileSeqs(pending.sequenceNumber()),
+        ids(pending.snapshotId()),
+        files(FILE_A_DELETES),
+        statuses(ADDED));
+
+    commit(table, streamingUpdate, branch);
+
+    Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1.path()).exists());
+    Assert.assertTrue(
+        "Should reuse the manifest with deletes", new File(manifest2.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    Assert.assertTrue(
+        "Should commit the manifest for append",
+        latestSnapshot(metadata, branch).allManifests(table.io()).contains(manifest1));
+    Assert.assertTrue(
+        "Should commit the manifest for delete",
+        latestSnapshot(metadata, branch).allManifests(table.io()).contains(manifest2));
+
+    // 1 manifest for the data file and 1 for the delete file
+    Assert.assertEquals("Only 2 manifests should exist", 2, listManifestFiles().size());
+  }
+}
diff --git a/docs/java-api.md b/docs/java-api.md
index 62b51e096fb2..f0ae4a0802cb 100644
--- a/docs/java-api.md
+++ b/docs/java-api.md
@@ -128,6 +128,7 @@ Available operations to update a table are:
 * `newOverwrite` -- used to append data files and remove files that are overwritten
 * `newDelete` -- used to delete data files
 * `newRewrite` -- used to rewrite data files; will replace existing files with new versions
+* `newStreamingUpdate` -- used to add sequential batches of updates in a single commit; deletes in a batch do not affect rows added in later batches
 * `newTransaction` -- create a new table-level transaction
 * `rewriteManifests` -- rewrite manifest data by clustering files, for faster scan planning
 * `rollback` -- rollback the table state to a specific snapshot