From 5e0392b5bc906e4d74b93a509ba0d051787d074c Mon Sep 17 00:00:00 2001 From: aokolnychyi Date: Sun, 12 Nov 2023 19:34:09 -0800 Subject: [PATCH] API, Core, Spark 3.5: Parallelize reading of deletes and cache them on executors --- .../org/apache/iceberg/types/TypeUtil.java | 64 ++++ .../org/apache/iceberg/SystemConfigs.java | 6 +- .../deletes/BitmapPositionDeleteIndex.java | 4 + .../org/apache/iceberg/deletes/Deletes.java | 30 ++ .../deletes/EmptyPositionDeleteIndex.java | 55 +++ .../iceberg/deletes/PositionDeleteIndex.java | 10 + .../deletes/PositionDeleteIndexUtil.java | 42 +++ .../org/apache/iceberg/util/ThreadPools.java | 5 +- .../apache/iceberg/data/BaseDeleteLoader.java | 261 +++++++++++++ .../org/apache/iceberg/data/DeleteFilter.java | 107 ++---- .../org/apache/iceberg/data/DeleteLoader.java | 45 +++ spark/v3.5/build.gradle | 2 + .../apache/iceberg/spark/SparkConfParser.java | 41 ++ .../iceberg/spark/SparkExecutorCache.java | 228 ++++++++++++ .../iceberg/spark/SparkSQLProperties.java | 16 + .../org/apache/iceberg/spark/SparkUtil.java | 6 + .../iceberg/spark/source/BaseReader.java | 29 ++ .../source/SerializableTableWithSize.java | 10 + .../org/apache/iceberg/spark/Employee.java | 66 ++++ .../iceberg/spark/TestSparkExecutorCache.java | 351 ++++++++++++++++++ .../iceberg/spark/TestSparkWriteConf.java | 23 ++ 21 files changed, 1314 insertions(+), 87 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java create mode 100644 core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java create mode 100644 data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java create mode 100644 data/src/main/java/org/apache/iceberg/data/DeleteLoader.java create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index d2f821ea0021..8ea7a063dcd4 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -40,6 +40,8 @@ public class TypeUtil { + private static final long HEADER_SIZE = 12L; + private TypeUtil() {} /** @@ -452,6 +454,68 @@ private static void checkSchemaCompatibility( } } + /** + * Estimates the number of bytes a value for a given field may occupy in memory. + * + *
This method approximates the memory size based on heuristics and the internal Java + * representation defined by {@link Type.TypeID}. It is important to note that the actual size + * might differ from this estimation. The method is designed to handle a variety of data types, + * including primitive types, strings, and nested types such as structs, maps, and lists. + * + * @param field a field for which to estimate the size + * @return the estimated size in bytes of the field's value in memory + */ + public static long estimateSize(Types.NestedField field) { + return estimateSize(field.type()); + } + + private static long estimateSize(Type type) { + switch (type.typeId()) { + case BOOLEAN: + // the size of a boolean variable is virtual machine dependent + // it is common to believe booleans occupy 1 byte in most JVMs + return 1; + case INTEGER: + case FLOAT: + case DATE: + // ints and floats occupy 4 bytes + // dates are internally represented as ints + return 4; + case LONG: + case DOUBLE: + case TIME: + case TIMESTAMP: + // longs and doubles occupy 8 bytes + // times and timestamps are internally represented as longs + return 8; + case STRING: + // 12 (header) + 6 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 54 bytes + return 54; + case UUID: + // 12 (header) + 16 (two long variables) = 28 bytes + return 28; + case FIXED: + return ((Types.FixedType) type).length(); + case BINARY: + return 100; + case DECIMAL: + // 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes + return 44; + case STRUCT: + Types.StructType struct = (Types.StructType) type; + return HEADER_SIZE + struct.fields().stream().mapToLong(TypeUtil::estimateSize).sum(); + case LIST: + Types.ListType list = (Types.ListType) type; + return HEADER_SIZE + 5 * estimateSize(list.elementType()); + case MAP: + Types.MapType map = (Types.MapType) type; + long entrySize = HEADER_SIZE + estimateSize(map.keyType()) + estimateSize(map.valueType()); + return HEADER_SIZE + 5 * entrySize; + default: + return 16; + } + } + /** Interface for passing a function that assigns column IDs. */ public interface NextID { int get(); diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java index ce183ec5d446..feac1f61a13e 100644 --- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java +++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java @@ -43,14 +43,14 @@ private SystemConfigs() {} Integer::parseUnsignedInt); /** - * Sets the size of the delete worker pool. This limits the number of threads used to compute the - * PositionDeleteIndex from the position deletes for a data file. + * Sets the size of the delete worker pool. This limits the number of threads used to read delete + * files for a data file. */ public static final ConfigEntry DELETE_WORKER_THREAD_POOL_SIZE = new ConfigEntry<>( "iceberg.worker.delete-num-threads", "ICEBERG_WORKER_DELETE_NUM_THREADS", - Math.max(2, Runtime.getRuntime().availableProcessors()), + Math.max(2, 4 * Runtime.getRuntime().availableProcessors()), Integer::parseUnsignedInt); /** Whether to use the shared worker pool when planning table scans. 
*/ diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java index 7690ab7e4879..97699c1c9113 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java @@ -27,6 +27,10 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex { roaring64Bitmap = new Roaring64Bitmap(); } + void merge(BitmapPositionDeleteIndex that) { + roaring64Bitmap.or(that.roaring64Bitmap); + } + @Override public void delete(long position) { roaring64Bitmap.add(position); diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 118987e24b23..ff20ba53ff70 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceMap; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.SortedMerge; @@ -128,6 +129,35 @@ public static StructLikeSet toEqualitySet( } } + /** + * Builds a map of position delete indexes by path. + * + *
Unlike {@link #toPositionIndex(CharSequence, List)}, this method builds a position delete + * index for each referenced data file and does not filter deletes. This can be useful when the + * entire delete file content is needed (e.g. caching). + * + * @param posDeletes position deletes + * @return the map of position delete indexes by path + */ + public static CharSequenceMap toPositionIndexes( + CloseableIterable posDeletes) { + CharSequenceMap indexes = CharSequenceMap.create(); + + try (CloseableIterable deletes = posDeletes) { + for (T delete : deletes) { + CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete); + long position = (long) POSITION_ACCESSOR.get(delete); + PositionDeleteIndex index = + indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex()); + index.delete(position); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to close position delete source", e); + } + + return indexes; + } + public static PositionDeleteIndex toPositionIndex( CharSequence dataLocation, List> deleteFiles) { return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool()); diff --git a/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java new file mode 100644 index 000000000000..660e01038cff --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.deletes; + +class EmptyPositionDeleteIndex implements PositionDeleteIndex { + + private static final EmptyPositionDeleteIndex INSTANCE = new EmptyPositionDeleteIndex(); + + private EmptyPositionDeleteIndex() {} + + static EmptyPositionDeleteIndex get() { + return INSTANCE; + } + + @Override + public void delete(long position) { + throw new UnsupportedOperationException("Cannot modify " + getClass().getName()); + } + + @Override + public void delete(long posStart, long posEnd) { + throw new UnsupportedOperationException("Cannot modify " + getClass().getName()); + } + + @Override + public boolean isDeleted(long position) { + return false; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public String toString() { + return "PositionDeleteIndex{}"; + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java index bcfa9f2cf5ff..be05875aeb2a 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java @@ -44,4 +44,14 @@ public interface PositionDeleteIndex { /** Returns true if this collection contains no element. */ boolean isEmpty(); + + /** Returns true if this collection contains elements. */ + default boolean isNotEmpty() { + return !isEmpty(); + } + + /** Returns an empty immutable position delete index. */ + static PositionDeleteIndex empty() { + return EmptyPositionDeleteIndex.get(); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java new file mode 100644 index 000000000000..0c3bff28ee6b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.deletes; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class PositionDeleteIndexUtil { + + private PositionDeleteIndexUtil() {} + + public static PositionDeleteIndex merge(Iterable indexes) { + BitmapPositionDeleteIndex result = new BitmapPositionDeleteIndex(); + + for (PositionDeleteIndex index : indexes) { + if (index.isNotEmpty()) { + Preconditions.checkArgument( + index instanceof BitmapPositionDeleteIndex, + "Can merge only bitmap-based indexes, got %s", + index.getClass().getName()); + result.merge((BitmapPositionDeleteIndex) index); + } + } + + return result; + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java index 4f2314fec94d..ced121c03c63 100644 --- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java +++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java @@ -68,8 +68,9 @@ public static ExecutorService getWorkerPool() { /** * Return an {@link ExecutorService} that uses the "delete worker" thread-pool. * - *
The size of the delete worker pool limits the number of threads used to compute the - * PositionDeleteIndex from the position deletes for a data file. + *
The size of this worker pool limits the number of tasks concurrently reading delete files + * within a single JVM. If there are multiple threads loading deletes, all of them will share this + * worker pool by default. * *
The size of this thread-pool is controlled by the Java system property {@code * iceberg.worker.delete-num-threads}. diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java new file mode 100644 index 000000000000..4c4b6b55ba96 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.deletes.PositionDeleteIndexUtil; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.orc.OrcRowReader; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.CharSequenceMap; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.orc.TypeDescription; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseDeleteLoader implements DeleteLoader { + + private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteLoader.class); + private static final Schema POS_DELETE_SCHEMA = DeleteSchemaUtil.pathPosSchema(); + + private final Function loadInputFile; + private final ExecutorService workerPool; + + public BaseDeleteLoader(Function loadInputFile) { + this(loadInputFile, ThreadPools.getDeleteWorkerPool()); + } + + public 
BaseDeleteLoader( + Function loadInputFile, ExecutorService workerPool) { + this.loadInputFile = loadInputFile; + this.workerPool = workerPool; + } + + /** + * Checks if the given number of bytes can be cached. + * + *
Implementations should override this method if they support caching. It is also recommended + * to use the provided size as a guideline to decide whether the value is eligible for caching. + * For instance, it may be beneficial to discard values that are too large to optimize the cache + * performance and utilization. + */ + protected boolean canCache(long size) { + return false; + } + + /** + * Gets the cached value for the key or populates the cache with a new mapping. + * + *
If the value for the specified key is in the cache, it should be returned. If the value is + * not in the cache, implementations should compute the value using the provided supplier, cache + * it, and then return it. + * + *
This method will be called only if {@link #canCache(long)} returned true. + */ + protected V getOrLoad(String key, Supplier valueSupplier, long valueSize) { + throw new UnsupportedOperationException(getClass().getName() + " does not support caching"); + } + + @Override + public StructLikeSet loadEqualityDeletes(Iterable deleteFiles, Schema projection) { + Iterable> deletes = + execute(deleteFiles, deleteFile -> getOrReadEqDeletes(deleteFile, projection)); + StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct()); + Iterables.addAll(deleteSet, Iterables.concat(deletes)); + return deleteSet; + } + + private Iterable getOrReadEqDeletes(DeleteFile deleteFile, Schema projection) { + long estimatedSize = estimateEqDeletesSize(deleteFile, projection); + if (canCache(estimatedSize)) { + String cacheKey = deleteFile.path().toString(); + return getOrLoad(cacheKey, () -> readEqDeletes(deleteFile, projection), estimatedSize); + } else { + return readEqDeletes(deleteFile, projection); + } + } + + private Iterable readEqDeletes(DeleteFile deleteFile, Schema projection) { + CloseableIterable deletes = openDeletes(deleteFile, projection); + CloseableIterable copiedDeletes = CloseableIterable.transform(deletes, Record::copy); + CloseableIterable copiedDeletesAsStructs = toStructs(copiedDeletes, projection); + return materialize(copiedDeletesAsStructs); + } + + private CloseableIterable toStructs( + CloseableIterable records, Schema schema) { + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + return CloseableIterable.transform(records, wrapper::copyFor); + } + + // materializes the iterable and releases resources so that the result can be cached + private Iterable materialize(CloseableIterable iterable) { + try (CloseableIterable closeableIterable = iterable) { + return ImmutableList.copyOf(closeableIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close iterable", e); + } + } + + @Override + public PositionDeleteIndex loadPositionDeletes( + Iterable deleteFiles, CharSequence filePath) { + Iterable deletes = + execute(deleteFiles, deleteFile -> getOrReadPosDeletes(deleteFile, filePath)); + return PositionDeleteIndexUtil.merge(deletes); + } + + private PositionDeleteIndex getOrReadPosDeletes(DeleteFile deleteFile, CharSequence filePath) { + long estimatedSize = estimatePosDeletesSize(deleteFile); + if (canCache(estimatedSize)) { + String cacheKey = deleteFile.path().toString(); + CharSequenceMap indexes = + getOrLoad(cacheKey, () -> readPosDeletes(deleteFile), estimatedSize); + return indexes.getOrDefault(filePath, PositionDeleteIndex.empty()); + } else { + return readPosDeletes(deleteFile, filePath); + } + } + + private CharSequenceMap readPosDeletes(DeleteFile deleteFile) { + CloseableIterable deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA); + return Deletes.toPositionIndexes(deletes); + } + + private PositionDeleteIndex readPosDeletes(DeleteFile deleteFile, CharSequence filePath) { + Expression filter = Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath); + CloseableIterable deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA, filter); + return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes)); + } + + private CloseableIterable openDeletes(DeleteFile deleteFile, Schema projection) { + return openDeletes(deleteFile, projection, null /* no filter */); + } + + private CloseableIterable openDeletes( + DeleteFile deleteFile, Schema projection, Expression filter) { + + FileFormat format = 
deleteFile.format(); + LOG.trace("Opening delete file {}", deleteFile.path()); + InputFile inputFile = loadInputFile.apply(deleteFile); + + switch (format) { + case AVRO: + return Avro.read(inputFile) + .project(projection) + .reuseContainers() + .createReaderFunc(DataReader::create) + .build(); + + case PARQUET: + return Parquet.read(inputFile) + .project(projection) + .filter(filter) + .reuseContainers() + .createReaderFunc(newParquetReaderFunc(projection)) + .build(); + + case ORC: + // reusing containers is automatic for ORC, no need to call 'reuseContainers' + return ORC.read(inputFile) + .project(projection) + .filter(filter) + .createReaderFunc(newOrcReaderFunc(projection)) + .build(); + + default: + throw new UnsupportedOperationException( + String.format( + "Cannot read deletes, %s is not a supported file format: %s", + format.name(), inputFile.location())); + } + } + + private Function> newParquetReaderFunc(Schema projection) { + return fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema); + } + + private Function> newOrcReaderFunc(Schema projection) { + return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema); + } + + private Iterable execute(Iterable objects, Function func) { + Queue output = new ConcurrentLinkedQueue<>(); + + Tasks.foreach(objects) + .executeWith(workerPool) + .stopOnFailure() + .onFailure((object, exc) -> LOG.error("Failed to process {}", object, exc)) + .run(object -> output.add(func.apply(object))); + + return output; + } + + // estimates the memory required to cache position deletes (in bytes) + private long estimatePosDeletesSize(DeleteFile deleteFile) { + // the space consumption highly depends on the nature of deleted positions (sparse vs compact) + // testing shows Roaring bitmaps require around 8 bits (1 byte) per value on average + return deleteFile.recordCount(); + } + + // estimates the memory required to cache equality deletes (in bytes) + private long estimateEqDeletesSize(DeleteFile deleteFile, Schema projection) { + try { + long recordSize = estimateRecordSize(projection); + long recordCount = deleteFile.recordCount(); + return LongMath.checkedMultiply(recordSize, recordCount); + } catch (ArithmeticException e) { + return Long.MAX_VALUE; + } + } + + private long estimateRecordSize(Schema schema) { + return schema.columns().stream().mapToLong(TypeUtil::estimateSize).sum(); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 55acd3200894..e7d8445cf8c8 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -25,25 +25,16 @@ import java.util.function.Predicate; import org.apache.iceberg.Accessor; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FileContent; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.avro.DataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; -import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import 
org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Multimap; @@ -58,8 +49,6 @@ public abstract class DeleteFilter { private static final Logger LOG = LoggerFactory.getLogger(DeleteFilter.class); - private static final Schema POS_DELETE_SCHEMA = - new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); private final String filePath; private final List posDeletes; @@ -70,6 +59,7 @@ public abstract class DeleteFilter { private final int isDeletedColumnPosition; private final DeleteCounter counter; + private volatile DeleteLoader deleteLoader = null; private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = null; private Predicate eqDeleteRows = null; @@ -143,10 +133,30 @@ Accessor posAccessor() { protected abstract InputFile getInputFile(String location); + protected InputFile loadInputFile(DeleteFile deleteFile) { + return getInputFile(deleteFile.path().toString()); + } + protected long pos(T record) { return (Long) posAccessor.get(asStructLike(record)); } + protected DeleteLoader newDeleteLoader() { + return new BaseDeleteLoader(this::loadInputFile); + } + + private DeleteLoader deleteLoader() { + if (deleteLoader == null) { + synchronized (this) { + if (deleteLoader == null) { + this.deleteLoader = newDeleteLoader(); + } + } + } + + return deleteLoader; + } + public CloseableIterable filter(CloseableIterable records) { return applyEqDeletes(applyPosDeletes(records)); } @@ -173,22 +183,11 @@ private List> applyEqDeletes() { Iterable deletes = entry.getValue(); Schema deleteSchema = TypeUtil.select(requiredSchema, ids); - InternalRecordWrapper wrapper = new InternalRecordWrapper(deleteSchema.asStruct()); // a projection to select and reorder fields of the file schema to match the delete rows StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema); - Iterable> deleteRecords = - Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema)); - - // copy the delete records because they will be held in a set - CloseableIterable records = - CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy); - - StructLikeSet deleteSet = - Deletes.toEqualitySet( - CloseableIterable.transform(records, wrapper::copyFor), deleteSchema.asStruct()); - + StructLikeSet deleteSet = deleteLoader().loadEqualityDeletes(deletes, deleteSchema); Predicate isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); isInDeleteSets.add(isInDeleteSet); @@ -224,14 +223,10 @@ public Predicate eqDeletedRowFilter() { } public PositionDeleteIndex deletedRowPositions() { - if (posDeletes.isEmpty()) { - return null; + if (deleteRowPositions == null && !posDeletes.isEmpty()) { + this.deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes, filePath); } - if (deleteRowPositions == null) { - List> deletes = Lists.transform(posDeletes, this::openPosDeletes); - deleteRowPositions = Deletes.toPositionIndex(filePath, deletes); - } return deleteRowPositions; } @@ -240,9 +235,7 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { return records; } - List> deletes = 
Lists.transform(posDeletes, this::openPosDeletes); - - PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes); + PositionDeleteIndex positionIndex = deletedRowPositions(); Predicate isDeleted = record -> positionIndex.isDeleted(pos(record)); return createDeleteIterable(records, isDeleted); } @@ -254,56 +247,6 @@ private CloseableIterable createDeleteIterable( : Deletes.filterDeleted(records, isDeleted, counter); } - private CloseableIterable openPosDeletes(DeleteFile file) { - return openDeletes(file, POS_DELETE_SCHEMA); - } - - private CloseableIterable openDeletes(DeleteFile deleteFile, Schema deleteSchema) { - LOG.trace("Opening delete file {}", deleteFile.path()); - InputFile input = getInputFile(deleteFile.path().toString()); - switch (deleteFile.format()) { - case AVRO: - return Avro.read(input) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc(DataReader::create) - .build(); - - case PARQUET: - Parquet.ReadBuilder builder = - Parquet.read(input) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)); - - if (deleteFile.content() == FileContent.POSITION_DELETES) { - builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath)); - } - - return builder.build(); - - case ORC: - // Reusing containers is automatic for ORC. No need to set 'reuseContainers' here. - ORC.ReadBuilder orcBuilder = - ORC.read(input) - .project(deleteSchema) - .createReaderFunc( - fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)); - - if (deleteFile.content() == FileContent.POSITION_DELETES) { - orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath)); - } - - return orcBuilder.build(); - default: - throw new UnsupportedOperationException( - String.format( - "Cannot read deletes, %s is not a supported format: %s", - deleteFile.format().name(), deleteFile.path())); - } - } - private static Schema fileProjection( Schema tableSchema, Schema requestedSchema, diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java new file mode 100644 index 000000000000..07bdce6d836f --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.util.StructLikeSet; + +/** An API for loading delete file content into in-memory data structures. 
*/ +public interface DeleteLoader { + /** + * Loads the content of equality delete files into a set. + * + * @param deleteFiles equality delete files + * @param projection a projection of columns to load + * @return a set of equality deletes + */ + StructLikeSet loadEqualityDeletes(Iterable deleteFiles, Schema projection); + + /** + * Loads the content of position delete files for a given data file path into a position index. + * + * @param deleteFiles position delete files + * @param filePath the data file path for which to load deletes + * @return a position delete index for the provided data file path + */ + PositionDeleteIndex loadPositionDeletes(Iterable deleteFiles, CharSequence filePath); +} diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index 2c58281904c9..eeef75be6334 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -89,6 +89,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + implementation libs.caffeine + testImplementation(libs.hadoop2.minicluster) { exclude group: 'org.apache.avro', module: 'avro' // to make sure netty libs only come from project(':iceberg-arrow') diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java index d86246b1e616..e3b01b8375b6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java @@ -18,15 +18,19 @@ */ package org.apache.iceberg.spark; +import java.time.Duration; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.function.Function; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.sql.RuntimeConfig; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; class SparkConfParser { @@ -34,6 +38,12 @@ class SparkConfParser { private final RuntimeConfig sessionConf; private final Map options; + SparkConfParser() { + this.properties = ImmutableMap.of(); + this.sessionConf = new RuntimeConfig(SQLConf.get()); + this.options = ImmutableMap.of(); + } + SparkConfParser(SparkSession spark, Table table, Map options) { this.properties = table.properties(); this.sessionConf = spark.conf(); @@ -56,6 +66,10 @@ public StringConfParser stringConf() { return new StringConfParser(); } + public DurationConfParser durationConf() { + return new DurationConfParser(); + } + class BooleanConfParser extends ConfParser { private Boolean defaultValue; private boolean negate = false; @@ -156,6 +170,33 @@ public String parseOptional() { } } + class DurationConfParser extends ConfParser { + private Duration defaultValue; + + @Override + protected DurationConfParser self() { + return this; + } + + public DurationConfParser defaultValue(Duration value) { + this.defaultValue = value; + return self(); + } + + public Duration parse() { + Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); + return parse(this::toDuration, defaultValue); + } + + public Duration parseOptional() { + return parse(this::toDuration, defaultValue); + } + + private Duration 
toDuration(String time) { + return Duration.ofSeconds(JavaUtils.timeStringAsSec(time)); + } + } + abstract class ConfParser { private final List optionNames = Lists.newArrayList(); private String sessionConfName; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java new file mode 100644 index 000000000000..6d20ede7af37 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.time.Duration; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An executor cache for reducing the computation and IO overhead in tasks. + * + *
The cache is configured and controlled through Spark SQL properties. It supports both limits + * on the total cache size and maximum size for individual entries. Additionally, it implements + * automatic eviction of entries after a specified duration of inactivity. The cache will respect + * the SQL configuration valid at the time of initialization. All subsequent changes to the + * configuration will have no effect. + * + *
The cache is accessed and populated via {@link #getOrLoad(String, String, Supplier, long)}. If + * the value is not present in the cache, it is computed using the provided supplier and stored in + * the cache, subject to the defined size constraints. When a key is added, it must be associated + * with a particular group ID. Once a group is no longer needed, it is recommended to explicitly + * invalidate its state by calling {@link #invalidate(String)} instead of relying on automatic + * eviction. + * + *
Note that this class employs the singleton pattern to ensure only one cache exists per JVM. + */ +public class SparkExecutorCache { + + private static final Logger LOG = LoggerFactory.getLogger(SparkExecutorCache.class); + + private static final SparkConfParser CONF_PARSER = new SparkConfParser(); + private static final boolean CACHE_ENABLED = parseCacheEnabledConf(); + private static final Duration TIMEOUT = parseTimeoutConf(); + private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf(); + private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf(); + private static final String EXECUTOR_DESC = SparkUtil.executorDesc(); + + private static volatile SparkExecutorCache instance = null; + + private volatile Cache state; + + private SparkExecutorCache() {} + + /** + * Returns the cache if created or creates it. + * + *
Note this method returns null if caching is disabled. + */ + public static SparkExecutorCache getOrCreate() { + if (instance == null && CACHE_ENABLED) { + synchronized (SparkExecutorCache.class) { + if (instance == null) { + SparkExecutorCache.instance = new SparkExecutorCache(); + } + } + } + + return instance; + } + + /** Returns the cache if already created or null otherwise. */ + public static SparkExecutorCache get() { + return instance; + } + + /** Returns the max entry size in bytes that will be considered for caching. */ + public long maxEntrySize() { + return MAX_ENTRY_SIZE; + } + + /** Returns the period of inactivity after which cache entries are evicted. */ + public Duration timeout() { + return TIMEOUT; + } + + /** Returns the max total size of this cache in bytes. */ + public long maxTotalSize() { + return MAX_TOTAL_SIZE; + } + + /** + * Gets the cached value for the key or populates the cache with a new mapping. + * + * @param group a group ID + * @param key a cache key + * @param valueSupplier a supplier to compute the value + * @param valueSize an estimated memory size of the value in bytes + * @return the cached or computed value + */ + public V getOrLoad(String group, String key, Supplier valueSupplier, long valueSize) { + if (valueSize > MAX_ENTRY_SIZE) { + return valueSupplier.get(); + } + + String internalKey = group + "_" + key; + CacheValue value = state().get(internalKey, loadFunc(valueSupplier, valueSize)); + Preconditions.checkNotNull(value, "Loaded value must not be null"); + return value.get(); + } + + private Function loadFunc(Supplier valueSupplier, long valueSize) { + return key -> { + long start = System.currentTimeMillis(); + V value = valueSupplier.get(); + long end = System.currentTimeMillis(); + LOG.info("Loaded value for {} with size {} in {} ms", key, valueSize, (end - start)); + return new CacheValue(value, valueSize); + }; + } + + /** + * Invalidates all keys associated with the given group ID. 
+ * + * @param group a group ID + */ + public void invalidate(String group) { + if (state != null) { + LOG.info("Invalidating all keys associated with {} ({})", group, EXECUTOR_DESC); + for (String key : findKeys(group)) { + state().invalidate(key); + } + } + } + + private List findKeys(String group) { + return state.asMap().keySet().stream() + .filter(internalKey -> internalKey.startsWith(group)) + .collect(Collectors.toList()); + } + + private Cache state() { + if (state == null) { + synchronized (this) { + if (state == null) { + LOG.info("Initializing cache state ({})", EXECUTOR_DESC); + this.state = initState(); + } + } + } + + return state; + } + + private Cache initState() { + return Caffeine.newBuilder() + .expireAfterAccess(TIMEOUT) + .maximumWeight(MAX_TOTAL_SIZE) + .weigher((key, value) -> ((CacheValue) value).weight()) + .recordStats() + .removalListener( + (key, value, cause) -> LOG.info("Evicted {} ({}) ({})", key, EXECUTOR_DESC, cause)) + .build(); + } + + private static boolean parseCacheEnabledConf() { + return CONF_PARSER + .booleanConf() + .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_ENABLED) + .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_ENABLED_DEFAULT) + .parse(); + } + + private static Duration parseTimeoutConf() { + return CONF_PARSER + .durationConf() + .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT) + .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT_DEFAULT) + .parse(); + } + + private static long parseMaxEntrySizeConf() { + return CONF_PARSER + .longConf() + .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE) + .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT) + .parse(); + } + + private static long parseMaxTotalSizeConf() { + return CONF_PARSER + .longConf() + .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE) + .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT) + .parse(); + } + + private static class CacheValue { + private final Object value; + private final long size; + + CacheValue(Object value, long size) { + this.value = value; + this.size = size; + } + + @SuppressWarnings("unchecked") + public V get() { + return (V) value; + } + + public int weight() { + return (int) Math.min(size, Integer.MAX_VALUE); + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index bca41b4155ed..4a665202317b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark; +import java.time.Duration; + public class SparkSQLProperties { private SparkSQLProperties() {} @@ -70,4 +72,18 @@ private SparkSQLProperties() {} // Controls whether to report locality information to Spark while allocating input partitions public static final String LOCALITY = "spark.sql.iceberg.locality.enabled"; + + public static final String EXECUTOR_CACHE_ENABLED = "spark.sql.iceberg.executor-cache.enabled"; + public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true; + + public static final String EXECUTOR_CACHE_TIMEOUT = "spark.sql.iceberg.executor-cache.timeout"; + public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT = Duration.ofMinutes(10); + + public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE = + "spark.sql.iceberg.executor-cache.max-entry-size"; + public static final long 
EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB + + public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE = + "spark.sql.iceberg.executor-cache.max-total-size"; + public static final long EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java index 2357ca0441fc..9355da74661a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java @@ -34,6 +34,7 @@ import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.UnknownTransform; import org.apache.iceberg.util.Pair; +import org.apache.spark.SparkEnv; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.expressions.BoundReference; import org.apache.spark.sql.catalyst.expressions.EqualTo; @@ -238,4 +239,9 @@ public static String toColumnName(NamedReference ref) { public static boolean caseSensitive(SparkSession spark) { return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive")); } + + public static String executorDesc() { + String executorId = SparkEnv.get().executorId(); + return executorId.equals("driver") ? executorId : "executor " + executorId; + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 4fb838202c88..c2b3e7c2dc56 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -25,6 +25,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.generic.GenericData; @@ -40,7 +42,9 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.BaseDeleteLoader; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.DeleteLoader; import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; @@ -50,6 +54,7 @@ import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkExecutorCache; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.NestedField; @@ -279,5 +284,29 @@ protected void markRowDeleted(InternalRow row) { counter().increment(); } } + + @Override + protected DeleteLoader newDeleteLoader() { + return new CachingDeleteLoader(this::loadInputFile); + } + + private class CachingDeleteLoader extends BaseDeleteLoader { + private final SparkExecutorCache cache; + + CachingDeleteLoader(Function loadInputFile) { + super(loadInputFile); + this.cache = SparkExecutorCache.getOrCreate(); + } + + @Override + protected boolean canCache(long size) { + return cache != null && size < cache.maxEntrySize(); + } + + @Override + protected V getOrLoad(String key, Supplier valueSupplier, long valueSize) { + return 
cache.getOrLoad(table().name(), key, valueSupplier, valueSize); + } + } } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java index 65df29051c8c..f6913fb9d00d 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java @@ -21,6 +21,7 @@ import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkExecutorCache; import org.apache.spark.util.KnownSizeEstimation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,7 @@ public void close() throws Exception { LOG.info("Releasing resources"); io().close(); } + invalidateCache(name()); } public static class SerializableMetadataTableWithSize extends SerializableMetadataTable @@ -93,6 +95,14 @@ public void close() throws Exception { LOG.info("Releasing resources"); io().close(); } + invalidateCache(name()); + } + } + + private static void invalidateCache(String name) { + SparkExecutorCache cache = SparkExecutorCache.get(); + if (cache != null) { + cache.invalidate(name); } } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java new file mode 100644 index 000000000000..9c57936d989e --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import java.util.Objects; + +public class Employee { + private Integer id; + private String dep; + + public Employee() {} + + public Employee(Integer id, String dep) { + this.id = id; + this.dep = dep; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getDep() { + return dep; + } + + public void setDep(String dep) { + this.dep = dep; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (other == null || getClass() != other.getClass()) { + return false; + } + + Employee employee = (Employee) other; + return Objects.equals(id, employee.id) && Objects.equals(dep, employee.dep); + } + + @Override + public int hashCode() { + return Objects.hash(id, dep); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java new file mode 100644 index 000000000000..74fe0f640251 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE; +import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.benmanes.caffeine.cache.Cache; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Files; +import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +public class TestSparkExecutorCache extends SparkTestBaseWithCatalog { + + public TestSparkExecutorCache() { + super(SparkCatalogConfig.HIVE); + } + + @AfterEach + public void after() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS updated_ids"); + resetCache(); + } + + @Test + public void testConcurrentAccess() throws InterruptedException { + SparkExecutorCache cache = SparkExecutorCache.getOrCreate(); + + int threadCount = 10; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + + for (int i = 0; i < threadCount; i++) { + String group = i % 2 == 0 ? 
"table1" : "table2"; + executorService.submit( + () -> { + cache.getOrLoad(group, "key1", () -> "v1", 100); + cache.getOrLoad(group, "key2", () -> "v2", 200); + cache.getOrLoad(group, "key3", () -> "v3", 300); + cache.getOrLoad(group, "key2", () -> "v2", 200); + cache.getOrLoad(group, "key1", () -> "v1", 100); + cache.getOrLoad(group, "key3", () -> "v3", 300); + }); + } + + executorService.shutdown(); + assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + + Cache state = fetchInternalCacheState(); + + // there are 10 threads and each is loading 3 keys twice + // half of the threads load it for one table and half of them for another + // each key must be loaded only once per table + assertThat(state.stats().hitCount()).isEqualTo(54); + assertThat(state.stats().missCount()).isEqualTo(6); + + cache.invalidate("table1"); + cache.invalidate("table2"); + + // all keys must be invalidated + assertThat(state.asMap()).isEmpty(); + } + + @Test + public void testMaxEntrySize() { + SparkExecutorCache cache = SparkExecutorCache.getOrCreate(); + + cache.getOrLoad("table", "key", () -> "v", Long.MAX_VALUE); + + // the cache must not be initialized as the entry was not considered for caching + Cache state = fetchInternalCacheState(); + assertThat(state).isNull(); + } + + @Test + public void testMaxTotalSize() { + SparkExecutorCache cache = SparkExecutorCache.getOrCreate(); + + int entryCount = 0; + long entrySize = 8 * 1024 * 1024; // 8 MB + + while (entryCount * entrySize <= 5 * cache.maxTotalSize()) { + cache.getOrLoad("table", "key" + entryCount, () -> "v", entrySize); + entryCount++; + } + + assertThat(entryCount).isGreaterThan(0); + + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInSameThread() + .untilAsserted( + () -> { + // the cache must evict some entries to make room for others + Cache state = fetchInternalCacheState(); + assertThat(state.stats().evictionCount()).isGreaterThan(0); + }); + } + + @Test + public void testCopyOnWriteDelete() throws Exception { + checkDelete(COPY_ON_WRITE); + } + + @Test + public void testMergeOnReadDelete() throws Exception { + checkDelete(MERGE_ON_READ); + } + + private void checkDelete(RowLevelOperationMode mode) throws Exception { + createAndInitTable(); + + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')", + tableName, TableProperties.DELETE_MODE, mode.modeName()); + + sql("DELETE FROM %s WHERE id = 1 OR id = 4", tableName); + + // in CoW, the target table will be scanned 2 times (main query + runtime filter) + // in MoR, the target table will be scanned only once + int scanCount = mode == COPY_ON_WRITE ? 
2 : 1;
+
+    // require at least 2 hits per scan because 2 delete files apply to 2 data files
+    // allow at most 2 misses per scan because each scan has its own broadcast
+    Cache<String, ?> state = fetchInternalCacheState();
+    assertThat(state.stats().hitCount()).isGreaterThanOrEqualTo(2 * scanCount);
+    assertThat(state.stats().missCount()).isLessThanOrEqualTo(2 * scanCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s ORDER BY id ASC", tableName));
+  }
+
+  @Test
+  public void testCopyOnWriteUpdate() throws Exception {
+    checkUpdate(COPY_ON_WRITE);
+  }
+
+  @Test
+  public void testMergeOnReadUpdate() throws Exception {
+    checkUpdate(MERGE_ON_READ);
+  }
+
+  private void checkUpdate(RowLevelOperationMode mode) throws Exception {
+    createAndInitTable();
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.UPDATE_MODE, mode.modeName());
+
+    Dataset<Integer> updatedIdDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
+    updatedIdDS.createOrReplaceTempView("updated_ids");
+
+    sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM updated_ids)", tableName);
+
+    // in CoW, the target table will be scanned 3 times (2 in main query + runtime filter)
+    // in MoR, the target table will be scanned only once
+    int scanCount = mode == COPY_ON_WRITE ? 3 : 1;
+
+    // require at least 2 hits per scan because 2 delete files apply to 2 data files
+    // allow at most 2 misses per scan because each scan has its own broadcast
+    Cache<String, ?> state = fetchInternalCacheState();
+    assertThat(state.stats().hitCount()).isGreaterThanOrEqualTo(2 * scanCount);
+    assertThat(state.stats().missCount()).isLessThanOrEqualTo(2 * scanCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
+        sql("SELECT * FROM %s ORDER BY id ASC", tableName));
+  }
+
+  @Test
+  public void testCopyOnWriteMerge() throws Exception {
+    checkMerge(COPY_ON_WRITE);
+  }
+
+  @Test
+  public void testMergeOnReadMerge() throws Exception {
+    checkMerge(MERGE_ON_READ);
+  }
+
+  private void checkMerge(RowLevelOperationMode mode) throws Exception {
+    createAndInitTable();
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.MERGE_MODE, mode.modeName());
+
+    Dataset<Integer> updatedIdDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
+    updatedIdDS.createOrReplaceTempView("updated_ids");
+
+    sql(
+        "MERGE INTO %s t USING updated_ids s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id = 100 "
+            + "WHEN NOT MATCHED THEN "
+            + " INSERT (id, dep) VALUES (-1, 'unknown')",
+        tableName);
+
+    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
+    // in MoR, the target table will be scanned only once
+    int scanCount = mode == COPY_ON_WRITE ? 
2 : 1;
+
+    // require at least 2 hits per scan because 2 delete files apply to 2 data files
+    // allow at most 2 misses per scan because each scan has its own broadcast
+    Cache<String, ?> state = fetchInternalCacheState();
+    assertThat(state.stats().hitCount()).isGreaterThanOrEqualTo(2 * scanCount);
+    assertThat(state.stats().missCount()).isLessThanOrEqualTo(2 * scanCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(100, "hr"), row(100, "hr")),
+        sql("SELECT * FROM %s ORDER BY id ASC", tableName));
+  }
+
+  private void createAndInitTable() throws Exception {
+    sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
+
+    append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    append(tableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // commit one position delete file and one equality delete file, both covering the 2 data files
+    List<Pair<CharSequence, Long>> posDeletes =
+        dataFiles(table).stream()
+            .map(dataFile -> Pair.of(dataFile.path(), 0L))
+            .collect(Collectors.toList());
+    Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table, posDeletes);
+    DeleteFile posDeleteFile = posDeleteResult.first();
+    CharSequenceSet referencedDataFiles = posDeleteResult.second();
+
+    DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);
+
+    table
+        .newRowDelta()
+        .validateFromSnapshot(table.currentSnapshot().snapshotId())
+        .validateDataFilesExist(referencedDataFiles)
+        .addDeletes(posDeleteFile)
+        .addDeletes(eqDeleteFile)
+        .commit();
+  }
+
+  private DeleteFile writeEqDeletes(Table table, String col, Object... values) throws IOException {
+    Schema deleteSchema = table.schema().select(col);
+
+    Record delete = GenericRecord.create(deleteSchema);
+    List<Record> deletes = Lists.newArrayList();
+    for (Object value : values) {
+      deletes.add(delete.copy(col, value));
+    }
+
+    OutputFile out = Files.localOutput(temp.newFile());
+    return FileHelpers.writeDeleteFile(table, out, null, deletes, deleteSchema);
+  }
+
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+    OutputFile out = Files.localOutput(temp.newFile());
+    return FileHelpers.writeDeleteFile(table, out, null, deletes);
+  }
+
+  private void append(String target, Employee... 
employees) throws NoSuchTableException {
+    List<Employee> input = Arrays.asList(employees);
+    Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
+    inputDF.coalesce(1).writeTo(target).append();
+  }
+
+  private List<DataFile> dataFiles(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return ImmutableList.copyOf(Iterables.transform(tasks, ContentScanTask::file));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Cache<String, ?> fetchInternalCacheState() {
+    try {
+      Field stateField = SparkExecutorCache.class.getDeclaredField("state");
+      stateField.setAccessible(true);
+      SparkExecutorCache cache = SparkExecutorCache.get();
+      return (Cache<String, ?>) stateField.get(cache);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void resetCache() {
+    try {
+      Field instanceField = SparkExecutorCache.class.getDeclaredField("instance");
+      instanceField.setAccessible(true);
+      instanceField.set(null, null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 30b3b8a88576..9f4a4f47bf00 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -47,6 +47,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DistributionMode;
@@ -78,6 +79,28 @@ public void after() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @TestTemplate
+  public void testDurationConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String confName = "spark.sql.iceberg.some-duration-conf";
+
+    withSQLConf(
+        ImmutableMap.of(confName, "10s"),
+        () -> {
+          SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
+          Duration duration = parser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(duration).hasSeconds(10);
+        });
+
+    withSQLConf(
+        ImmutableMap.of(confName, "2m"),
+        () -> {
+          SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
+          Duration duration = parser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(duration).hasMinutes(2);
+        });
+  }
+
   @TestTemplate
   public void testDeleteGranularityDefault() {
     Table table = validationCatalog.loadTable(tableIdent);