Skip to content

Commit

Permalink
API, Core, Spark 3.5: Parallelize reading of deletes and cache them o…
Browse files Browse the repository at this point in the history
…n executors
  • Loading branch information
aokolnychyi committed Dec 19, 2023
1 parent a83bfe7 commit 6c40b80
Show file tree
Hide file tree
Showing 18 changed files with 844 additions and 87 deletions.
64 changes: 64 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

public class TypeUtil {

private static final long OBJECT_HEADER = 12L;

private TypeUtil() {}

/**
Expand Down Expand Up @@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
}
}

/**
* Estimates the number of bytes a value for a given field may occupy in memory.
*
* <p>This method approximates the memory size based on the internal Java representation defined
* by {@link Type.TypeID}. It is important to note that the actual size might differ from this
* estimation. The method is designed to handle a variety of data types, including primitive
* types, strings, and nested types such as structs, maps, and lists.
*
* @param field a field for which to estimate the size
* @return the estimated size in bytes of the field's value in memory
*/
public static long defaultSize(Types.NestedField field) {
return defaultSize(field.type());
}

private static long defaultSize(Type type) {
switch (type.typeId()) {
case BOOLEAN:
// the size of a boolean variable is virtual machine dependent
// it is common to believe booleans occupy 1 byte in most JVMs
return 1;
case INTEGER:
case FLOAT:
case DATE:
// ints and floats occupy 4 bytes
// dates are internally represented as ints
return 4;
case LONG:
case DOUBLE:
case TIME:
case TIMESTAMP:
// longs and doubles occupy 8 bytes
// times and timestamps are internally represented as longs
return 8;
case STRING:
// 12 (header) + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 60 bytes
return 60;
case UUID:
// 12 (header) + 16 (two long variables) = 28 bytes
return 28;
case FIXED:
return ((Types.FixedType) type).length();
case BINARY:
return 100;
case DECIMAL:
// 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes
return 44;
case STRUCT:
Types.StructType struct = (Types.StructType) type;
return OBJECT_HEADER + struct.fields().stream().mapToLong(TypeUtil::defaultSize).sum();
case LIST:
Types.ListType list = (Types.ListType) type;
return OBJECT_HEADER + 5 * defaultSize(list.elementType());
case MAP:
Types.MapType map = (Types.MapType) type;
long entrySize = OBJECT_HEADER + defaultSize(map.keyType()) + defaultSize(map.valueType());
return OBJECT_HEADER + 5 * entrySize;
default:
return 16;
}
}

/** Interface for passing a function that assigns column IDs. */
public interface NextID {
int get();
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/org/apache/iceberg/SystemConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ private SystemConfigs() {}
Integer::parseUnsignedInt);

/**
* Sets the size of the delete worker pool. This limits the number of threads used to compute the
* PositionDeleteIndex from the position deletes for a data file.
* Sets the size of the delete worker pool. This limits the number of threads used to read delete
* files for a data file.
*/
public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
new ConfigEntry<>(
"iceberg.worker.delete-num-threads",
"ICEBERG_WORKER_DELETE_NUM_THREADS",
Math.max(2, Runtime.getRuntime().availableProcessors()),
Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
Integer::parseUnsignedInt);

/** Whether to use the shared worker pool when planning table scans. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
roaring64Bitmap = new Roaring64Bitmap();
}

void merge(BitmapPositionDeleteIndex that) {
roaring64Bitmap.or(that.roaring64Bitmap);
}

@Override
public void delete(long position) {
roaring64Bitmap.add(position);
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.SortedMerge;
Expand Down Expand Up @@ -128,6 +129,25 @@ public static StructLikeSet toEqualitySet(
}
}

public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
CloseableIterable<T> posDeletes) {
CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();

try (CloseableIterable<T> deletes = posDeletes) {
for (T delete : deletes) {
CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
long position = (long) POSITION_ACCESSOR.get(delete);
PositionDeleteIndex index =
indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex());
index.delete(position);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close position delete source", e);
}

return indexes;
}

public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.deletes;

class EmptyPositionDeleteIndex implements PositionDeleteIndex {

private static final EmptyPositionDeleteIndex INSTANCE = new EmptyPositionDeleteIndex();

private EmptyPositionDeleteIndex() {}

static EmptyPositionDeleteIndex get() {
return INSTANCE;
}

@Override
public void delete(long position) {
throw new UnsupportedOperationException("Cannot modify " + getClass().getName());
}

@Override
public void delete(long posStart, long posEnd) {
throw new UnsupportedOperationException("Cannot modify " + getClass().getName());
}

@Override
public boolean isDeleted(long position) {
return false;
}

@Override
public boolean isEmpty() {
return true;
}

@Override
public String toString() {
return "PositionDeleteIndex{}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,14 @@ public interface PositionDeleteIndex {

/** Returns true if this collection contains no element. */
boolean isEmpty();

/** Returns true if this collection contains elements. */
default boolean isNotEmpty() {
return !isEmpty();
}

/** Returns an empty immutable position delete index. */
static PositionDeleteIndex empty() {
return EmptyPositionDeleteIndex.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.deletes;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class PositionDeleteIndexUtil {

private PositionDeleteIndexUtil() {}

public static PositionDeleteIndex merge(Iterable<? extends PositionDeleteIndex> indexes) {
BitmapPositionDeleteIndex result = new BitmapPositionDeleteIndex();

for (PositionDeleteIndex index : indexes) {
if (index.isNotEmpty()) {
Preconditions.checkArgument(
index instanceof BitmapPositionDeleteIndex,
"Can merge only bitmap-based indexes, got %s",
index.getClass().getName());
result.merge((BitmapPositionDeleteIndex) index);
}
}

return result;
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/util/ThreadPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public static ExecutorService getWorkerPool() {
/**
* Return an {@link ExecutorService} that uses the "delete worker" thread-pool.
*
* <p>The size of the delete worker pool limits the number of threads used to compute the
* PositionDeleteIndex from the position deletes for a data file.
* <p>The size of this worker pool limits the number of tasks concurrently reading delete files
* within a single JVM. If there are multiple threads loading deletes, all of them will share this
* worker pool by default.
*
* <p>The size of this thread-pool is controlled by the Java system property {@code
* iceberg.worker.delete-num-threads}.
Expand Down
Loading

0 comments on commit 6c40b80

Please sign in to comment.