Skip to content

Commit

Permalink
MergingSnapshotProducer: Support adding data files at a specific sequ…
Browse files Browse the repository at this point in the history
…ence number
  • Loading branch information
jasonf20 committed Dec 19, 2023
1 parent 23e0317 commit 3e00fa0
Showing 1 changed file with 34 additions and 8 deletions.
42 changes: 34 additions & 8 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -80,7 +81,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DeleteFile> deleteFilterManager;

// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final List<FileHolder<DataFile>> newDataFiles = Lists.newArrayList();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<FileHolder<DeleteFile>>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -151,7 +152,8 @@ protected Expression rowFilter() {
}

protected List<DataFile> addedDataFiles() {
return ImmutableList.copyOf(newDataFiles);
return ImmutableList.copyOf(
newDataFiles.stream().map(FileHolder::file).collect(Collectors.toList()));
}

protected void failAnyDelete() {
Expand Down Expand Up @@ -221,10 +223,20 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
setDataSpec(file);
addedFilesSummary.addedFile(dataSpec(), file);
addDataFile(new FileHolder<>(file));
}

/** Add a data file to the new snapshot. */
protected void add(DataFile file, long dataSequenceNumber) {
Preconditions.checkNotNull(file, "Invalid data file: null");
addDataFile(new FileHolder<>(file, dataSequenceNumber));
}

private void addDataFile(FileHolder<DataFile> dataFile) {
setDataSpec(dataFile.file());
addedFilesSummary.addedFile(dataSpec(), dataFile.file());
hasNewDataFiles = true;
newDataFiles.add(file);
newDataFiles.add(dataFile);
}

/** Add a delete file to the new snapshot. */
Expand Down Expand Up @@ -954,9 +966,23 @@ private List<ManifestFile> newDataFilesAsManifests() {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
try {
if (newDataFilesDataSequenceNumber == null) {
newDataFiles.forEach(writer::add);
newDataFiles.forEach(
f -> {
if (f.dataSequenceNumber() == null) {
writer.add(f.file());
} else {
writer.add(f.file(), f.dataSequenceNumber);
}
});
} else {
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
newDataFiles.forEach(
f -> {
if (f.dataSequenceNumber() == null) {
writer.add(f.file(), newDataFilesDataSequenceNumber);
} else {
writer.add(f.file(), f.dataSequenceNumber);
}
});
}
} finally {
writer.close();
Expand Down Expand Up @@ -1126,7 +1152,7 @@ protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
}
}

private static class FileHolder<T extends ContentFile<?>>{
private static class FileHolder<T extends ContentFile<?>> {
private final T file;
private final Long dataSequenceNumber;

Expand Down

0 comments on commit 3e00fa0

Please sign in to comment.