-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Table Operations: Added streaming update operation
This operation allows adding multiple consecutive update in a single commit without equality deletes from prior updates affecting inserts that occurred after it. Before this commit you would need to do something like this: ``` for batch in batches: delta = transaction.newRowDelta() delta.add(batch.deletes) delta.add(batch.inserts) delta.commit() transaction.commit() ``` Which produces many manifest files and is very IO intensive. This operation allows: ``` update = table.newStreamingUpdate() for batch, batchIndex in enumerate(batches): update.newBatch() update.add(batch.deleteFiles) update.add(batch.dataFiles) update.commit() ```
- Loading branch information
Showing
12 changed files
with
614 additions
and
2 deletions.
There are no files selected for viewing
129 changes: 129 additions & 0 deletions
129
api/src/main/java/org/apache/iceberg/StreamingUpdate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg; | ||
|
||
import org.apache.iceberg.exceptions.ValidationException; | ||
import org.apache.iceberg.expressions.Expression; | ||
|
||
/** | ||
* API for appending sequential updates to a table | ||
* | ||
* <p>This API accumulates batches of file additions and deletions by order, produces a new {@link | ||
* Snapshot} of the changes where each batch is added to a new data sequence number, and commits | ||
* that snapshot as the current. | ||
* | ||
* <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts | ||
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit. | ||
* If any of the deleted files are no longer in the latest snapshot when reattempting, the commit | ||
* will throw a {@link ValidationException}. | ||
*/ | ||
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> { | ||
|
||
/** | ||
* Start a new batch of changes. The changes in this batch will have a sequence number larger than | ||
* the changes in the previous batches. | ||
* | ||
* @return this for method chaining | ||
*/ | ||
default StreamingUpdate newBatch() { | ||
throw new UnsupportedOperationException( | ||
this.getClass().getName() + " does not implement newBatch"); | ||
} | ||
|
||
/** | ||
* Add a new data file to the current batch. All files in this batch will receive the same data | ||
* sequence number. | ||
* | ||
* @param dataFile a new data file | ||
* @return this for method chaining | ||
*/ | ||
default StreamingUpdate addFile(DataFile dataFile) { | ||
throw new UnsupportedOperationException( | ||
this.getClass().getName() + " does not implement addFile"); | ||
} | ||
|
||
/** | ||
* Add a new delete file to the current batch. All files in this batch will receive the same data | ||
* sequence number. | ||
* | ||
* @param deleteFile a new delete file | ||
* @return this for method chaining | ||
*/ | ||
default StreamingUpdate addFile(DeleteFile deleteFile) { | ||
throw new UnsupportedOperationException( | ||
this.getClass().getName() + " does not implement addFile"); | ||
} | ||
|
||
/** | ||
* Set the snapshot ID used in any reads for this operation. | ||
* | ||
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all | ||
* ancestor snapshots through the table's initial snapshot are validated. | ||
* | ||
* @param snapshotId a snapshot ID | ||
* @return this for method chaining | ||
*/ | ||
StreamingUpdate validateFromSnapshot(long snapshotId); | ||
|
||
/** | ||
* Sets a conflict detection filter used to validate concurrently added data and delete files. | ||
* | ||
* <p>If not called, a true literal will be used as the conflict detection filter. | ||
* | ||
* @param newConflictDetectionFilter an expression on rows in the table | ||
* @return this for method chaining | ||
*/ | ||
StreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter); | ||
|
||
/** | ||
* Enables validation that data files added concurrently do not conflict with this commit's | ||
* operation. | ||
* | ||
* <p>This method should be called when the table is queried to determine which files to | ||
* delete/append. If a concurrent operation commits a new file after the data was read and that | ||
* file might contain rows matching the specified conflict detection filter, this operation will | ||
* detect this during retries and fail. | ||
* | ||
* <p>Calling this method is required to maintain serializable isolation for update/delete | ||
* operations. Otherwise, the isolation level will be snapshot isolation. | ||
* | ||
* <p>Validation uses the conflict detection filter passed to {@link | ||
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the | ||
* snapshot passed to {@link #validateFromSnapshot(long)}. | ||
* | ||
* @return this for method chaining | ||
*/ | ||
StreamingUpdate validateNoConflictingDataFiles(); | ||
|
||
/** | ||
* Enables validation that delete files added concurrently do not conflict with this commit's | ||
* operation. | ||
* | ||
* <p>This method must be called when the table is queried to produce a row delta for UPDATE and | ||
* MERGE operations independently of the isolation level. Calling this method isn't required for | ||
* DELETE operations as it is OK to delete a record that is also deleted concurrently. | ||
* | ||
* <p>Validation uses the conflict detection filter passed to {@link | ||
* #conflictDetectionFilter(Expression)} and applies to operations that happened after the | ||
* snapshot passed to {@link #validateFromSnapshot(long)}. | ||
* | ||
* @return this for method chaining | ||
*/ | ||
StreamingUpdate validateNoConflictingDeleteFiles(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
205 changes: 205 additions & 0 deletions
205
core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg; | ||
|
||
import java.util.List; | ||
import java.util.Set; | ||
import org.apache.iceberg.expressions.Expression; | ||
import org.apache.iceberg.expressions.Expressions; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.util.SnapshotUtil; | ||
|
||
class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate> | ||
implements StreamingUpdate { | ||
private final List<Batch> batches = Lists.newArrayList(); | ||
|
||
private Long appliedOnSequenceNumber = -1L; | ||
private Long startingSnapshotId = null; // check all versions by default | ||
|
||
private Expression conflictDetectionFilter = Expressions.alwaysTrue(); | ||
private boolean validateNewDataFiles; | ||
private boolean validateNewDeleteFiles; | ||
|
||
BaseStreamingUpdate(String tableName, TableOperations ops) { | ||
super(tableName, ops); | ||
} | ||
|
||
@Override | ||
protected BaseStreamingUpdate self() { | ||
return this; | ||
} | ||
|
||
@Override | ||
protected String operation() { | ||
return DataOperations.OVERWRITE; | ||
} | ||
|
||
@Override | ||
protected long nextSnapshotSequenceNumber(TableMetadata base) { | ||
if (batches.isEmpty()) { | ||
return super.nextSnapshotSequenceNumber(base); | ||
} | ||
// Each batch will advance the data sequence number by one, so we should advance the snapshot by | ||
// the same amount. | ||
// Otherwise, we will end up with data files with a sequence number larger than the snapshot | ||
// sequence number. | ||
return super.nextSnapshotSequenceNumber(base) + batches.size() - 1; | ||
} | ||
|
||
@Override | ||
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) { | ||
if (!batches.isEmpty()) { | ||
long startingSequenceNumber = base.nextSequenceNumber(); | ||
if (appliedOnSequenceNumber != startingSequenceNumber) { | ||
// The base sequence number has changed, so we need to re-apply to re-calculate the sequence | ||
// numbers | ||
super.cleanUncommitted(EMPTY_SET); // Clean the old manifest files | ||
super.clearFiles(); // Reset the files since they will be added again with new sequence | ||
// numbers | ||
int batchIndex = 0; | ||
while (batchIndex < batches.size()) { | ||
Batch batch = batches.get(batchIndex); | ||
long dataSequenceNumber = startingSequenceNumber + batchIndex; | ||
batch.getNewDataFiles().forEach(f -> add(f, dataSequenceNumber)); | ||
batch.getNewDeleteFiles().forEach(f -> add(f, dataSequenceNumber)); | ||
batchIndex += 1; | ||
} | ||
appliedOnSequenceNumber = startingSequenceNumber; | ||
} | ||
} | ||
return super.apply(base, snapshot); | ||
} | ||
|
||
@Override | ||
public StreamingUpdate newBatch() { | ||
if (batches.isEmpty() || !batches.get(batches.size() - 1).isEmpty()) { | ||
// Only add a new batch if the there isn't one or there is one, and it's not empty | ||
// Otherwise, we will have empty batches. | ||
batches.add(new Batch()); | ||
} | ||
return this; | ||
} | ||
|
||
@Override | ||
public StreamingUpdate addFile(DataFile dataFile) { | ||
getBatch().add(dataFile); | ||
return this; | ||
} | ||
|
||
@Override | ||
public StreamingUpdate addFile(DeleteFile deleteFile) { | ||
getBatch().add(deleteFile); | ||
return this; | ||
} | ||
|
||
private Batch getBatch() { | ||
if (batches.isEmpty()) { | ||
newBatch(); | ||
} | ||
return batches.get(batches.size() - 1); | ||
} | ||
|
||
@Override | ||
public BaseStreamingUpdate toBranch(String branch) { | ||
targetBranch(branch); | ||
return this; | ||
} | ||
|
||
@Override | ||
protected void cleanUncommitted(Set<ManifestFile> committed) { | ||
// This is called when the commit fails and the caches are cleared, reset the state here so | ||
// calling apply again will re-add the files | ||
appliedOnSequenceNumber = -1L; | ||
super.cleanUncommitted(committed); | ||
} | ||
|
||
@Override | ||
public StreamingUpdate validateFromSnapshot(long snapshotId) { | ||
this.startingSnapshotId = snapshotId; | ||
return this; | ||
} | ||
|
||
@Override | ||
public BaseStreamingUpdate conflictDetectionFilter(Expression newConflictDetectionFilter) { | ||
this.conflictDetectionFilter = newConflictDetectionFilter; | ||
return this; | ||
} | ||
|
||
@Override | ||
public BaseStreamingUpdate validateNoConflictingDataFiles() { | ||
this.validateNewDataFiles = true; | ||
return this; | ||
} | ||
|
||
@Override | ||
public BaseStreamingUpdate validateNoConflictingDeleteFiles() { | ||
this.validateNewDeleteFiles = true; | ||
return this; | ||
} | ||
|
||
@Override | ||
protected void validate(TableMetadata base, Snapshot parent) { | ||
if (parent != null) { | ||
if (startingSnapshotId != null) { | ||
Preconditions.checkArgument( | ||
SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot), | ||
"Snapshot %s is not an ancestor of %s", | ||
startingSnapshotId, | ||
parent.snapshotId()); | ||
} | ||
|
||
if (validateNewDataFiles) { | ||
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent); | ||
} | ||
|
||
if (validateNewDeleteFiles) { | ||
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent); | ||
} | ||
} | ||
} | ||
|
||
private static class Batch { | ||
private final List<DataFile> newDataFiles = Lists.newArrayList(); | ||
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList(); | ||
|
||
/** Creates a new set of updates to a specific batch */ | ||
Batch() {} | ||
|
||
void add(DataFile dataFile) { | ||
newDataFiles.add(dataFile); | ||
} | ||
|
||
void add(DeleteFile deleteFile) { | ||
newDeleteFiles.add(deleteFile); | ||
} | ||
|
||
List<DataFile> getNewDataFiles() { | ||
return newDataFiles; | ||
} | ||
|
||
List<DeleteFile> getNewDeleteFiles() { | ||
return newDeleteFiles; | ||
} | ||
|
||
boolean isEmpty() { | ||
return newDataFiles.isEmpty() && newDeleteFiles.isEmpty(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.