API: New API For sequential / streaming updates #9323
Conversation
Benchmark: The test uses just 10 batches, but increasing the batch count will increase the difference linearly.

Code:

```scala
// Imports added for completeness; adjust to your build.
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.iceberg.{PartitionSpec, Schema, Table}
import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.data.parquet.GenericParquetWriter
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.types.{Types => IcebergTypes}

object IcebergUpsert {

  def main(args: Array[String]): Unit = {
    val catalog = new GlueCatalog()
    catalog.initialize("iceberg", Map.empty[String, String].asJava)

    val schema = new Schema(
      IcebergTypes.NestedField.required(1, "id", IcebergTypes.StringType.get())
    )
    val tableName = "streams"
    val tableId = TableIdentifier.of("prod_iceberg", tableName)
    val basePath = s"s3://bucket/ice/tables/${tableName}/"
    if (!catalog.tableExists(tableId)) {
      catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), basePath, Map.empty[String, String].asJava)
    }
    val table = catalog.loadTable(tableId)

    // Pre-write 10 (data file, equality delete file) pairs.
    // Note: on Scala 2.13+, .par additionally needs scala-parallel-collections and
    // import scala.collection.parallel.CollectionConverters._
    val updateBatches = (1 to 10).par.map(_ => {
      writeFile(basePath, table) -> writeDeleteFile(basePath, table)
    }).seq

    // Approach 1: one RowDelta per batch inside a single transaction.
    println("Starting with rowDelta")
    var startTime = System.currentTimeMillis()
    val transaction = table.newTransaction()
    updateBatches.foreach({
      case (dataFile, deleteFile) =>
        transaction.newRowDelta()
          .addRows(dataFile)
          .addDeletes(deleteFile)
          .commit()
    })
    transaction.commitTransaction()
    println(s"Row delta took ${System.currentTimeMillis() - startTime} millis")
    table.refresh()
    println(s"Table has ${table.currentSnapshot().allManifests(table.io()).size()} manifests")

    // Approach 2: the proposed streaming update API, one commit for all batches.
    println("Starting with streaming update")
    startTime = System.currentTimeMillis()
    val update = table.newStreamingUpdate()
    updateBatches.foreach({
      case (dataFile, deleteFile) =>
        update.newBatch().addFile(dataFile).addFile(deleteFile)
    })
    update.commit()
    println(s"Streaming update took ${System.currentTimeMillis() - startTime} millis")
    table.refresh()
    println(s"Table has ${table.currentSnapshot().allManifests(table.io()).size()} manifests")
  }

  // Writes a single-row Parquet data file and returns its DataFile metadata.
  private def writeFile(basePath: String, table: Table) = {
    val writer = Parquet.writeData(
        table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .build[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDataFile
  }

  // Writes a single-row equality delete file and returns its DeleteFile metadata.
  private def writeDeleteFile(basePath: String, table: Table) = {
    val writer = Parquet.writeDeletes(
        table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .equalityFieldIds(table.schema().columns().asScala.map(_.fieldId().asInstanceOf[Integer]).asJava)
      .buildEqualityWriter[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDeleteFile
  }
}
```

Results: 24 seconds and 20 manifests (row delta) vs 4 seconds and 2 manifests (streaming update).
@jasonf20, I don't quite understand the use case. It looks like the purpose is to commit multiple batches of data at the same time. Why not just use a single operation? Do you need to produce multiple sequence numbers but only want to commit once?
@rdblue Correct, we need multiple (new) sequence numbers since each batch has equality deletes that need to apply to prior batches, but not newer batches.
@jasonf20, to make that work, I think you'd need to keep track of a base sequence number and update the metadata for each new manifest with the correct sequence number when the manifest list is written. That sounds doable, but complicated. Can you explain how you're handling it in this PR?
@rdblue Sure. I added support for setting the sequence number explicitly per file. Instead of committing each batch as its own operation, we just collect an ordered list of files and only assign the sequence numbers at commit time. EDIT: I added a commit + test to fix the issue with retries.
@jasonf20, explicitly setting the sequence number isn't safe. Sequence numbers are assigned when the client attempts to commit and must be updated if the client has to retry. You could make this work by writing separate manifest files for each interim commit and reassigning the sequence numbers for those in order, but that seems time consuming and would require large changes to the snapshot producer base class. Maybe we should catch up sometime to talk through what you're trying to accomplish.
@rdblue I'm not setting it explicitly; I'm allowing the sequence numbers to be assigned at commit time. If I understand correctly, that is what is happening already. What am I missing? Catching up sounds good, I'll ping you in Slack.
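As an aside for readers following along: here is a minimal sketch, not the PR's actual code, of what "assigned at commit time" could look like. `PendingFile`, `SequenceNumberResolver`, and the offset arithmetic are illustrative assumptions: files only remember which batch they belong to, and absolute data sequence numbers are computed from the snapshot's base sequence number when the commit (or a retry) is attempted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder: a file plus the 0-based batch it was added in.
class PendingFile {
  final String path;       // stands in for the DataFile / DeleteFile reference
  final int batchOrdinal;  // which batch within this streaming update

  PendingFile(String path, int batchOrdinal) {
    this.path = path;
    this.batchOrdinal = batchOrdinal;
  }
}

class SequenceNumberResolver {
  /**
   * Resolves absolute data sequence numbers just before the manifest list is written.
   * If the commit is retried, the caller passes the new base sequence number and every
   * file is re-resolved, so no stale sequence numbers leak into the table metadata.
   */
  static List<Long> resolveSequenceNumbers(List<PendingFile> files, long baseSequenceNumber) {
    List<Long> resolved = new ArrayList<>();
    for (PendingFile file : files) {
      resolved.add(baseSequenceNumber + 1 + file.batchOrdinal);
    }
    return resolved;
  }
}
```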
```java
/** Add a data file to the new snapshot. */
protected void add(DataFile file, long dataSequenceNumber) {
  Preconditions.checkNotNull(file, "Invalid data file: null");
  addDataFile(new FileHolder<>(file, dataSequenceNumber));
}
```
@jasonf20, the caller can't reliably supply a sequence number. Let's talk about this on Tuesday. I think there's a way that you can update this class to do what you want, but it's going to be a bit more work than what you have here.
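For illustration only, one shape that avoids a caller-supplied sequence number (an assumed sketch, not the class added by this PR) keeps batch ordering inside the producer: `newBatch()` bumps an internal ordinal, `addFile()` records it, and absolute sequence numbers are left to commit time as in the resolver sketch above. `StreamingUpdateSketch` is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;

class StreamingUpdateSketch {
  // Parallel lists: a file path and the batch ordinal it was added under.
  private final List<String> filePaths = new ArrayList<>();
  private final List<Integer> fileBatches = new ArrayList<>();
  private int currentBatch = -1;

  // Starts a new batch; files added afterwards get a later data sequence number.
  StreamingUpdateSketch newBatch() {
    currentBatch += 1;
    return this;
  }

  // The caller supplies only the file; its ordering comes from prior newBatch() calls.
  StreamingUpdateSketch addFile(String path) {
    filePaths.add(path);
    fileBatches.add(currentBatch);
    return this;
  }
}
```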
@rdblue Based on our discussions, could you have another look at this?
This will allow re-using it for insert files for specific data versions in future commits
This operation allows adding multiple consecutive updates in a single commit without equality deletes from prior updates affecting inserts that occurred after them. Before this commit you would need to do something like this:

```
transaction = table.newTransaction()
for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()
```

This produces many manifest files and is very IO intensive. This operation allows:

```
update = table.newStreamingUpdate()
for batch in batches:
    update.newBatch()
    update.add(batch.deleteFiles)
    update.add(batch.dataFiles)
update.commit()
```
I rebased this PR again so it's mergeable. Would appreciate reviving this PR. @rdblue Could you please have a look or let me know if there is someone else who could help?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
We are still interested in merging this; it's been very helpful for performance on our end and should do the same for other streaming users.
Explanation
Certain data production patterns can result in many micro-batch updates that need to be applied to the table sequentially. If these batches include updates, each batch needs to be committed at its own data sequence number so that its deletes apply only to the previous batches (for example, batch 2's equality deletes must remove matching rows written by batch 1, but must not touch rows inserted by batch 2 itself or by later batches).
Currently, this is achievable by creating a transaction and committing each batch as a separate operation inside that transaction.
However, this is very slow since it produces a manifest file for each batch and writes that file out to the filesystem.
Instead, I propose an API that immediately produces a single manifest containing files at different data sequence numbers (like you would get after rewriting the manifests).
The API will produce one delete-file manifest and one data-file manifest (or more if they get too large), where each batch advances the data sequence number by 1. This way, each batch's deletes apply only to the batches before it.
This PR adds this API.
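For readers of the description only, here is a minimal Java sketch of the proposed usage, adapted from the Scala benchmark in the conversation above. The `StreamingUpdate` type name and the surrounding helper method are assumptions; `table.newStreamingUpdate()`, `newBatch()`, `addFile()`, and `commit()` are the calls shown in the benchmark.

```java
import java.util.List;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StreamingUpdate;  // assumed name of the type returned by the new API
import org.apache.iceberg.Table;

public class StreamingUpdateUsage {

  /** Commits every (data file, equality delete file) pair as its own batch in one snapshot. */
  static void commitBatches(Table table, List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
    StreamingUpdate update = table.newStreamingUpdate();
    for (int i = 0; i < dataFiles.size(); i++) {
      // Each newBatch() advances the data sequence number by one, so this batch's
      // equality deletes apply only to files from earlier batches.
      update.newBatch()
          .addFile(dataFiles.get(i))
          .addFile(deleteFiles.get(i));
    }
    // One commit, one snapshot, and far fewer manifests than one RowDelta per batch.
    update.commit();
  }
}
```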
I understand 1.5 is planned to be released in Jan. This might be a good time to include this if there is a version bump.
Use Cases
This is useful for use cases that involve frequent updates. The most common is CDC, but any upsert use case can benefit. Not all data producers require this: some run on bigger batches of data and don't mind creating a manifest for each change. But if the producer is trying to reduce latency and processes the backlog concurrently, it may end up with many smaller batches that need to be committed together.
Benchmark
Benchmark available in my first comment.