diff --git a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java new file mode 100644 index 000000000000..65b8b07c1ba6 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; + +/** + * API for appending sequential updates to a table + * + *
This API accumulates batches of file additions and deletions by order, produces a new {@link + * Snapshot} of the changes where each batch is added to a new data sequence number, and commits + * that snapshot as the current. + * + *
When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ * If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
+ * will throw a {@link ValidationException}.
+ */
+public interface StreamingUpdate extends SnapshotUpdate Validations will check changes after this snapshot ID. If the from snapshot is not set, all
+ * ancestor snapshots through the table's initial snapshot are validated.
+ *
+ * @param snapshotId a snapshot ID
+ * @return this for method chaining
+ */
+ StreamingUpdate validateFromSnapshot(long snapshotId);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ *
+ * If not called, a true literal will be used as the conflict detection filter.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ StreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data files added concurrently do not conflict with this commit's
+ * operation.
+ *
+ * This method should be called when the table is queried to determine which files to
+ * delete/append. If a concurrent operation commits a new file after the data was read and that
+ * file might contain rows matching the specified conflict detection filter, this operation will
+ * detect this during retries and fail.
+ *
+ * Calling this method is required to maintain serializable isolation for update/delete
+ * operations. Otherwise, the isolation level will be snapshot isolation.
+ *
+ * Validation uses the conflict detection filter passed to {@link
+ * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
+ * snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ StreamingUpdate validateNoConflictingDataFiles();
+
+ /**
+ * Enables validation that delete files added concurrently do not conflict with this commit's
+ * operation.
+ *
+ * This method must be called when the table is queried to produce a row delta for UPDATE and
+ * MERGE operations independently of the isolation level. Calling this method isn't required for
+ * DELETE operations as it is OK to delete a record that is also deleted concurrently.
+ *
+ * Validation uses the conflict detection filter passed to {@link
+ * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
+ * snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ StreamingUpdate validateNoConflictingDeleteFiles();
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 5ab1def51ca0..7683f1d59d84 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();
+ /**
+ * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+ * table.
+ *
+ * @return a new {@link StreamingUpdate}
+ */
+ default StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+ }
+
/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java
index aeec1f589d06..85d0267bd379 100644
--- a/api/src/main/java/org/apache/iceberg/Transaction.java
+++ b/api/src/main/java/org/apache/iceberg/Transaction.java
@@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();
+ /**
+ * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+ * table.
+ *
+ * @return a new {@link StreamingUpdate}
+ */
+ default StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+ }
+
/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
diff --git a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
index 4acbf2a16396..f4b015c16053 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
@@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
}
+ @Override
+ public StreamingUpdate newStreamingUpdate() {
+ throw new UnsupportedOperationException("Cannot update a " + descriptor + " table");
+ }
+
@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(
diff --git a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
new file mode 100644
index 000000000000..2e9bf4dfe970
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.SnapshotUtil;
+
+class BaseStreamingUpdate extends MergingSnapshotProducer