diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index d2f821ea0021..caa84fab5ad2 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -40,6 +40,8 @@
public class TypeUtil {
+ private static final long OBJECT_HEADER = 12L;
+
private TypeUtil() {}
/**
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
}
}
+ /**
+ * Estimates the number of bytes a value for a given field may occupy in memory.
+ *
+ * <p>This method approximates the memory size based on the internal Java representation defined
+ * by {@link Type.TypeID}. It is important to note that the actual size might differ from this
+ * estimation. The method is designed to handle a variety of data types, including primitive
+ * types, strings, and nested types such as structs, maps, and lists.
+ *
+ * @param field a field for which to estimate the size
+ * @return the estimated size in bytes of the field's value in memory
+ */
+ public static long defaultSize(Types.NestedField field) {
+ return defaultSize(field.type());
+ }
+
+ private static long defaultSize(Type type) {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ // the size of a boolean variable is virtual machine dependent
+ // it is common to believe booleans occupy 1 byte in most JVMs
+ return 1;
+ case INTEGER:
+ case FLOAT:
+ case DATE:
+ // ints and floats occupy 4 bytes
+ // dates are internally represented as ints
+ return 4;
+ case LONG:
+ case DOUBLE:
+ case TIME:
+ case TIMESTAMP:
+ // longs and doubles occupy 8 bytes
+ // times and timestamps are internally represented as longs
+ return 8;
+ case STRING:
+ // 12 (header) + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 60 bytes
+ return 60;
+ case UUID:
+ // 12 (header) + 16 (two long variables) = 28 bytes
+ return 28;
+ case FIXED:
+ return ((Types.FixedType) type).length();
case BINARY:
// assume binary values occupy roughly 100 bytes on average
return 100;
+ case DECIMAL:
+ // 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes
+ return 44;
+ case STRUCT:
+ Types.StructType struct = (Types.StructType) type;
+ return OBJECT_HEADER + struct.fields().stream().mapToLong(TypeUtil::defaultSize).sum();
+ case LIST:
+ Types.ListType list = (Types.ListType) type;
+ return OBJECT_HEADER + 5 * defaultSize(list.elementType());
+ case MAP:
+ Types.MapType map = (Types.MapType) type;
+ long entrySize = OBJECT_HEADER + defaultSize(map.keyType()) + defaultSize(map.valueType());
+ return OBJECT_HEADER + 5 * entrySize;
+ default:
+ return 16;
+ }
+ }
+
/** Interface for passing a function that assigns column IDs. */
public interface NextID {
int get();
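
A minimal usage sketch (not part of this patch) of how these per-field defaults can be combined into a per-record estimate; the two-column schema below is made up for illustration:

    // estimate the in-memory size of one record of a projection
    Schema projection =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),      // 8 bytes
            Types.NestedField.required(2, "name", Types.StringType.get())); // 60 bytes
    long estimatedRecordSize =
        projection.columns().stream().mapToLong(TypeUtil::defaultSize).sum(); // 68 bytes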
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index ce183ec5d446..feac1f61a13e 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -43,14 +43,14 @@ private SystemConfigs() {}
Integer::parseUnsignedInt);
/**
- * Sets the size of the delete worker pool. This limits the number of threads used to compute the
- * PositionDeleteIndex from the position deletes for a data file.
+ * Sets the size of the delete worker pool. This limits the number of threads used to read delete
+ * files for a data file.
*/
public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
new ConfigEntry<>(
"iceberg.worker.delete-num-threads",
"ICEBERG_WORKER_DELETE_NUM_THREADS",
- Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
Integer::parseUnsignedInt);
/** Whether to use the shared worker pool when planning table scans. */
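
As a worked example of the new default: on an executor with 8 available cores the pool grows from max(2, 8) = 8 threads to max(2, 4 * 8) = 32 threads.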
diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 7690ab7e4879..97699c1c9113 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -27,6 +27,10 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
roaring64Bitmap = new Roaring64Bitmap();
}
+ void merge(BitmapPositionDeleteIndex that) {
+ roaring64Bitmap.or(that.roaring64Bitmap);
+ }
+
@Override
public void delete(long position) {
roaring64Bitmap.add(position);
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 118987e24b23..223056a40333 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.SortedMerge;
@@ -128,6 +129,25 @@ public static StructLikeSet toEqualitySet(
}
}
+ public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
+ CloseableIterable<T> posDeletes) {
+ CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();
+
+ try (CloseableIterable<T> deletes = posDeletes) {
+ for (T delete : deletes) {
+ CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
+ long position = (long) POSITION_ACCESSOR.get(delete);
+ PositionDeleteIndex index =
+ indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex());
+ index.delete(position);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close position delete source", e);
+ }
+
+ return indexes;
+ }
+
public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
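
A sketch of how the new toPositionIndexes helper is meant to be used; posDeleteRecords (a CloseableIterable of position delete rows) and dataFilePath are assumed to be in scope, and the helper closes the iterable it consumes:

    // each position delete row carries a file_path and a pos column;
    // the helper builds one bitmap index per referenced data file
    CharSequenceMap<PositionDeleteIndex> indexes = Deletes.toPositionIndexes(posDeleteRecords);
    PositionDeleteIndex index = indexes.getOrDefault(dataFilePath, PositionDeleteIndex.empty());
    boolean deleted = index.isDeleted(0L);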
diff --git a/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
new file mode 100644
index 000000000000..660e01038cff
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+class EmptyPositionDeleteIndex implements PositionDeleteIndex {
+
+ private static final EmptyPositionDeleteIndex INSTANCE = new EmptyPositionDeleteIndex();
+
+ private EmptyPositionDeleteIndex() {}
+
+ static EmptyPositionDeleteIndex get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void delete(long position) {
+ throw new UnsupportedOperationException("Cannot modify " + getClass().getName());
+ }
+
+ @Override
+ public void delete(long posStart, long posEnd) {
+ throw new UnsupportedOperationException("Cannot modify " + getClass().getName());
+ }
+
+ @Override
+ public boolean isDeleted(long position) {
+ return false;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "PositionDeleteIndex{}";
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index bcfa9f2cf5ff..be05875aeb2a 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -44,4 +44,14 @@ public interface PositionDeleteIndex {
/** Returns true if this collection contains no element. */
boolean isEmpty();
+
+ /** Returns true if this collection contains elements. */
+ default boolean isNotEmpty() {
+ return !isEmpty();
+ }
+
+ /** Returns an empty immutable position delete index. */
+ static PositionDeleteIndex empty() {
+ return EmptyPositionDeleteIndex.get();
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
new file mode 100644
index 000000000000..0c3bff28ee6b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class PositionDeleteIndexUtil {
+
+ private PositionDeleteIndexUtil() {}
+
+ public static PositionDeleteIndex merge(Iterable<? extends PositionDeleteIndex> indexes) {
+ BitmapPositionDeleteIndex result = new BitmapPositionDeleteIndex();
+
+ for (PositionDeleteIndex index : indexes) {
+ if (index.isNotEmpty()) {
+ Preconditions.checkArgument(
+ index instanceof BitmapPositionDeleteIndex,
+ "Can merge only bitmap-based indexes, got %s",
+ index.getClass().getName());
+ result.merge((BitmapPositionDeleteIndex) index);
+ }
+ }
+
+ return result;
+ }
+}
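
A minimal sketch of the merge semantics (the bitmaps are OR-ed together and empty indexes are skipped); it assumes the caller lives in the org.apache.iceberg.deletes package, since BitmapPositionDeleteIndex is package-private:

    PositionDeleteIndex first = new BitmapPositionDeleteIndex();
    first.delete(0L);
    PositionDeleteIndex second = new BitmapPositionDeleteIndex();
    second.delete(7L);
    PositionDeleteIndex merged =
        PositionDeleteIndexUtil.merge(Arrays.asList(first, second, PositionDeleteIndex.empty()));
    // merged.isDeleted(0L) and merged.isDeleted(7L) are both true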
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index 4f2314fec94d..ced121c03c63 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -68,8 +68,9 @@ public static ExecutorService getWorkerPool() {
/**
* Return an {@link ExecutorService} that uses the "delete worker" thread-pool.
*
- * <p>The size of the delete worker pool limits the number of threads used to compute the
- * PositionDeleteIndex from the position deletes for a data file.
+ * <p>The size of this worker pool limits the number of tasks concurrently reading delete files
+ * within a single JVM. If there are multiple threads loading deletes, all of them will share this
+ * worker pool by default.
*
* <p>The size of this thread-pool is controlled by the Java system property {@code
* iceberg.worker.delete-num-threads}.
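
The value is resolved from a JVM system property (or the corresponding environment variable) and cached, so it has to be supplied before the delete worker pool is first created; a sketch:

    // on the executor JVMs, e.g. -Diceberg.worker.delete-num-threads=16
    // or programmatically, before any Iceberg thread pools are initialized:
    System.setProperty("iceberg.worker.delete-num-threads", "16");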
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
new file mode 100644
index 000000000000..0056e80619b0
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteLoader.class);
+ private static final Schema POS_DELETE_SCHEMA = DeleteSchemaUtil.pathPosSchema();
+
+ private final Function<DeleteFile, InputFile> loadInputFile;
+ private final ExecutorService workerPool;
+
+ public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+ this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+ }
+
+ public BaseDeleteLoader(
+ Function<DeleteFile, InputFile> loadInputFile, ExecutorService workerPool) {
+ this.loadInputFile = loadInputFile;
+ this.workerPool = workerPool;
+ }
+
+ /**
+ * Checks if the given number of bytes can be cached.
+ *
+ * <p>Implementations should override this method if they support caching. It is also recommended
+ * to use the provided size as a guideline to decide whether the value is eligible for caching.
+ * For instance, it may be beneficial to discard values that are too large, in order to optimize
+ * cache performance and utilization.
+ */
+ protected boolean canCache(long sizeBytes) {
+ return false;
+ }
+
+ /**
+ * Gets the cached value for the key or populates the cache with a new mapping.
+ *
+ *
+ * <p>If the value for the specified key is in the cache, it should be returned. If the value is
+ * not in the cache, implementations should compute the value using the provided supplier, cache
+ * it, and then return it.
+ *
+ * <p>This method will be called only if {@link #canCache(long)} returned true.
+ */
+ protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {
+ throw new UnsupportedOperationException(getClass().getName() + " does not support caching");
+ }
+
+ @Override
+ public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema projection) {
+ Iterable<Iterable<StructLike>> deletes =
+ execute(deleteFiles, deleteFile -> getOrLoadEqDeletes(deleteFile, projection));
+ StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
+ Iterables.addAll(deleteSet, Iterables.concat(deletes));
+ return deleteSet;
+ }
+
+ private Iterable<StructLike> getOrLoadEqDeletes(DeleteFile deleteFile, Schema projection) {
+ long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
+ if (canCache(estimatedSize)) {
+ String cacheKey = deleteFile.path().toString();
+ return get(cacheKey, () -> loadEqDeletes(deleteFile, projection), estimatedSize);
+ } else {
+ return loadEqDeletes(deleteFile, projection);
+ }
+ }
+
+ private Iterable<StructLike> loadEqDeletes(DeleteFile deleteFile, Schema projection) {
+ CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
+ CloseableIterable<Record> copiedDeletes = CloseableIterable.transform(deletes, Record::copy);
+ CloseableIterable<StructLike> copiedDeletesAsStructs = toStructs(copiedDeletes, projection);
+ return materialize(copiedDeletesAsStructs);
+ }
+
+ private CloseableIterable<StructLike> toStructs(
+ CloseableIterable<Record> records, Schema schema) {
+ InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
+ return CloseableIterable.transform(records, wrapper::copyFor);
+ }
+
+ private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
+ try (CloseableIterable<T> closeableIterable = iterable) {
+ return ImmutableList.copyOf(closeableIterable);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close iterable", e);
+ }
+ }
+
+ @Override
+ public PositionDeleteIndex loadPositionDeletes(
+ Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
+ Iterable<PositionDeleteIndex> deletes =
+ execute(deleteFiles, deleteFile -> getOrLoadPosDeletes(deleteFile, filePath));
+ return PositionDeleteIndexUtil.merge(deletes);
+ }
+
+ private PositionDeleteIndex getOrLoadPosDeletes(DeleteFile deleteFile, CharSequence filePath) {
+ long estimatedSize = estimatePosDeletesSize(deleteFile);
+ if (canCache(estimatedSize)) {
+ String cacheKey = deleteFile.path().toString();
+ CharSequenceMap<PositionDeleteIndex> indexes =
+ get(cacheKey, () -> loadPosDeletes(deleteFile), estimatedSize);
+ return indexes.getOrDefault(filePath, PositionDeleteIndex.empty());
+ } else {
+ return loadPosDeletes(deleteFile, filePath);
+ }
+ }
+
+ private CharSequenceMap<PositionDeleteIndex> loadPosDeletes(DeleteFile deleteFile) {
+ CloseableIterable<Record> deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA);
+ return Deletes.toPositionIndexes(deletes);
+ }
+
+ private PositionDeleteIndex loadPosDeletes(DeleteFile deleteFile, CharSequence filePath) {
+ Expression filter = Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath);
+ CloseableIterable<Record> deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA, filter);
+ return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes));
+ }
+
+ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema projection) {
+ return openDeletes(deleteFile, projection, null /* no filter */);
+ }
+
+ private CloseableIterable<Record> openDeletes(
+ DeleteFile deleteFile, Schema projection, Expression filter) {
+
+ FileFormat format = deleteFile.format();
+ LOG.trace("Opening delete file {}", deleteFile.path());
+ InputFile inputFile = loadInputFile.apply(deleteFile);
+
+ switch (format) {
+ case AVRO:
+ return Avro.read(inputFile)
+ .project(projection)
+ .reuseContainers()
+ .createReaderFunc(DataReader::create)
+ .build();
+
+ case PARQUET:
+ return Parquet.read(inputFile)
+ .project(projection)
+ .filter(filter)
+ .reuseContainers()
+ .createReaderFunc(newParquetReaderFunc(projection))
+ .build();
+
+ case ORC:
+ // reusing containers is automatic for ORC, no need to call 'reuseContainers'
+ return ORC.read(inputFile)
+ .project(projection)
+ .filter(filter)
+ .createReaderFunc(newOrcReaderFunc(projection))
+ .build();
+
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot read deletes, %s is not a supported file format: %s",
+ format.name(), inputFile.location()));
+ }
+ }
+
+ private Function<MessageType, ParquetValueReader<?>> newParquetReaderFunc(Schema projection) {
+ return fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema);
+ }
+
+ private Function<TypeDescription, OrcRowReader<?>> newOrcReaderFunc(Schema projection) {
+ return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema);
+ }
+
+ private <I, O> Iterable<O> execute(Iterable<I> objects, Function<I, O> func) {
+ Queue<O> output = new ConcurrentLinkedQueue<>();
+
+ Tasks.foreach(objects)
+ .executeWith(workerPool)
+ .stopOnFailure()
+ .onFailure((object, exc) -> LOG.error("Failed to process {}", object, exc))
+ .run(object -> output.add(func.apply(object)));
+
+ return output;
+ }
+
+ // estimates the memory required to cache position deletes
+ private long estimatePosDeletesSize(DeleteFile deleteFile) {
+ // the space consumption highly depends on the nature of deleted positions (sparse vs compact)
+ // testing shows Roaring bitmaps require around 16 bits (2 bytes) per value on average
+ return 2 * deleteFile.recordCount();
+ }
+
+ // estimates the memory required to cache equality deletes
+ private long estimateEqDeletesSize(DeleteFile deleteFile, Schema projection) {
+ try {
+ long recordSize = estimateRecordSize(projection);
+ long recordCount = deleteFile.recordCount();
+ return LongMath.checkedMultiply(recordSize, recordCount);
+ } catch (ArithmeticException e) {
+ return Long.MAX_VALUE;
+ }
+ }
+
+ private long estimateRecordSize(Schema schema) {
+ return schema.columns().stream().mapToLong(TypeUtil::defaultSize).sum();
+ }
+}
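
A usage sketch of the new loader; table, posDeleteFiles, eqDeleteFiles, dataFilePath, and equalityProjection are placeholders assumed to be in scope, with the delete files pre-filtered by content type:

    DeleteLoader loader =
        new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile.path().toString()));

    // position deletes: one merged index per data file
    PositionDeleteIndex positionIndex = loader.loadPositionDeletes(posDeleteFiles, dataFilePath);

    // equality deletes: one set per projection of equality columns
    StructLikeSet equalitySet = loader.loadEqualityDeletes(eqDeleteFiles, equalityProjection);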
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 55acd3200894..e7d8445cf8c8 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -25,25 +25,16 @@
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
@@ -58,8 +49,6 @@
public abstract class DeleteFilter<T> {
private static final Logger LOG = LoggerFactory.getLogger(DeleteFilter.class);
- private static final Schema POS_DELETE_SCHEMA =
- new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
private final String filePath;
private final List<DeleteFile> posDeletes;
@@ -70,6 +59,7 @@ public abstract class DeleteFilter<T> {
private final int isDeletedColumnPosition;
private final DeleteCounter counter;
+ private volatile DeleteLoader deleteLoader = null;
private PositionDeleteIndex deleteRowPositions = null;
private List<Predicate<T>> isInDeleteSets = null;
private Predicate<T> eqDeleteRows = null;
@@ -143,10 +133,30 @@ Accessor<StructLike> posAccessor() {
protected abstract InputFile getInputFile(String location);
+ protected InputFile loadInputFile(DeleteFile deleteFile) {
+ return getInputFile(deleteFile.path().toString());
+ }
+
protected long pos(T record) {
return (Long) posAccessor.get(asStructLike(record));
}
+ protected DeleteLoader newDeleteLoader() {
+ return new BaseDeleteLoader(this::loadInputFile);
+ }
+
+ private DeleteLoader deleteLoader() {
+ if (deleteLoader == null) {
+ synchronized (this) {
+ if (deleteLoader == null) {
+ this.deleteLoader = newDeleteLoader();
+ }
+ }
+ }
+
+ return deleteLoader;
+ }
+
public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}
@@ -173,22 +183,11 @@ private List<Predicate<T>> applyEqDeletes() {
Iterable<DeleteFile> deletes = entry.getValue();
Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
- InternalRecordWrapper wrapper = new InternalRecordWrapper(deleteSchema.asStruct());
// a projection to select and reorder fields of the file schema to match the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
- Iterable<CloseableIterable<Record>> deleteRecords =
- Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema));
-
- // copy the delete records because they will be held in a set
- CloseableIterable<Record> records =
- CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy);
-
- StructLikeSet deleteSet =
- Deletes.toEqualitySet(
- CloseableIterable.transform(records, wrapper::copyFor), deleteSchema.asStruct());
-
+ StructLikeSet deleteSet = deleteLoader().loadEqualityDeletes(deletes, deleteSchema);
Predicate<T> isInDeleteSet =
record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
@@ -224,14 +223,10 @@ public Predicate<T> eqDeletedRowFilter() {
}
public PositionDeleteIndex deletedRowPositions() {
- if (posDeletes.isEmpty()) {
- return null;
+ if (deleteRowPositions == null && !posDeletes.isEmpty()) {
+ this.deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes, filePath);
}
- if (deleteRowPositions == null) {
- List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
- deleteRowPositions = Deletes.toPositionIndex(filePath, deletes);
- }
return deleteRowPositions;
}
@@ -240,9 +235,7 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
return records;
}
- List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
-
- PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
+ PositionDeleteIndex positionIndex = deletedRowPositions();
Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
return createDeleteIterable(records, isDeleted);
}
@@ -254,56 +247,6 @@ private CloseableIterable<T> createDeleteIterable(
: Deletes.filterDeleted(records, isDeleted, counter);
}
- private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
- return openDeletes(file, POS_DELETE_SCHEMA);
- }
-
- private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
- LOG.trace("Opening delete file {}", deleteFile.path());
- InputFile input = getInputFile(deleteFile.path().toString());
- switch (deleteFile.format()) {
- case AVRO:
- return Avro.read(input)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(DataReader::create)
- .build();
-
- case PARQUET:
- Parquet.ReadBuilder builder =
- Parquet.read(input)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));
-
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
- }
-
- return builder.build();
-
- case ORC:
- // Reusing containers is automatic for ORC. No need to set 'reuseContainers' here.
- ORC.ReadBuilder orcBuilder =
- ORC.read(input)
- .project(deleteSchema)
- .createReaderFunc(
- fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema));
-
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
- }
-
- return orcBuilder.build();
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Cannot read deletes, %s is not a supported format: %s",
- deleteFile.format().name(), deleteFile.path()));
- }
- }
-
private static Schema fileProjection(
Schema tableSchema,
Schema requestedSchema,
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
new file mode 100644
index 000000000000..07bdce6d836f
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.util.StructLikeSet;
+
+/** An API for loading delete file content into in-memory data structures. */
+public interface DeleteLoader {
+ /**
+ * Loads the content of equality delete files into a set.
+ *
+ * @param deleteFiles equality delete files
+ * @param projection a projection of columns to load
+ * @return a set of equality deletes
+ */
StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema projection);
+
+ /**
+ * Loads the content of position delete files for a given data file path into a position index.
+ *
+ * @param deleteFiles position delete files
+ * @param filePath the data file path for which to load deletes
+ * @return a position delete index for the provided data file path
+ */
PositionDeleteIndex loadPositionDeletes(Iterable<DeleteFile> deleteFiles, CharSequence filePath);
+}
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 2c58281904c9..eeef75be6334 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -89,6 +89,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation libs.caffeine
+
testImplementation(libs.hadoop2.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index d86246b1e616..e3b01b8375b6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -18,15 +18,19 @@
*/
package org.apache.iceberg.spark;
+import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
class SparkConfParser {
@@ -34,6 +38,12 @@ class SparkConfParser {
private final RuntimeConfig sessionConf;
private final Map<String, String> options;
+ SparkConfParser() {
+ this.properties = ImmutableMap.of();
+ this.sessionConf = new RuntimeConfig(SQLConf.get());
+ this.options = ImmutableMap.of();
+ }
+
SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
this.properties = table.properties();
this.sessionConf = spark.conf();
@@ -56,6 +66,10 @@ public StringConfParser stringConf() {
return new StringConfParser();
}
+ public DurationConfParser durationConf() {
+ return new DurationConfParser();
+ }
+
class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
private Boolean defaultValue;
private boolean negate = false;
@@ -156,6 +170,33 @@ public String parseOptional() {
}
}
+ class DurationConfParser extends ConfParser<DurationConfParser, Duration> {
+ private Duration defaultValue;
+
+ @Override
+ protected DurationConfParser self() {
+ return this;
+ }
+
+ public DurationConfParser defaultValue(Duration value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public Duration parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+ return parse(this::toDuration, defaultValue);
+ }
+
+ public Duration parseOptional() {
+ return parse(this::toDuration, defaultValue);
+ }
+
+ private Duration toDuration(String time) {
+ return Duration.ofSeconds(JavaUtils.timeStringAsSec(time));
+ }
+ }
+
abstract class ConfParser<ThisT, T> {
private final List<String> optionNames = Lists.newArrayList();
private String sessionConfName;
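
The new duration parser accepts Spark-style time strings; a small sketch of the conversion it performs (values are only examples):

    // JavaUtils.timeStringAsSec understands suffixes such as "s", "m", "h"
    Duration tenMinutes = Duration.ofSeconds(JavaUtils.timeStringAsSec("10m"));    // PT10M
    Duration ninetySeconds = Duration.ofSeconds(JavaUtils.timeStringAsSec("90s")); // PT1M30S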
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
new file mode 100644
index 000000000000..77a66d4e766e
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for reducing the computation and IO overhead in tasks.
+ *
+ * <p>The cache is configurable and enabled through Spark SQL properties. Its key features include
+ * setting limits on the total cache size and maximum size for individual entries. Additionally, it
+ * implements automatic eviction of entries after a specified duration of inactivity.
+ *
+ *
+ * <p>Usage pattern involves fetching data from the cache using a unique combination of execution ID
+ * and key. If the data is not present in the cache, it is computed using the provided supplier and
+ * stored in the cache, subject to the defined size constraints.
+ *
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one cache exists per JVM.
+ *
+ * @see SparkUtil#executionId()
+ */
+public class SparkExecutorCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkExecutorCache.class);
+
+ private static final SparkConfParser CONF_PARSER = new SparkConfParser();
+ private static final boolean CACHE_ENABLED = parseCacheEnabledConf();
+ private static final Duration TIMEOUT = parseTimeoutConf();
+ private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf();
+ private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf();
+ private static final String EXECUTOR_DESC = SparkUtil.executorDesc();
+
+ private static volatile SparkExecutorCache instance = null;
+
+ private final Map<String, Collection<String>> keysByExecutionId;
+ private volatile Cache<String, CacheValue> cache = null;
+
+ private SparkExecutorCache() {
+ this.keysByExecutionId = Collections.synchronizedMap(Maps.newHashMap());
+ }
+
+ public static SparkExecutorCache getOrCreate() {
+ if (instance == null && CACHE_ENABLED) {
+ synchronized (SparkExecutorCache.class) {
+ if (instance == null) {
+ SparkExecutorCache.instance = new SparkExecutorCache();
+ }
+ }
+ }
+
+ return instance;
+ }
+
+ public static void cleanUp(String executionId) {
+ if (instance != null) {
+ instance.invalidate(executionId);
+ }
+ }
+
+ public long maxEntrySize() {
+ return MAX_ENTRY_SIZE;
+ }
+
+ public <V> V get(String executionId, String key, Supplier<V> valueSupplier, long valueSize) {
+ if (valueSize > MAX_ENTRY_SIZE) {
+ return valueSupplier.get();
+ }
+
+ Collection<String> keys =
+ keysByExecutionId.computeIfAbsent(executionId, id -> Queues.newConcurrentLinkedQueue());
+ keys.add(key);
+
+ return get(key, valueSupplier, valueSize);
+ }
+
+ private <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {
+ CacheValue value = cache().get(key, ignored -> new CacheValue(valueSupplier, valueSize));
+ Preconditions.checkNotNull(value, "Loaded value must not be null");
+ return value.get();
+ }
+
+ public void invalidate(String executionId) {
+ if (cache != null) {
+ LOG.info("Invalidating execution ID {} ({})", executionId, EXECUTOR_DESC);
+ Collection<String> keys = keysByExecutionId.get(executionId);
+ if (keys != null) {
+ for (String key : keys) {
+ cache().invalidate(key);
+ }
+ }
+ }
+ }
+
+ private Cache<String, CacheValue> cache() {
+ if (cache == null) {
+ synchronized (this) {
+ if (cache == null) {
+ LOG.info("Initializing cache state ({})", EXECUTOR_DESC);
+ this.cache = initCache();
+ }
+ }
+ }
+
+ return cache;
+ }
+
+ private Cache<String, CacheValue> initCache() {
+ return Caffeine.newBuilder()
+ .expireAfterAccess(TIMEOUT)
+ .maximumWeight(MAX_TOTAL_SIZE)
+ .weigher((key, value) -> ((CacheValue) value).weight())
+ .recordStats()
+ .removalListener(
+ (key, value, cause) -> LOG.info("Evicted {} ({}) ({})", key, EXECUTOR_DESC, cause))
+ .build();
+ }
+
+ private static boolean parseCacheEnabledConf() {
+ return CONF_PARSER
+ .booleanConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_ENABLED)
+ .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_ENABLED_DEFAULT)
+ .parse();
+ }
+
+ private static Duration parseTimeoutConf() {
+ return CONF_PARSER
+ .durationConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT)
+ .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT_DEFAULT)
+ .parse();
+ }
+
+ private static long parseMaxEntrySizeConf() {
+ return CONF_PARSER
+ .longConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE)
+ .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT)
+ .parse();
+ }
+
+ private static long parseMaxTotalSizeConf() {
+ return CONF_PARSER
+ .longConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE)
+ .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT)
+ .parse();
+ }
+
+ private static class CacheValue {
+ private final Object value;
+ private final long size;
+
+ CacheValue(Supplier<?> valueSupplier, long size) {
+ this.value = valueSupplier.get();
+ this.size = size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V> V get() {
+ return (V) value;
+ }
+
+ public int weight() {
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
+ }
+}
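
A sketch of the intended access pattern; key, valueSize, MyValue, and computeValue are placeholders for illustration:

    SparkExecutorCache cache = SparkExecutorCache.getOrCreate(); // null when the cache is disabled
    String executionId = SparkUtil.executionId();
    MyValue value;
    if (cache != null && valueSize < cache.maxEntrySize()) {
      // computed at most once per key while the execution is active
      value = cache.get(executionId, key, () -> computeValue(key), valueSize);
    } else {
      // too large or caching disabled: compute without caching
      value = computeValue(key);
    }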
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index bca41b4155ed..4a665202317b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark;
+import java.time.Duration;
+
public class SparkSQLProperties {
private SparkSQLProperties() {}
@@ -70,4 +72,18 @@ private SparkSQLProperties() {}
// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+ public static final String EXECUTOR_CACHE_ENABLED = "spark.sql.iceberg.executor-cache.enabled";
+ public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+ public static final String EXECUTOR_CACHE_TIMEOUT = "spark.sql.iceberg.executor-cache.timeout";
+ public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT = Duration.ofMinutes(10);
+
+ public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
+ "spark.sql.iceberg.executor-cache.max-entry-size";
+ public static final long EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB
+
+ public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =
+ "spark.sql.iceberg.executor-cache.max-total-size";
+ public static final long EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
}
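
These are session-level Spark SQL configs; a sketch of tuning them (the values are only examples):

    spark.conf().set("spark.sql.iceberg.executor-cache.enabled", "true");
    spark.conf().set("spark.sql.iceberg.executor-cache.timeout", "5m");
    spark.conf().set("spark.sql.iceberg.executor-cache.max-entry-size", Long.toString(32L * 1024 * 1024));
    spark.conf().set("spark.sql.iceberg.executor-cache.max-total-size", Long.toString(256L * 1024 * 1024));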
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 2357ca0441fc..eb2de9497df8 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -34,12 +34,16 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.BoundReference;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
@@ -238,4 +242,19 @@ public static String toColumnName(NamedReference ref) {
public static boolean caseSensitive(SparkSession spark) {
return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
}
+
+ public static String executionId() {
+ TaskContext taskContext = TaskContext.get();
+ if (taskContext != null) {
+ return taskContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY());
+ } else {
+ SparkContext sparkContext = SparkSession.active().sparkContext();
+ return sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY());
+ }
+ }
+
+ public static String executorDesc() {
+ String executorId = SparkEnv.get().executorId();
+ return executorId.equals("driver") ? executorId : "executor " + executorId;
+ }
}
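
For reference, how the new helpers behave on the two sides of the job (values are illustrative):

    String executionId = SparkUtil.executionId(); // SQL execution ID, e.g. "42"; null outside a SQL execution
    String where = SparkUtil.executorDesc();      // "driver" on the driver, "executor 3" on executor 3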
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c88..8b1f55341110 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -25,6 +25,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
@@ -40,7 +42,9 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
@@ -50,7 +54,9 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
@@ -279,5 +285,31 @@ protected void markRowDeleted(InternalRow row) {
counter().increment();
}
}
+
+ @Override
+ protected DeleteLoader newDeleteLoader() {
+ return new CachingDeleteLoader(this::loadInputFile);
+ }
+
+ private class CachingDeleteLoader extends BaseDeleteLoader {
+ private final SparkExecutorCache cache;
+ private final String executionId;
+
+ CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+ super(loadInputFile);
+ this.cache = SparkExecutorCache.getOrCreate();
+ this.executionId = SparkUtil.executionId();
+ }
+
+ @Override
+ protected boolean canCache(long size) {
+ return cache != null && size < cache.maxEntrySize();
+ }
+
+ @Override
+ protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {
+ return cache.get(executionId, key, valueSupplier, valueSize);
+ }
+ }
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
index 65df29051c8c..93f9821e03fa 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
@@ -21,6 +21,8 @@
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkExecutorCache;
+import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +43,12 @@ public class SerializableTableWithSize extends SerializableTable
private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
private static final long SIZE_ESTIMATE = 32_768L;
+ private final String executionId;
private final transient Object serializationMarker;
protected SerializableTableWithSize(Table table) {
super(table);
+ this.executionId = SparkUtil.executionId();
this.serializationMarker = new Object();
}
@@ -67,6 +71,7 @@ public void close() throws Exception {
LOG.info("Releasing resources");
io().close();
}
+ SparkExecutorCache.cleanUp(executionId);
}
public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
@@ -75,10 +80,12 @@ public static class SerializableMetadataTableWithSize extends SerializableMetada
private static final Logger LOG =
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);
+ private final String executionId;
private final transient Object serializationMarker;
protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
super(metadataTable);
+ this.executionId = SparkUtil.executionId();
this.serializationMarker = new Object();
}
@@ -93,6 +100,7 @@ public void close() throws Exception {
LOG.info("Releasing resources");
io().close();
}
+ SparkExecutorCache.cleanUp(executionId);
}
}
}