
API: New API For sequential / streaming updates #9323

Open
wants to merge 3 commits into base: main

Conversation

jasonf20
Contributor

@jasonf20 jasonf20 commented Dec 17, 2023

Explanation
Certain data production patterns can result in a series of micro-batch updates that must be applied to the table sequentially. If these batches include updates, they need to be committed with distinct data sequence numbers so that the deletes in each batch apply only to data from previous batches.

Currently, this is achievable by creating a transaction and committing each batch separately:

for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()

However, this is very slow since it produces a manifest file for each batch and writes that file out to the filesystem.

Instead, I propose an API that produces a single manifest with files at different data sequence numbers (like you would get after re-writing the manifests) immediately.

update = table.newStreamingUpdate()
for batch in batches:
    update.newBatch()
    update.add(batch.deleteFiles)
    update.add(batch.dataFiles)
update.commit()

The API will produce one delete-file manifest and one data-file manifest (or more if they grow too large), where each batch advances the data sequence number by 1. This way:

  • Deletes for previous batches don't apply to new data.
  • Deletes do apply for all data written before the delete.
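These two sequencing rules can be illustrated with a small toy model (illustrative names only, not the Iceberg API): each batch gets the next data sequence number, and an equality delete at sequence N removes only rows written at a sequence number strictly below N.

```python
# Toy model of per-batch data sequence numbers (illustrative only; not the
# Iceberg API). Each batch advances the sequence number by 1; an equality
# delete at sequence N applies only to rows with a lower sequence number.
def apply_batches(batches):
    rows = {}  # key -> (value, sequence_number)
    seq = 0
    for batch in batches:
        seq += 1
        # Deletes first: they only affect rows from earlier sequence numbers.
        for key in batch.get("deletes", []):
            if key in rows and rows[key][1] < seq:
                del rows[key]
        # Inserts in the same batch get the current sequence number, so this
        # batch's deletes (and any earlier ones) cannot remove them.
        for key, value in batch.get("inserts", {}).items():
            rows[key] = (value, seq)
    return {k: v for k, (v, _) in rows.items()}

result = apply_batches([
    {"inserts": {"a": 1, "b": 2}},
    {"deletes": ["a"], "inserts": {"a": 10}},  # upsert of "a"
    {"deletes": ["b"]},
])
```

The upsert in the second batch deletes the old row for `"a"` and re-inserts it at the new sequence number, while the insert survives its own batch's delete.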

This PR adds this API.

I understand 1.5 is planned for release in January. This might be a good time to include this API if there is a version bump.

Use Cases
This is useful for workloads with frequent updates. The most common is CDC, but any upsert use case can benefit. Not all data producers require this: some run on larger batches and don't mind creating a manifest for each change. But a producer that is trying to reduce latency and processes its backlog concurrently may end up with many small batches that need to be committed together.

Benchmark
Benchmark available in my first comment.

@jasonf20
Contributor Author

jasonf20 commented Dec 17, 2023

Benchmark
The following test was run locally to demonstrate that the difference in IO performance is significant: while the transaction approach's IO grows linearly with the number of batches, the streaming API has constant IO overhead.

The test uses just 10 batches, but increasing the batch count will increase the difference linearly.

code

object IcebergUpsert {
  // Imports assumed (Scala 2.13 with scala-parallel-collections on the classpath):
  // import java.util.UUID
  // import scala.collection.JavaConverters._
  // import scala.collection.parallel.CollectionConverters._
  // import org.apache.iceberg.{PartitionSpec, Schema, Table}
  // import org.apache.iceberg.aws.glue.GlueCatalog
  // import org.apache.iceberg.catalog.TableIdentifier
  // import org.apache.iceberg.data
  // import org.apache.iceberg.data.GenericRecord
  // import org.apache.iceberg.data.parquet.GenericParquetWriter
  // import org.apache.iceberg.parquet.Parquet
  // import org.apache.iceberg.types.{Types => IcebergTypes}

  def main(args: Array[String]): Unit = {
    val catalog = new GlueCatalog()
    catalog.initialize("iceberg", Map.empty[String, String].asJava)

    val schema = new Schema(
      IcebergTypes.NestedField.required(1, "id", IcebergTypes.StringType.get()),
    );
    val tableName = "streams"
    val tableId = TableIdentifier.of("prod_iceberg", tableName)
    val basePath = s"s3://bucket/ice/tables/${tableName}/"

    if (!catalog.tableExists(tableId)) {
      catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), basePath, Map.empty[String, String].asJava)
    }

    val table = catalog.loadTable(tableId)

    val updateBatches = (1 to 10).par.map(_ => {
      writeFile(basePath, table) -> writeDeleteFile(basePath, table)
    }).seq

    println("Starting with rowDelta")
    var startTime = System.currentTimeMillis()
    val transaction = table.newTransaction()
    updateBatches.foreach({
      case (dataFile, deleteFile) =>
        transaction.newRowDelta()
          .addRows(dataFile)
          .addDeletes(deleteFile)
          .commit()
    })
    transaction.commitTransaction()
    println(s"Row delta took ${System.currentTimeMillis() - startTime} millis")
    table.refresh()
    println(s"Table has ${table.currentSnapshot().allManifests(table.io()).size()} manifests")

    println("Starting with streaming update")
    startTime = System.currentTimeMillis()
    val update = table.newStreamingUpdate()
    updateBatches.foreach({
      case (dataFile, deleteFile) =>
        update.newBatch().addFile(dataFile).addFile(deleteFile)
    })
    update.commit()
    println(s"Streaming update took ${System.currentTimeMillis() - startTime} millis")
    table.refresh()
    println(s"Table has ${table.currentSnapshot().allManifests(table.io()).size()} manifests")
  }

  private def writeFile(basePath: String, table: Table) = {
    val writer = Parquet.writeData(
        table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .build[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDataFile
  }

  private def writeDeleteFile(basePath: String, table: Table) = {
    val writer = Parquet.writeDeletes(
        table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .equalityFieldIds(table.schema().columns().asScala.map(_.fieldId().asInstanceOf[Integer]).asJava)
      .buildEqualityWriter[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDeleteFile
  }
}

results

Starting with rowDelta
2023-12-15 14:40:34,801 8637 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 3558899585328564531 (BaseRowDelta)
2023-12-15 14:40:35,219 9055 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=3558899585328564531, sequenceNumber=41, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT2.337073916S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=41}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=41}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=41}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=33046}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=41}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:36,498 10334 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 616856435651832528 (BaseRowDelta)
2023-12-15 14:40:36,917 10753 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=616856435651832528, sequenceNumber=42, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.698149167S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=42}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=42}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=42}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=33852}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=42}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:38,226 12062 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 4567773841167163715 (BaseRowDelta)
2023-12-15 14:40:38,642 12478 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=4567773841167163715, sequenceNumber=43, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.723988375S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=43}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=43}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=43}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=34658}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=43}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:39,991 13827 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 4541277781112838040 (BaseRowDelta)
2023-12-15 14:40:40,466 14302 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=4541277781112838040, sequenceNumber=44, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.82380625S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=44}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=44}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=44}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=35464}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=44}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:41,776 15612 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 6741568890407525576 (BaseRowDelta)
2023-12-15 14:40:42,184 16020 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=6741568890407525576, sequenceNumber=45, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.717269417S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=45}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=45}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=45}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=36270}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=45}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:43,460 17296 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 1018838099119420123 (BaseRowDelta)
2023-12-15 14:40:43,877 17713 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=1018838099119420123, sequenceNumber=46, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.692011958S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=46}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=46}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=46}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=37076}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=46}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:45,192 19028 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 758872760345785094 (BaseRowDelta)
2023-12-15 14:40:45,630 19466 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=758872760345785094, sequenceNumber=47, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.752748416S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=47}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=47}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=47}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=37882}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=47}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:46,938 20774 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 8330568458953296944 (BaseRowDelta)
2023-12-15 14:40:47,345 21181 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=8330568458953296944, sequenceNumber=48, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.714607334S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=48}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=48}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=48}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=38688}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=48}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:48,875 22711 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 380337181704891470 (BaseRowDelta)
2023-12-15 14:40:49,281 23117 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=380337181704891470, sequenceNumber=49, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.934422667S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=49}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=49}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=49}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=39494}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=49}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:50,558 24394 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 2626027648794038319 (BaseRowDelta)
2023-12-15 14:40:50,960 24796 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=2626027648794038319, sequenceNumber=50, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT1.679267084S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=50}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=50}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=50}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=806}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=40300}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=1}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=50}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
2023-12-15 14:40:52,297 26133 [main] INFO  o.a.i.BaseMetastoreTableOperations  - Successfully committed to table iceberg.prod_iceberg.streams in 1041 ms
2023-12-15 14:40:52,588 26424 [main] INFO  o.a.i.BaseMetastoreTableOperations  - Refreshing table metadata from new version: s3://bucket/ice/tables/streams/metadata/00005-20fd758e-5cb9-431d-95d5-1de0e06bea8f.metadata.json
Row delta took 24493 millis
Table has 20 manifests
Starting with streaming update
2023-12-15 14:40:59,957 33793 [main] INFO  o.a.i.BaseMetastoreTableOperations  - Successfully committed to table iceberg.prod_iceberg.streams in 1003 ms
2023-12-15 14:40:59,957 33793 [main] INFO  org.apache.iceberg.SnapshotProducer  - Committed snapshot 6955191004682032851 (BaseStreamingUpdate)
2023-12-15 14:41:00,242 34078 [main] INFO  o.a.i.BaseMetastoreTableOperations  - Refreshing table metadata from new version: s3://bucket/ice/tables/streams/metadata/00006-435a312d-615d-4ae0-8c0a-96d60a1aa9d5.metadata.json
2023-12-15 14:41:01,349 35185 [main] INFO  o.a.i.m.LoggingMetricsReporter  - Received metrics report: CommitReport{tableName=iceberg.prod_iceberg.streams, snapshotId=6955191004682032851, sequenceNumber=60, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT3.728768833S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=10}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=60}, addedDeleteFiles=CounterResult{unit=COUNT, value=10}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=10}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=60}, addedRecords=CounterResult{unit=COUNT, value=10}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=60}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=8060}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=48360}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=10}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=60}}, metadata={iceberg-version=Apache Iceberg 1.5.0-SNAPSHOT (commit 0b63a580f84dc38fc891a75fa244724cb9d57385)}}
Streaming update took 4010 millis
Table has 22 manifests

24 seconds and 20 manifests vs. 4 seconds and 2 manifests.

@rdblue
Contributor

rdblue commented Dec 18, 2023

@jasonf20, I don't quite understand the use case. It looks like the purpose is to commit multiple batches of data at the same time. Why would you not just use a single operation? Do you need to produce multiple sequence numbers but only want to commit once?

@jasonf20
Contributor Author

jasonf20 commented Dec 19, 2023

@rdblue Correct: we need multiple (new) sequence numbers, since each batch has equality deletes that need to apply to prior batches but not to newer ones.
Committing more than once works; it is just very slow and wastefully produces many manifest files.

RewriteFiles partially supports setting sequence numbers manually, but in our case we are not rewriting files at a known sequence number, so we can't set one explicitly while adding files; instead, the numbers need to be determined during apply.

@rdblue
Contributor

rdblue commented Jan 3, 2024

@jasonf20, to make that work, I think you'd need to keep track of a base sequence number and update the metadata for each new manifest with the correct sequence number when the manifest list is written. That sounds doable, but complicated. Can you explain how you're handling it in this PR?

@jasonf20
Contributor Author

jasonf20 commented Jan 4, 2024

@rdblue Sure. I added support for setting the sequence number explicitly per file in MergingSnapshotProducer. This was almost supported already (it just didn't support per-file sequence numbers for added files). This is then used in a similar way to how RewriteFiles used it, but the main difference is that we don't set the sequence number when the user adds the file.

Instead, we just collect an ordered list of files and only assign sequence numbers when apply is called. Since apply is called with the base metadata, we can calculate the sequence numbers from there. If the commit fails due to a conflict, apply is called again and we can recalculate the new sequence numbers. Looking again now, I think I might need to remove the requiresApply boolean from the apply method for retries to work properly.

EDIT: I added a commit + test to fix the issue with retries.
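The deferred assignment described above can be sketched in Python pseudocode (hypothetical names; the real logic lives in MergingSnapshotProducer and the new streaming-update classes): files are collected in order with batch markers, and sequence numbers are only computed when apply runs against a concrete base, so a retry against a new base simply recomputes them.

```python
# Sketch of apply-time sequence-number assignment (hypothetical names; not
# the actual Iceberg implementation). Files are collected in order; nothing
# is numbered until apply() is called with the base sequence number.
class StreamingUpdateSketch:
    NEW_BATCH = object()  # marker separating batches in the ordered list

    def __init__(self):
        self.entries = []  # ordered mix of NEW_BATCH markers and file names

    def new_batch(self):
        self.entries.append(self.NEW_BATCH)
        return self

    def add_file(self, f):
        self.entries.append(f)
        return self

    def apply(self, base_sequence_number):
        # Called with the current base metadata; on a commit conflict it is
        # simply called again with the new base, recomputing every number.
        seq = base_sequence_number
        assigned = []
        for e in self.entries:
            if e is self.NEW_BATCH:
                seq += 1  # each batch advances the data sequence number
            else:
                assigned.append((e, seq))
        return assigned

u = StreamingUpdateSketch()
u.new_batch().add_file("del-1").add_file("data-1")
u.new_batch().add_file("del-2").add_file("data-2")
first = u.apply(base_sequence_number=41)  # apply against the original base
retry = u.apply(base_sequence_number=45)  # conflict: re-applied on a new base
```

Because nothing is pinned at add time, the retry path needs no special bookkeeping: the same ordered list is renumbered from whatever base the retried commit sees.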

@rdblue
Contributor

rdblue commented Jan 23, 2024

@jasonf20, explicitly setting the sequence number isn't safe. Sequence numbers are assigned when the client attempts to commit and must be updated if the client has to retry. You could make this work by writing separate manifest files for each interim commit and reassigning the sequence numbers for those in order, but that seems time consuming and would require large changes to the snapshot producer base class. Maybe we should catch up sometime to talk through what you're trying to accomplish.

@jasonf20
Contributor Author

jasonf20 commented Jan 23, 2024

@rdblue I'm not setting it explicitly; I'm allowing the apply function to set it based on the current snapshot that will be used during the commit. If the commit succeeds, that is the right base to use; otherwise apply will be re-called with the new base.

If I understand correctly, that is what already happens in SnapshotProducer here: https://github.com/apache/iceberg/blob/20ff1ab33d2e032feb845eb2609bd6eb2c154f2d/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L230C10-L230C24

What am I missing?

Catching up sounds good; I'll ping you on Slack.

/** Add a data file to the new snapshot. */
protected void add(DataFile file, long dataSequenceNumber) {
  Preconditions.checkNotNull(file, "Invalid data file: null");
  addDataFile(new FileHolder<>(file, dataSequenceNumber));
}
Contributor

@rdblue rdblue Feb 3, 2024


@jasonf20, the caller can't reliably supply a sequence number. Let's talk about this on Tuesday. I think there's a way that you can update this class to do what you want, but it's going to be a bit more work than what you have here.

@jasonf20
Contributor Author

@rdblue Based on our discussions, could you have another look at this?

jasonf20 added 2 commits July 25, 2024 15:46
This will allow reusing it for insert files at specific data versions in future commits.
@jasonf20 jasonf20 force-pushed the streaming-update branch 3 times, most recently from 75b2f09 to 37cc30a Compare July 25, 2024 13:07
This operation allows adding multiple consecutive updates in a single commit without equality deletes from prior updates affecting inserts that occurred after them.

Before this commit you would need to do something like this:
```
for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()
```
Which produces many manifest files and is very IO intensive.

This operation allows:
```
update = table.newStreamingUpdate()
for batch in batches:
    update.newBatch()
    update.add(batch.deleteFiles)
    update.add(batch.dataFiles)
update.commit()
```
@jasonf20
Contributor Author

I rebased this PR again so it's mergeable. Would appreciate reviving it. @rdblue Could you please have a look, or let me know if there is someone else who could help?


github-actions bot commented Oct 8, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Oct 8, 2024
@jasonf20
Contributor Author

jasonf20 commented Oct 8, 2024

We are still interested in merging this; it's been very helpful for performance on our end and should do the same for other streaming users.

@github-actions github-actions bot removed the stale label Oct 11, 2024