Iceberg delete files are read twice during query processing causing delays
deniskuzZ committed Jan 12, 2023
1 parent ceaddbf commit 17509c5
Showing 3 changed files with 130 additions and 31 deletions.
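
In outline, the change builds the position-delete indexes for a whole group of scan tasks up front (DeleteFilter.createPosIndexCache), keyed by data-file path, and threads that cache through a new DeleteFilter.filter overload, so each positional delete file is opened and scanned once per scan group instead of once per data file. GenericReader wires the cache in when opening an Iterable of FileScanTasks.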
42 changes: 39 additions & 3 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -33,10 +34,13 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FilterIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;

@@ -142,6 +146,33 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel
}
}

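/** Builds one position delete index per referenced data file, keyed by data file location. */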
public static <T extends StructLike> Map<String, PositionDeleteIndex> toPositionIndex(
Iterable<CharSequence> dataLocations, List<CloseableIterable<T>> deleteFiles) {
DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocations);
List<CloseableIterable<Pair<String, Long>>> positions =
Lists.transform(
deleteFiles,
deletes ->
CloseableIterable.transform(
locationFilter.filter(deletes),
row ->
Pair.of(
(String) FILENAME_ACCESSOR.get(row),
(Long) POSITION_ACCESSOR.get(row))));

Map<String, PositionDeleteIndex> positionDeleteIndex = Maps.newHashMap();
try (CloseableIterable<Pair<String, Long>> deletes = CloseableIterable.concat(positions)) {
deletes.forEach(
entry ->
positionDeleteIndex
.computeIfAbsent(entry.first(), f -> new BitmapPositionDeleteIndex())
.delete(entry.second()));
return ImmutableMap.copyOf(positionDeleteIndex);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close position delete source", e);
}
}

public static <T> CloseableIterable<T> streamingFilter(
CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
@@ -306,15 +337,20 @@ protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
}

private static class DataFileFilter<T extends StructLike> extends Filter<T> {
-  private final CharSequence dataLocation;
+  private final Iterable<CharSequence> dataLocation;

-  DataFileFilter(CharSequence dataLocation) {
+  DataFileFilter(Iterable<CharSequence> dataLocation) {
this.dataLocation = dataLocation;
}

DataFileFilter(CharSequence dataLocation) {
this.dataLocation = ImmutableList.of(dataLocation);
}

@Override
protected boolean shouldKeep(T posDelete) {
-    return charSeqEquals(dataLocation, (CharSequence) FILENAME_ACCESSOR.get(posDelete));
+    return Iterables.any(
+        dataLocation, dl -> charSeqEquals(dl, (CharSequence) FILENAME_ACCESSOR.get(posDelete)));
}

private boolean charSeqEquals(CharSequence s1, CharSequence s2) {
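A minimal usage sketch for the new map-returning toPositionIndex overload, assuming already-open positional delete readers; the helper name and variables below are illustrative, not part of the commit:

  // Build one index per data file, then probe a position in a specific file.
  static boolean isRowDeleted(
      Iterable<CharSequence> dataLocations,
      List<CloseableIterable<Record>> deleteFiles,
      String dataFilePath,
      long position) {
    Map<String, PositionDeleteIndex> indexByPath =
        Deletes.toPositionIndex(dataLocations, deleteFiles);
    // data files with no matching deletes have no entry in the map
    PositionDeleteIndex index = indexByPath.get(dataFilePath);
    return index != null && index.isDeleted(position);
  }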
101 changes: 75 additions & 26 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -21,11 +21,13 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -36,13 +38,15 @@
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -51,6 +55,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
@@ -154,6 +159,14 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}

public CloseableIterable<T> filter(
CloseableIterable<T> records, Map<String, PositionDeleteIndex> positionIndexCache) {
Optional<PositionDeleteIndex> positionIndex =
Optional.ofNullable(positionIndexCache.get(filePath));
return applyEqDeletes(
positionIndex.isPresent() ? applyPosDeletes(records, positionIndex) : records);
}

private List<Predicate<T>> applyEqDeletes() {
if (isInDeleteSets != null) {
return isInDeleteSets;
@@ -239,6 +252,11 @@ public PositionDeleteIndex deletedRowPositions() {
}

private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
return applyPosDeletes(records, Optional.empty());
}

private CloseableIterable<T> applyPosDeletes(
CloseableIterable<T> records, Optional<PositionDeleteIndex> positionIndex) {
if (posDeletes.isEmpty()) {
return records;
}
@@ -247,8 +265,11 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {

// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
-      PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
-      Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
+      Predicate<T> isDeleted =
+          record ->
+              positionIndex
+                  .orElseGet(() -> Deletes.toPositionIndex(filePath, deletes))
+                  .isDeleted(pos(record));
return createDeleteIterable(records, isDeleted);
}

@@ -266,13 +287,23 @@ private CloseableIterable<T> createDeleteIterable(
: Deletes.filterDeleted(records, isDeleted, counter);
}

static CloseableIterable<Record> openPosDeletes(FileIO io, DeleteFile file) {
InputFile input = io.newInputFile(file.path().toString());
return openDeletes(input, file, POS_DELETE_SCHEMA);
}

private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
return openDeletes(file, POS_DELETE_SCHEMA);
}

private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
LOG.trace("Opening delete file {}", deleteFile.path());
InputFile input = getInputFile(deleteFile.path().toString());
return openDeletes(input, deleteFile, deleteSchema);
}

private static CloseableIterable<Record> openDeletes(
InputFile input, DeleteFile deleteFile, Schema deleteSchema) {
switch (deleteFile.format()) {
case AVRO:
return Avro.read(input)
@@ -282,32 +313,19 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
.build();

case PARQUET:
-        Parquet.ReadBuilder builder =
-            Parquet.read(input)
-                .project(deleteSchema)
-                .reuseContainers()
-                .createReaderFunc(
-                    fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));
-
-        if (deleteFile.content() == FileContent.POSITION_DELETES) {
-          builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
-        }
-
-        return builder.build();
+        return Parquet.read(input)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(
+                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+            .build();

case ORC:
// Reusing containers is automatic for ORC. No need to set 'reuseContainers' here.
-        ORC.ReadBuilder orcBuilder =
-            ORC.read(input)
-                .project(deleteSchema)
-                .createReaderFunc(
-                    fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema));
-
-        if (deleteFile.content() == FileContent.POSITION_DELETES) {
-          orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
-        }
-
-        return orcBuilder.build();
+        return ORC.read(input)
+            .project(deleteSchema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
+            .build();
default:
throw new UnsupportedOperationException(
String.format(
@@ -367,4 +385,35 @@ private static Schema fileProjection(

return new Schema(columns);
}

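/**
 * Builds a cache of position delete indexes, keyed by data file path, covering the distinct
 * positional delete files referenced by the given scan tasks. Returns an empty map when there
 * are no positional deletes or when they are too numerous to hold in memory.
 */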
public static Map<String, PositionDeleteIndex> createPosIndexCache(
Iterable<FileScanTask> fileTasks, FileIO io) {
return createPosIndexCache(fileTasks, DEFAULT_SET_FILTER_THRESHOLD, io);
}

public static Map<String, PositionDeleteIndex> createPosIndexCache(
Iterable<FileScanTask> fileTasks, long filterThreshold, FileIO io) {
List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
if (posDeletes.isEmpty()) {
return ImmutableMap.of();
}
// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < filterThreshold) {
Iterable<CharSequence> filePaths =
Iterables.transform(fileTasks, task -> task.file().path().toString());
List<CloseableIterable<Record>> deletes =
Lists.transform(posDeletes, row -> openPosDeletes(io, row));
return Deletes.toPositionIndex(filePaths, deletes);
}
return ImmutableMap.of();
}

private static List<DeleteFile> distinctPosDeletes(Iterable<FileScanTask> fileTasks) {
CharSequenceSet deleteSet = CharSequenceSet.empty();

return FluentIterable.from(fileTasks).transformAndConcat(FileScanTask::deletes).stream()
.filter(deleteFile -> FileContent.POSITION_DELETES == deleteFile.content())
.filter(deleteFile -> deleteSet.add(deleteFile.path()))
.collect(ImmutableList.toImmutableList());
}
}
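
A short sketch of the intended call pattern, based on the methods added above; `io`, `fileTasks`, `tableSchema`, `projection`, and `openTask` are stand-ins, not names from this commit:

  // Build the shared index once, then reuse it for every task in the group.
  Map<String, PositionDeleteIndex> cache = DeleteFilter.createPosIndexCache(fileTasks, io);
  for (FileScanTask task : fileTasks) {
    DeleteFilter<Record> filter = new GenericDeleteFilter(io, task, tableSchema, projection);
    CloseableIterable<Record> records = openTask(task, filter.requiredSchema());
    // positional deletes come from the cache; equality deletes are applied as before
    records = filter.filter(records, cache);
  }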
18 changes: 16 additions & 2 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -39,6 +40,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -62,19 +64,31 @@ class GenericReader implements Serializable {
CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
Iterable<FileScanTask> fileTasks =
Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
-    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+    return open(fileTasks).iterator();
}

public CloseableIterable<Record> open(CombinedScanTask task) {
return new CombinedTaskIterable(task);
}

public CloseableIterable<Record> open(Iterable<FileScanTask> fileTasks) {
Map<String, PositionDeleteIndex> positionIndexCache =
DeleteFilter.createPosIndexCache(fileTasks, io);
return CloseableIterable.concat(
Iterables.transform(fileTasks, task -> open(task, positionIndexCache)));
}

public CloseableIterable<Record> open(FileScanTask task) {
return open(task, ImmutableMap.of());
}

public CloseableIterable<Record> open(
FileScanTask task, Map<String, PositionDeleteIndex> positionIndexCache) {
DeleteFilter<Record> deletes = new GenericDeleteFilter(io, task, tableSchema, projection);
Schema readSchema = deletes.requiredSchema();

CloseableIterable<Record> records = openFile(task, readSchema);
-    records = deletes.filter(records);
+    records = deletes.filter(records, positionIndexCache);
records = applyResidual(records, readSchema, task.residual());

return records;
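With the new overload, callers that open a whole group of tasks get the cache for free. A sketch, assuming a constructed GenericReader named `reader` (shown for illustration; the class itself is package-private):

  // One pass over the delete files for the whole group of tasks.
  CloseableIterable<Record> rows = reader.open(fileTasks);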
