API: New API For sequential / streaming updates #9323

Open · wants to merge 3 commits into main
129 changes: 129 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;

/**
* API for appending sequential updates to a table.
*
* <p>This API accumulates batches of file additions and deletions in order, produces a new {@link
* Snapshot} of the changes in which each batch is assigned its own data sequence number, and
* commits that snapshot as the current one.
*
* <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
* If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
* will throw a {@link ValidationException}.
*/
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {

/**
* Start a new batch of changes. The changes in this batch will have a data sequence number
* larger than that of the changes in previous batches.
*
* @return this for method chaining
*/
default StreamingUpdate newBatch() {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement newBatch");
}

/**
* Add a new data file to the current batch. All files in this batch will receive the same data
* sequence number.
*
* @param dataFile a new data file
* @return this for method chaining
*/
default StreamingUpdate addFile(DataFile dataFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Add a new delete file to the current batch. All files in this batch will receive the same data
* sequence number.
*
* @param deleteFile a new delete file
* @return this for method chaining
*/
default StreamingUpdate addFile(DeleteFile deleteFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
* ancestor snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
StreamingUpdate validateFromSnapshot(long snapshotId);

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
*
* <p>If not called, a true literal will be used as the conflict detection filter.
*
* @param newConflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
StreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter);

/**
* Enables validation that data files added concurrently do not conflict with this commit's
* operation.
*
* <p>This method should be called when the table is queried to determine which files to
* delete/append. If a concurrent operation commits a new file after the data was read and that
* file might contain rows matching the specified conflict detection filter, this operation will
* detect this during retries and fail.
*
* <p>Calling this method is required to maintain serializable isolation for update/delete
* operations. Otherwise, the isolation level will be snapshot isolation.
*
* <p>Validation uses the conflict detection filter passed to {@link
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the
* snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
StreamingUpdate validateNoConflictingDataFiles();

/**
* Enables validation that delete files added concurrently do not conflict with this commit's
* operation.
*
* <p>This method must be called when the table is queried to produce a row delta for UPDATE and
* MERGE operations independently of the isolation level. Calling this method isn't required for
* DELETE operations as it is OK to delete a record that is also deleted concurrently.
*
* <p>Validation uses the conflict detection filter passed to {@link
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the
* snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
StreamingUpdate validateNoConflictingDeleteFiles();
}
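
For context, a minimal usage sketch of the proposed interface (an illustration, not part of the diff; dataFileA, deleteFileA, and dataFileB are hypothetical files produced by a writer, and table is assumed to support newStreamingUpdate()):

// Two batches of upserts committed atomically in a single snapshot; each batch
// receives its own data sequence number, the second larger than the first.
StreamingUpdate update =
    table
        .newStreamingUpdate()
        .validateFromSnapshot(table.currentSnapshot().snapshotId())
        .conflictDetectionFilter(Expressions.equal("id", 1))
        .validateNoConflictingDataFiles()
        .validateNoConflictingDeleteFiles();

update.newBatch().addFile(dataFileA).addFile(deleteFileA); // first batch
update.newBatch().addFile(dataFileB); // second batch, later sequence number

update.commit(); // produces and commits one new snapshot covering both batches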
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
@@ -234,6 +234,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
@@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
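
The same operation can be staged in a transaction; a sketch under the same assumptions (dataFile is a hypothetical file from a writer):

// Stage a streaming update alongside other changes and apply them atomically.
Transaction txn = table.newTransaction();
txn.newStreamingUpdate().newBatch().addFile(dataFile).commit(); // stages the update
txn.commitTransaction(); // applies all staged changes in one commit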
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
@@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
}

@Override
public StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException("Cannot update a " + descriptor + " table");
}

@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(
205 changes: 205 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.Set;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;

class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
implements StreamingUpdate {
private final List<Batch> batches = Lists.newArrayList();

private Long appliedOnSequenceNumber = -1L;
private Long startingSnapshotId = null; // check all versions by default

private Expression conflictDetectionFilter = Expressions.alwaysTrue();
private boolean validateNewDataFiles;
private boolean validateNewDeleteFiles;

BaseStreamingUpdate(String tableName, TableOperations ops) {
super(tableName, ops);
}

@Override
protected BaseStreamingUpdate self() {
return this;
}

@Override
protected String operation() {
return DataOperations.OVERWRITE;
}

@Override
protected long nextSnapshotSequenceNumber(TableMetadata base) {
if (batches.isEmpty()) {
return super.nextSnapshotSequenceNumber(base);
}
// Each batch consumes its own data sequence number, so the snapshot sequence number must
// advance past all of them; otherwise we would end up with data files whose sequence number
// is larger than the snapshot sequence number. The super call already advances by one, hence
// the additional batches.size() - 1.
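// Worked example (assumed values): if base.nextSequenceNumber() is 10 and three batches are
// staged, apply() assigns data sequence numbers 10, 11, and 12, and this method returns
// 10 + 3 - 1 = 12, matching the highest data sequence number written.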
return super.nextSnapshotSequenceNumber(base) + batches.size() - 1;
}

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (!batches.isEmpty()) {
long startingSequenceNumber = base.nextSequenceNumber();
if (appliedOnSequenceNumber != startingSequenceNumber) {
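// Note: a failed commit is retried against refreshed table metadata and apply() is invoked
// again (see cleanUncommitted below), so the starting sequence number may have moved since
// the last application.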
// The base sequence number has changed, so re-apply to recalculate the data sequence numbers
super.cleanUncommitted(EMPTY_SET); // clean up the old manifest files
super.clearFiles(); // reset the files; they will be re-added with new sequence numbers
int batchIndex = 0;
while (batchIndex < batches.size()) {
Batch batch = batches.get(batchIndex);
long dataSequenceNumber = startingSequenceNumber + batchIndex;
batch.getNewDataFiles().forEach(f -> add(f, dataSequenceNumber));
batch.getNewDeleteFiles().forEach(f -> add(f, dataSequenceNumber));
batchIndex += 1;
}
appliedOnSequenceNumber = startingSequenceNumber;
}
}
return super.apply(base, snapshot);
}

@Override
public StreamingUpdate newBatch() {
if (batches.isEmpty() || !batches.get(batches.size() - 1).isEmpty()) {
// Only add a new batch if there isn't one yet, or if the last batch is non-empty.
// Otherwise, we would accumulate empty batches.
batches.add(new Batch());
}
return this;
}

@Override
public StreamingUpdate addFile(DataFile dataFile) {
getBatch().add(dataFile);
return this;
}

@Override
public StreamingUpdate addFile(DeleteFile deleteFile) {
getBatch().add(deleteFile);
return this;
}

private Batch getBatch() {
if (batches.isEmpty()) {
newBatch();
}
return batches.get(batches.size() - 1);
}

@Override
public BaseStreamingUpdate toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
// This is called when the commit fails and the caches are cleared; reset the state here so
// that calling apply again will re-add the files
appliedOnSequenceNumber = -1L;
super.cleanUncommitted(committed);
}

@Override
public StreamingUpdate validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
public BaseStreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter) {
this.conflictDetectionFilter = newConflictDetectionFilter;
return this;
}

@Override
public BaseStreamingUpdate validateNoConflictingDataFiles() {
this.validateNewDataFiles = true;
return this;
}

@Override
public BaseStreamingUpdate validateNoConflictingDeleteFiles() {
this.validateNewDeleteFiles = true;
return this;
}

@Override
protected void validate(TableMetadata base, Snapshot parent) {
if (parent != null) {
if (startingSnapshotId != null) {
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot),
"Snapshot %s is not an ancestor of %s",
startingSnapshotId,
parent.snapshotId());
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}

if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}
}
}

private static class Batch {
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();

/** Creates a new, empty batch of file updates */
Batch() {}

void add(DataFile dataFile) {
newDataFiles.add(dataFile);
}

void add(DeleteFile deleteFile) {
newDeleteFiles.add(deleteFile);
}

List<DataFile> getNewDataFiles() {
return newDataFiles;
}

List<DeleteFile> getNewDeleteFiles() {
return newDeleteFiles;
}

boolean isEmpty() {
return newDataFiles.isEmpty() && newDeleteFiles.isEmpty();
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -190,6 +190,11 @@ public RewriteFiles newRewrite() {
return new BaseRewriteFiles(name, ops).reportWith(reporter);
}

@Override
public StreamingUpdate newStreamingUpdate() {
return new BaseStreamingUpdate(name, ops).reportWith(reporter);
}

@Override
public RewriteManifests rewriteManifests() {
return new BaseRewriteManifests(ops).reportWith(reporter);