From 3e00fa03d36566d74644755be0cfb5f88a4c18f4 Mon Sep 17 00:00:00 2001 From: Jason Fine Date: Mon, 11 Dec 2023 18:37:00 +0200 Subject: [PATCH] MergingSnapshotProducer: Support adding data files at a specific sequence number --- .../iceberg/MergingSnapshotProducer.java | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e9a637e18486..2ec02dcf52da 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; @@ -80,7 +81,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final ManifestFilterManager deleteFilterManager; // update data - private final List newDataFiles = Lists.newArrayList(); + private final List> newDataFiles = Lists.newArrayList(); private Long newDataFilesDataSequenceNumber; private final Map>> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); @@ -151,7 +152,8 @@ protected Expression rowFilter() { } protected List addedDataFiles() { - return ImmutableList.copyOf(newDataFiles); + return ImmutableList.copyOf( + newDataFiles.stream().map(FileHolder::file).collect(Collectors.toList())); } protected void failAnyDelete() { @@ -221,10 +223,20 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - setDataSpec(file); - addedFilesSummary.addedFile(dataSpec(), file); + addDataFile(new FileHolder<>(file)); + } + + /** Add a data file to the new snapshot. */ + protected void add(DataFile file, long dataSequenceNumber) { + Preconditions.checkNotNull(file, "Invalid data file: null"); + addDataFile(new FileHolder<>(file, dataSequenceNumber)); + } + + private void addDataFile(FileHolder dataFile) { + setDataSpec(dataFile.file()); + addedFilesSummary.addedFile(dataSpec(), dataFile.file()); hasNewDataFiles = true; - newDataFiles.add(file); + newDataFiles.add(dataFile); } /** Add a delete file to the new snapshot. */ @@ -954,9 +966,23 @@ private List newDataFilesAsManifests() { RollingManifestWriter writer = newRollingManifestWriter(dataSpec()); try { if (newDataFilesDataSequenceNumber == null) { - newDataFiles.forEach(writer::add); + newDataFiles.forEach( + f -> { + if (f.dataSequenceNumber() == null) { + writer.add(f.file()); + } else { + writer.add(f.file(), f.dataSequenceNumber); + } + }); } else { - newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber)); + newDataFiles.forEach( + f -> { + if (f.dataSequenceNumber() == null) { + writer.add(f.file(), newDataFilesDataSequenceNumber); + } else { + writer.add(f.file(), f.dataSequenceNumber); + } + }); } } finally { writer.close(); @@ -1126,7 +1152,7 @@ protected ManifestReader newManifestReader(ManifestFile manifest) { } } - private static class FileHolder>{ + private static class FileHolder> { private final T file; private final Long dataSequenceNumber;