Skip to content

Commit

Permalink
Core: Use CharSequenceMap for writing unordered deletes (#9365)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Dec 24, 2023
1 parent 226a23f commit 6f4e33e
Showing 1 changed file with 9 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.CharSequenceMap;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;

Expand All @@ -44,28 +42,20 @@ public class SortingPositionOnlyDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
private final Map<CharSequenceWrapper, Roaring64Bitmap> positionsByPath;
private final CharSequenceWrapper pathWrapper;
private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
private DeleteWriteResult result = null;

/**
 * Stores the wrapped writer and initializes the per-path position map.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers, so the
 * replaced and the replacing initialization are both shown below. Confirm against the actual
 * file which lines are live before changing anything.
 *
 * @param writer the writer that receives the position deletes
 */
public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWriteResult> writer) {
this.writer = writer;
// presumably the removed variant: a plain hash map plus a reusable lookup wrapper
this.positionsByPath = Maps.newHashMap();
this.pathWrapper = CharSequenceWrapper.wrap(null);
// presumably the added variant: a map keyed directly by CharSequence
this.positionsByPath = CharSequenceMap.create();
}

/**
 * Records the position of the given delete under its path in {@code positionsByPath}.
 *
 * <p>Nothing is passed to the wrapped writer in this method; the delete is only added to the
 * in-memory map.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * get/put branch and the {@code computeIfAbsent} call below look like the old and the new
 * implementation of the same step ({@code positions} is declared twice) - confirm which one
 * is live.
 *
 * @param positionDelete the delete whose path and position are recorded
 */
@Override
public void write(PositionDelete<T> positionDelete) {
CharSequence path = positionDelete.path();
long position = positionDelete.pos();
// presumably the removed variant: look up via the reusable wrapper, and on a miss create the
// bitmap and store it under a freshly created wrapper for the path
Roaring64Bitmap positions = positionsByPath.get(pathWrapper.set(path));
if (positions != null) {
positions.add(position);
} else {
positions = new Roaring64Bitmap();
positions.add(position);
positionsByPath.put(CharSequenceWrapper.wrap(path), positions);
}
// presumably the added variant: create the bitmap on first sight of the path, then add
Roaring64Bitmap positions = positionsByPath.computeIfAbsent(path, Roaring64Bitmap::new);
positions.add(position);
}

@Override
Expand All @@ -88,12 +78,12 @@ public void close() throws IOException {
/**
 * Writes every recorded position to the wrapped writer, path by path in the order returned by
 * {@code sortedPaths()}, and returns the wrapped writer's result.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers; the paired
 * loop headers and paired {@code writer.write} calls look like old/new variants of the same
 * line. The body of the {@code finally} block is elided by the diff view and is not described
 * here.
 *
 * @return the result reported by the wrapped writer
 * @throws IOException declared by this method; the visible lines do not show where it originates
 */
private DeleteWriteResult writeDeletes() throws IOException {
try {
// a single PositionDelete instance is reused for every write via set(...)
PositionDelete<T> positionDelete = PositionDelete.create();
// presumably the removed loop header (keys were CharSequenceWrapper instances)
for (CharSequenceWrapper path : sortedPaths()) {
// presumably the added loop header (keys are plain CharSequence values)
for (CharSequence path : sortedPaths()) {
// the iterator provides values in ascending sorted order
PeekableLongIterator positions = positionsByPath.get(path).getLongIterator();
while (positions.hasNext()) {
long position = positions.next();
// presumably the removed call: unwraps the path from its wrapper
writer.write(positionDelete.set(path.get(), position, null /* no row */));
// presumably the added call: passes the path directly
writer.write(positionDelete.set(path, position, null /* no row */));
}
}
} finally {
Expand All @@ -103,8 +93,8 @@ private DeleteWriteResult writeDeletes() throws IOException {
return writer.result();
}

/**
 * Copies the keys of {@code positionsByPath} into a new list and sorts it with the comparator
 * from {@code Comparators.charSequences()}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers; the two
 * signatures and the two list declarations below look like the old (wrapper-typed) and new
 * (CharSequence-typed) variants - confirm which one is live.
 *
 * @return a newly created, sorted list of the recorded paths
 */
private List<CharSequenceWrapper> sortedPaths() {
List<CharSequenceWrapper> paths = Lists.newArrayList(positionsByPath.keySet());
private List<CharSequence> sortedPaths() {
List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet());
paths.sort(Comparators.charSequences());
return paths;
}
Expand Down

0 comments on commit 6f4e33e

Please sign in to comment.