Skip to content

Commit

Permalink
Table Operations: Added streaming update operation
Browse files Browse the repository at this point in the history
This operation allows adding multiple consecutive update in a single commit without equality deletes from prior updates affecting inserts that occurred after it.

Before this commit you would  need to do something like this:
```
for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()
```
Which produces many manifest files and is very IO intensive.

This operation allows:
```
update = table.newStreamingUpdate()
for batch, batchIndex in enumerate(batches):
    update.newBatch()
    update.add(batch.deleteFiles)
    update.add(batch.dataFiles)
update.commit()
```
  • Loading branch information
jasonf20 committed Jul 25, 2024
1 parent ddde08d commit 37cc30a
Show file tree
Hide file tree
Showing 12 changed files with 614 additions and 2 deletions.
129 changes: 129 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;

/**
* API for appending sequential updates to a table
*
* <p>This API accumulates batches of file additions and deletions by order, produces a new {@link
* Snapshot} of the changes where each batch is added to a new data sequence number, and commits
* that snapshot as the current.
*
* <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
* If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
* will throw a {@link ValidationException}.
*/
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {

/**
* Start a new batch of changes. The changes in this batch will have a sequence number larger than
* the changes in the previous batches.
*
* @return this for method chaining
*/
default StreamingUpdate newBatch() {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement newBatch");
}

/**
* Add a new data file to the current batch. All files in this batch will receive the same data
* sequence number.
*
* @param dataFile a new data file
* @return this for method chaining
*/
default StreamingUpdate addFile(DataFile dataFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Add a new delete file to the current batch. All files in this batch will receive the same data
* sequence number.
*
* @param deleteFile a new delete file
* @return this for method chaining
*/
default StreamingUpdate addFile(DeleteFile deleteFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
* ancestor snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
StreamingUpdate validateFromSnapshot(long snapshotId);

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
*
* <p>If not called, a true literal will be used as the conflict detection filter.
*
* @param newConflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
StreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter);

/**
* Enables validation that data files added concurrently do not conflict with this commit's
* operation.
*
* <p>This method should be called when the table is queried to determine which files to
* delete/append. If a concurrent operation commits a new file after the data was read and that
* file might contain rows matching the specified conflict detection filter, this operation will
* detect this during retries and fail.
*
* <p>Calling this method is required to maintain serializable isolation for update/delete
* operations. Otherwise, the isolation level will be snapshot isolation.
*
* <p>Validation uses the conflict detection filter passed to {@link
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the
* snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
StreamingUpdate validateNoConflictingDataFiles();

/**
* Enables validation that delete files added concurrently do not conflict with this commit's
* operation.
*
* <p>This method must be called when the table is queried to produce a row delta for UPDATE and
* MERGE operations independently of the isolation level. Calling this method isn't required for
* DELETE operations as it is OK to delete a record that is also deleted concurrently.
*
* <p>Validation uses the conflict detection filter passed to {@link
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the
* snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
StreamingUpdate validateNoConflictingDeleteFiles();
}
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
}

@Override
public StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException("Cannot update a " + descriptor + " table");
}

@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(
Expand Down
205 changes: 205 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.Set;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;

class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
implements StreamingUpdate {
private final List<Batch> batches = Lists.newArrayList();

private Long appliedOnSequenceNumber = -1L;
private Long startingSnapshotId = null; // check all versions by default

private Expression conflictDetectionFilter = Expressions.alwaysTrue();
private boolean validateNewDataFiles;
private boolean validateNewDeleteFiles;

BaseStreamingUpdate(String tableName, TableOperations ops) {
super(tableName, ops);
}

@Override
protected BaseStreamingUpdate self() {
return this;
}

@Override
protected String operation() {
return DataOperations.OVERWRITE;
}

@Override
protected long nextSnapshotSequenceNumber(TableMetadata base) {
if (batches.isEmpty()) {
return super.nextSnapshotSequenceNumber(base);
}
// Each batch will advance the data sequence number by one, so we should advance the snapshot by
// the same amount.
// Otherwise, we will end up with data files with a sequence number larger than the snapshot
// sequence number.
return super.nextSnapshotSequenceNumber(base) + batches.size() - 1;
}

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (!batches.isEmpty()) {
long startingSequenceNumber = base.nextSequenceNumber();
if (appliedOnSequenceNumber != startingSequenceNumber) {
// The base sequence number has changed, so we need to re-apply to re-calculate the sequence
// numbers
super.cleanUncommitted(EMPTY_SET); // Clean the old manifest files
super.clearFiles(); // Reset the files since they will be added again with new sequence
// numbers
int batchIndex = 0;
while (batchIndex < batches.size()) {
Batch batch = batches.get(batchIndex);
long dataSequenceNumber = startingSequenceNumber + batchIndex;
batch.getNewDataFiles().forEach(f -> add(f, dataSequenceNumber));
batch.getNewDeleteFiles().forEach(f -> add(f, dataSequenceNumber));
batchIndex += 1;
}
appliedOnSequenceNumber = startingSequenceNumber;
}
}
return super.apply(base, snapshot);
}

@Override
public StreamingUpdate newBatch() {
if (batches.isEmpty() || !batches.get(batches.size() - 1).isEmpty()) {
// Only add a new batch if the there isn't one or there is one, and it's not empty
// Otherwise, we will have empty batches.
batches.add(new Batch());
}
return this;
}

@Override
public StreamingUpdate addFile(DataFile dataFile) {
getBatch().add(dataFile);
return this;
}

@Override
public StreamingUpdate addFile(DeleteFile deleteFile) {
getBatch().add(deleteFile);
return this;
}

private Batch getBatch() {
if (batches.isEmpty()) {
newBatch();
}
return batches.get(batches.size() - 1);
}

@Override
public BaseStreamingUpdate toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
// This is called when the commit fails and the caches are cleared, reset the state here so
// calling apply again will re-add the files
appliedOnSequenceNumber = -1L;
super.cleanUncommitted(committed);
}

@Override
public StreamingUpdate validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
public BaseStreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter) {
this.conflictDetectionFilter = newConflictDetectionFilter;
return this;
}

@Override
public BaseStreamingUpdate validateNoConflictingDataFiles() {
this.validateNewDataFiles = true;
return this;
}

@Override
public BaseStreamingUpdate validateNoConflictingDeleteFiles() {
this.validateNewDeleteFiles = true;
return this;
}

@Override
protected void validate(TableMetadata base, Snapshot parent) {
if (parent != null) {
if (startingSnapshotId != null) {
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot),
"Snapshot %s is not an ancestor of %s",
startingSnapshotId,
parent.snapshotId());
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}

if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}
}
}

private static class Batch {
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();

/** Creates a new set of updates to a specific batch */
Batch() {}

void add(DataFile dataFile) {
newDataFiles.add(dataFile);
}

void add(DeleteFile deleteFile) {
newDeleteFiles.add(deleteFile);
}

List<DataFile> getNewDataFiles() {
return newDataFiles;
}

List<DeleteFile> getNewDeleteFiles() {
return newDeleteFiles;
}

boolean isEmpty() {
return newDataFiles.isEmpty() && newDeleteFiles.isEmpty();
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public RewriteFiles newRewrite() {
return new BaseRewriteFiles(name, ops).reportWith(reporter);
}

@Override
public StreamingUpdate newStreamingUpdate() {
return new BaseStreamingUpdate(name, ops).reportWith(reporter);
}

@Override
public RewriteManifests rewriteManifests() {
return new BaseRewriteManifests(ops).reportWith(reporter);
Expand Down
Loading

0 comments on commit 37cc30a

Please sign in to comment.