Skip to content

Commit

Permalink
cache fix
Browse files Browse the repository at this point in the history
  • Loading branch information
deniskuzZ committed May 23, 2023
1 parent 273bf8d commit a931a78
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ public PositionDeleteIndex or(PositionDeleteIndex deleteIndex) {
roaring64Bitmap.or(((BitmapPositionDeleteIndex) deleteIndex).roaring64Bitmap);
return this;
}

@Override
public String toString() {
return roaring64Bitmap.toString();
}
}
43 changes: 32 additions & 11 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,20 @@ private CloseableIterable<T> createDeleteIterable(
: Deletes.filterDeleted(records, isDeleted, counter);
}

private CloseableIterable<Record> openPosDeletesNoFilter(DeleteFile file) {
return openDeletes(file, POS_DELETE_SCHEMA, null);
}

private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
return openDeletes(file, POS_DELETE_SCHEMA);
}

private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
return openDeletes(deleteFile, deleteSchema, filePath);
}

private CloseableIterable<Record> openDeletes(
DeleteFile deleteFile, Schema deleteSchema, String pathFilter) {
LOG.trace("Opening delete file {}", deleteFile.path());
InputFile input = getInputFile(deleteFile.path().toString());
switch (deleteFile.format()) {
Expand All @@ -313,15 +322,15 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
.reuseContainers()
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
.filter(deletesFilter(deleteFile))
.filter(deletesFilter(deleteFile, pathFilter))
.build();

case ORC:
// Reusing containers is automatic for ORC. No need to set 'reuseContainers' here.
return ORC.read(input)
.project(deleteSchema)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
.filter(deletesFilter(deleteFile))
.filter(deletesFilter(deleteFile, pathFilter))
.build();
default:
throw new UnsupportedOperationException(
Expand All @@ -331,9 +340,9 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
}
}

private Expression deletesFilter(DeleteFile deleteFile) {
if (deleteFile.content() == FileContent.POSITION_DELETES) {
return Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath);
private Expression deletesFilter(DeleteFile deleteFile, String pathFilter) {
if (pathFilter != null && FileContent.POSITION_DELETES == deleteFile.content()) {
return Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), pathFilter);
}
return null;
}
Expand Down Expand Up @@ -390,12 +399,21 @@ private static Schema fileProjection(
return new Schema(columns);
}

public Map<String, PositionDeleteIndex> createPosIndexMap(Iterable<FileScanTask> fileTasks) {
return createPosIndexMap(fileTasks, DEFAULT_SET_FILTER_THRESHOLD);
public Map<String, PositionDeleteIndex> createPosIndexMap(
Map<String, PositionDeleteIndex> posIndexCache, Iterable<FileScanTask> fileTasks) {
return createPosIndexMap(posIndexCache, fileTasks, DEFAULT_SET_FILTER_THRESHOLD);
}

public Map<String, PositionDeleteIndex> createPosIndexMap(
Iterable<FileScanTask> fileTasks, long filterThreshold) {
Map<String, PositionDeleteIndex> posIndexCache,
Iterable<FileScanTask> fileTasks,
long filterThreshold) {
Preconditions.checkNotNull(posIndexCache, "posIndexCache cannot be null");

if (!posIndexCache.isEmpty()) {
return posIndexCache;
}

List<DeleteFile> posDeleteFiles = distinctPosDeletes(fileTasks);
if (posDeleteFiles.isEmpty()) {
return ImmutableMap.of();
Expand All @@ -406,8 +424,9 @@ public Map<String, PositionDeleteIndex> createPosIndexMap(
Set<CharSequence> filePaths =
CharSequenceSet.of(Iterables.transform(fileTasks, task -> task.file().path()));
List<CloseableIterable<Record>> deletes =
Lists.transform(posDeleteFiles, this::openPosDeletes);
return Deletes.toPositionIndexMap(filePaths, deletes);
Lists.transform(posDeleteFiles, this::openPosDeletesNoFilter);
posIndexCache.putAll(Deletes.toPositionIndexMap(filePaths, deletes));
return posIndexCache;
}
return null;
}
Expand All @@ -422,6 +441,8 @@ public Map<String, PositionDeleteIndex> createPosIndexMap(
Cache<CharSequence, Map<String, PositionDeleteIndex>> posIndexCache,
Iterable<FileScanTask> fileTasks,
long filterThreshold) {
Preconditions.checkNotNull(posIndexCache, "posIndexCache cannot be null");

List<DeleteFile> posDeleteFiles = distinctPosDeletes(fileTasks);
if (posDeleteFiles.isEmpty()) {
return ImmutableMap.of();
Expand Down Expand Up @@ -450,7 +471,7 @@ public Map<String, PositionDeleteIndex> createPosIndexMap(
LOG.debug("Cache miss: {}", deleteFile.path());
Instant start = Instant.now();
Map<String, PositionDeleteIndex> res =
Deletes.toPositionIndexMap(openPosDeletes(deleteFile));
Deletes.toPositionIndexMap(openPosDeletesNoFilter(deleteFile));
LOG.debug(
"Cache load: {}; Time taken: {} ms;",
deleteFile.path(),
Expand Down
6 changes: 4 additions & 2 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
Expand Down Expand Up @@ -72,8 +73,9 @@ public CloseableIterable<Record> open(CombinedScanTask task) {
}

public CloseableIterable<Record> open(Iterable<FileScanTask> fileTasks) {
Map<String, PositionDeleteIndex> deletePosIndexCache = Maps.newHashMap();
Function<DeleteFilter<Record>, Map<String, PositionDeleteIndex>> positionIndex =
filter -> filter.createPosIndexMap(fileTasks);
filter -> filter.createPosIndexMap(deletePosIndexCache, fileTasks);
return CloseableIterable.concat(
Iterables.transform(fileTasks, task -> open(task, positionIndex)));
}
Expand All @@ -82,7 +84,7 @@ public CloseableIterable<Record> open(FileScanTask task) {
return open(task, filter -> null);
}

public CloseableIterable<Record> open(
private CloseableIterable<Record> open(
FileScanTask task,
Function<DeleteFilter<Record>, Map<String, PositionDeleteIndex>> positionIndex) {
DeleteFilter<Record> deletes = new GenericDeleteFilter(io, task, tableSchema, projection);
Expand Down

0 comments on commit a931a78

Please sign in to comment.