Skip to content

Commit

Permalink
added support for the external cache
Browse files Browse the repository at this point in the history
  • Loading branch information
deniskuzZ committed Jan 25, 2023
1 parent 4bbc4c8 commit 490934c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,13 @@ public boolean isDeleted(long position) {
// Delegates to the backing roaring64Bitmap and returns whatever its isEmpty() reports.
public boolean isEmpty() {
return roaring64Bitmap.isEmpty();
}

/**
 * Unions the positions of {@code deleteIndex} into this index. The backing bitmap of this index
 * is modified; the argument is only read.
 *
 * @param deleteIndex the index to merge in; must be a {@code BitmapPositionDeleteIndex}
 * @return this index, after the union has been applied
 * @throws IllegalArgumentException if {@code deleteIndex} is null or is not a {@code
 *     BitmapPositionDeleteIndex}
 */
@Override
public PositionDeleteIndex or(PositionDeleteIndex deleteIndex) {
  if (!(deleteIndex instanceof BitmapPositionDeleteIndex)) {
    // instanceof is false for null, so a null argument is reported here as well
    throw new IllegalArgumentException(
        "Cannot union with a non-bitmap position delete index: "
            + (deleteIndex == null ? "null" : deleteIndex.getClass().getName()));
  }

  BitmapPositionDeleteIndex other = (BitmapPositionDeleteIndex) deleteIndex;
  roaring64Bitmap.or(other.roaring64Bitmap);
  return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public interface PositionDeleteIndex {

/** Returns true if this collection contains no elements. */
boolean isEmpty();

/**
 * In-place bitwise OR (union) operation. The current index is modified.
 *
 * <p>This default implementation does not perform a union; it always throws. Implementations that
 * support merging must override it.
 *
 * @param deleteIndex the index whose positions should be merged into this one
 * @return the index holding the result of the union, as defined by the overriding implementation
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default PositionDeleteIndex or(PositionDeleteIndex deleteIndex) {
  throw new UnsupportedOperationException(
      getClass().getName() + " does not support the in-place or (union) operation");
}
}
26 changes: 21 additions & 5 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
Expand Down Expand Up @@ -409,18 +410,33 @@ public static Map<String, PositionDeleteIndex> createPosIndexMap(
return ImmutableMap.of();
}

// NOTE(review): this hunk appears to interleave removed and added diff lines without +/- markers.
// The first signature below (nested-map return type) looks like the removed line and the second
// (flat Map<String, PositionDeleteIndex>) like its replacement — confirm against the commit.
public static Map<CharSequence, Map<String, PositionDeleteIndex>> createPosIndexMap(
public static Map<String, PositionDeleteIndex> createPosIndexMap(
Cache<CharSequence, Map<String, PositionDeleteIndex>> posIndexCache,
Iterable<FileScanTask> fileTasks,
FileIO io) {
// Collect the position delete files referenced by the tasks; nothing to index if there are none.
List<DeleteFile> posDeletes = distinctPosDeletes(fileTasks);
if (posDeletes.isEmpty()) {
return ImmutableMap.of();
}
// NOTE(review): the Maps.transformEntries return below looks like the removed implementation
// (one cache entry per delete file path, returned as a nested map) — confirm.
return Maps.transformEntries(
Maps.uniqueIndex(posDeletes, DeleteFile::path),
(key, row) ->
posIndexCache.get(key, k -> Deletes.toPositionIndexMap(openPosDeletes(io, row))));
// if there are fewer deletes than a reasonable number to keep in memory, use a set
// The comparison uses the summed recordCount of all position delete files.
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum()
< DEFAULT_SET_FILTER_THRESHOLD) {
// Paths of the data files in the given tasks, converted to String.
Iterable<CharSequence> filePaths =
Iterables.transform(fileTasks, task -> task.file().path().toString());

// For each delete file: look up its index map in posIndexCache (passing a function that builds
// it from the opened delete file), keep only the entries whose key is one of filePaths, then
// flatten all entries into one map, combining indexes that share a key with
// PositionDeleteIndex::or.
// NOTE(review): `or` is documented as in-place, so merging may modify an index instance that is
// also held by posIndexCache — verify this is intended.
// NOTE(review): Iterables.contains over a transformed Iterable is presumably a linear scan per
// key — confirm this is acceptable for the expected number of tasks.
return Lists.transform(
posDeletes,
row ->
Maps.filterKeys(
posIndexCache.get(
row.path(), f -> Deletes.toPositionIndexMap(openPosDeletes(io, row))),
key -> Iterables.contains(filePaths, key)))
.stream()
.flatMap(map -> map.entrySet().stream())
.collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, PositionDeleteIndex::or));
}
// Reached when the summed record count is not below DEFAULT_SET_FILTER_THRESHOLD: an empty map
// is returned.
return ImmutableMap.of();
}

private static List<DeleteFile> distinctPosDeletes(Iterable<FileScanTask> fileTasks) {
Expand Down
1 change: 1 addition & 0 deletions spark/v3.3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")

compileOnly "com.google.errorprone:error_prone_annotations"
compileOnly "com.github.ben-manes.caffeine:caffeine"
compileOnly "org.apache.avro:avro"
compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
Expand Down

0 comments on commit 490934c

Please sign in to comment.