Core: Fix performance issue when combining tasks by partition (#9629)
aokolnychyi authored Feb 3, 2024
1 parent f4ba90d commit fb02bd2
Showing 4 changed files with 99 additions and 122 deletions.
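
This change removes a per-task hotspot in TableScanUtil.planTaskGroups: instead of constructing a new PartitionData (and re-deriving its Avro schema) for every task while projecting grouping keys, a single PartitionData template is created once per planning call and copied for each task with the new PartitionData.copyFor(StructLike), which replaces only the values. The revapi.yml entry accepts the resulting change to PartitionData's default serialization, and TaskGroupPlanningBenchmark is reworked to generate data and delete file metadata with FileGenerationUtil rather than writing real files through Spark, gaining a planTaskGroupsWithGrouping case.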
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
@@ -874,6 +874,10 @@ acceptedBreaks:
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.PartitionData"
new: "class org.apache.iceberg.PartitionData"
justification: "Serialization across versions is not supported"
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.mapping.NameMapping"
new: "class org.apache.iceberg.mapping.NameMapping"
54 changes: 42 additions & 12 deletions core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -81,6 +81,15 @@ private PartitionData(PartitionData toCopy) {
this.schema = toCopy.schema;
}

/** Copy constructor that inherits the state but replaces the partition values */
private PartitionData(PartitionData toCopy, StructLike partition) {
this.partitionType = toCopy.partitionType;
this.size = toCopy.size;
this.data = copyData(partitionType, partition);
this.stringSchema = toCopy.stringSchema;
this.schema = toCopy.schema;
}

public Types.StructType getPartitionType() {
return partitionType;
}
@@ -134,18 +143,7 @@ public Object get(int pos) {

@Override
public <T> void set(int pos, T value) {
if (value instanceof Utf8) {
// Utf8 is not Serializable
data[pos] = value.toString();
} else if (value instanceof ByteBuffer) {
// ByteBuffer is not Serializable
ByteBuffer buffer = (ByteBuffer) value;
byte[] bytes = new byte[buffer.remaining()];
buffer.duplicate().get(bytes);
data[pos] = bytes;
} else {
data[pos] = value;
}
data[pos] = toInternalValue(value);
}

@Override
@@ -171,6 +169,10 @@ public PartitionData copy() {
return new PartitionData(this);
}

public PartitionData copyFor(StructLike partition) {
return new PartitionData(this, partition);
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -221,4 +223,32 @@ public static Object[] copyData(Types.StructType type, Object[] data) {

return copy;
}

private static Object[] copyData(Types.StructType type, StructLike partition) {
List<Types.NestedField> fields = type.fields();
Object[] data = new Object[fields.size()];

for (int pos = 0; pos < fields.size(); pos++) {
Types.NestedField field = fields.get(pos);
Class<?> javaClass = field.type().typeId().javaClass();
data[pos] = toInternalValue(partition.get(pos, javaClass));
}

return data;
}

private static Object toInternalValue(Object value) {
if (value instanceof Utf8) {
// Utf8 is not Serializable
return value.toString();
} else if (value instanceof ByteBuffer) {
// ByteBuffer is not Serializable
ByteBuffer buffer = (ByteBuffer) value;
byte[] bytes = new byte[buffer.remaining()];
buffer.duplicate().get(bytes);
return bytes;
} else {
return value;
}
}
}
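
For illustration only (not part of this commit), here is a minimal standalone sketch of how the new copyFor API behaves, assuming iceberg-core at or after this change is on the classpath; the one-field key type and class name are made up for the example:

import org.apache.iceberg.PartitionData;
import org.apache.iceberg.types.Types;

public class CopyForSketch {
  public static void main(String[] args) {
    // Constructing a PartitionData derives its Avro schema from the struct type;
    // this is the work the commit avoids repeating for every task.
    Types.StructType keyType =
        Types.StructType.of(Types.NestedField.optional(1, "bucket", Types.IntegerType.get()));
    PartitionData template = new PartitionData(keyType);

    // PartitionData is itself a StructLike, so it can stand in for a task's partition tuple here.
    PartitionData partition = new PartitionData(keyType);
    partition.set(0, 7);

    // copyFor shares the template's precomputed schema and copies only the values.
    PartitionData key = template.copyFor(partition);
    System.out.println(key);
  }
}

Because the private copy constructor above reuses toCopy's schema and stringSchema fields, each copyFor call costs an array copy rather than a schema conversion.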
20 changes: 2 additions & 18 deletions core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -153,6 +153,7 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);

Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
PartitionData groupingKeyTemplate = new PartitionData(groupingKeyType);

// group tasks by grouping keys derived from their partition tuples
StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
@@ -166,7 +167,7 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
List<T> groupingKeyTasks =
tasksByGroupingKey.computeIfAbsent(
projectGroupingKey(groupingKeyProjection, groupingKeyType, partition),
groupingKeyTemplate.copyFor(groupingKeyProjection.wrap(partition)),
groupingKey -> Lists.newArrayList());
if (task instanceof SplittableScanTask<?>) {
((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
@@ -188,23 +189,6 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
return taskGroups;
}

private static StructLike projectGroupingKey(
StructProjection groupingKeyProjection,
Types.StructType groupingKeyType,
StructLike partition) {

PartitionData groupingKey = new PartitionData(groupingKeyType);

groupingKeyProjection.wrap(partition);

for (int pos = 0; pos < groupingKeyProjection.size(); pos++) {
Class<?> javaClass = groupingKey.getType(pos).typeId().javaClass();
groupingKey.set(pos, groupingKeyProjection.get(pos, javaClass));
}

return groupingKey;
}

private static <T extends ScanTask> Iterable<ScanTaskGroup<T>> toTaskGroupIterable(
StructLike groupingKey,
Iterable<T> tasks,
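
As an illustrative sketch (again not part of the commit) of the grouping pattern TableScanUtil now uses, the snippet below creates one PartitionData template per planning call and uses value-only copies as StructLikeMap keys; the key type, class name, and task strings are placeholders:

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

public class GroupingKeySketch {
  public static void main(String[] args) {
    Types.StructType keyType =
        Types.StructType.of(Types.NestedField.optional(1, "part", Types.IntegerType.get()));

    // One template for the whole planning pass; each map key is a value-only copy of it.
    PartitionData template = new PartitionData(keyType);
    StructLikeMap<List<String>> tasksByKey = StructLikeMap.create(keyType);

    for (int taskOrdinal = 0; taskOrdinal < 4; taskOrdinal++) {
      PartitionData partition = new PartitionData(keyType);
      partition.set(0, taskOrdinal % 2); // two distinct partition values -> two groups
      tasksByKey
          .computeIfAbsent(template.copyFor(partition), key -> new ArrayList<>())
          .add("task-" + taskOrdinal);
    }

    System.out.println(tasksByKey.size()); // prints 2
  }
}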
org.apache.iceberg.spark.TaskGroupPlanningBenchmark (Spark JMH benchmark)
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg.spark;

import static org.apache.spark.sql.functions.lit;

import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
@@ -28,36 +26,26 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.FileGenerationUtil;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.types.StructType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -93,16 +81,14 @@ public class TaskGroupPlanningBenchmark {
private static final String PARTITION_COLUMN = "ss_ticket_number";

private static final int NUM_PARTITIONS = 150;
private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 5;
private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DELETE_FILES_PER_PARTITION = 25;
private static final int NUM_ROWS_PER_DATA_FILE = 150;

private final Configuration hadoopConf = new Configuration();
private SparkSession spark;
private Table table;

private List<ScanTask> fileTasks;
private List<FileScanTask> fileTasks;

@Setup
public void setupBenchmark() throws NoSuchTableException, ParseException {
@@ -122,121 +108,94 @@ public void tearDownBenchmark() {
@Threads(1)
public void planTaskGroups(Blackhole blackhole) {
SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
List<ScanTaskGroup<ScanTask>> taskGroups =
List<ScanTaskGroup<FileScanTask>> taskGroups =
TableScanUtil.planTaskGroups(
fileTasks,
readConf.splitSize(),
readConf.splitLookback(),
readConf.splitOpenFileCost());

long rowsCount = 0L;
for (ScanTaskGroup<ScanTask> taskGroup : taskGroups) {
for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
rowsCount += taskGroup.estimatedRowsCount();
}
blackhole.consume(rowsCount);

long filesCount = 0L;
for (ScanTaskGroup<ScanTask> taskGroup : taskGroups) {
for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
filesCount += taskGroup.filesCount();
}
blackhole.consume(filesCount);

long sizeBytes = 0L;
for (ScanTaskGroup<ScanTask> taskGroup : taskGroups) {
for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
sizeBytes += taskGroup.sizeBytes();
}
blackhole.consume(sizeBytes);
}

private void loadFileTasks() {
table.refresh();
@Benchmark
@Threads(1)
public void planTaskGroupsWithGrouping(Blackhole blackhole) {
SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());

try (CloseableIterable<ScanTask> fileTasksIterable = table.newBatchScan().planFiles()) {
this.fileTasks = Lists.newArrayList(fileTasksIterable);
} catch (IOException e) {
throw new UncheckedIOException(e);
List<ScanTaskGroup<FileScanTask>> taskGroups =
TableScanUtil.planTaskGroups(
fileTasks,
readConf.splitSize(),
readConf.splitLookback(),
readConf.splitOpenFileCost(),
Partitioning.groupingKeyType(table.schema(), table.specs().values()));

long rowsCount = 0L;
for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
rowsCount += taskGroup.estimatedRowsCount();
}
}
blackhole.consume(rowsCount);

private DataFile loadAddedDataFile() {
table.refresh();
long filesCount = 0L;
for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
filesCount += taskGroup.filesCount();
}
blackhole.consume(filesCount);

Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
return Iterables.getOnlyElement(addedDataFiles);
long sizeBytes = 0L;
for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
sizeBytes += taskGroup.sizeBytes();
}
blackhole.consume(sizeBytes);
}

private DeleteFile loadAddedDeleteFile() {
private void loadFileTasks() {
table.refresh();

Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
return Iterables.getOnlyElement(addedDeleteFiles);
try (CloseableIterable<FileScanTask> fileTasksIterable = table.newScan().planFiles()) {
this.fileTasks = Lists.newArrayList(fileTasksIterable);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void initDataAndDeletes() throws NoSuchTableException {
Schema schema = table.schema();
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();

private void initDataAndDeletes() {
for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
Dataset<Row> inputDF =
randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
.drop(PARTITION_COLUMN)
.withColumn(PARTITION_COLUMN, lit(partitionOrdinal));

for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
appendAsFile(inputDF);
}
StructLike partition = TestHelpers.Row.of(partitionOrdinal);

DataFile dataFile = loadAddedDataFile();

sql(
"DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);

DeleteFile deleteFile = loadAddedDeleteFile();

AppendFiles append = table.newFastAppend();
RowDelta rowDelta = table.newRowDelta();

for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
String replicaFileName = UUID.randomUUID() + "-replica.parquet";
DataFile replicaDataFile =
DataFiles.builder(spec)
.copy(dataFile)
.withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
.build();
append.appendFile(replicaDataFile);
for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
rowDelta.addRows(dataFile);
}

append.commit();

RowDelta rowDelta = table.newRowDelta();

for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
String replicaFileName = UUID.randomUUID() + "-replica.parquet";
DeleteFile replicaDeleteFile =
FileMetadata.deleteFileBuilder(spec)
.copy(deleteFile)
.withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
.build();
rowDelta.addDeletes(replicaDeleteFile);
DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
rowDelta.addDeletes(deleteFile);
}

rowDelta.commit();
}
}

private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
df.coalesce(1).writeTo(TABLE_NAME).append();
}

private Dataset<Row> randomDataDF(Schema schema, int numRows) {
Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
StructType rowSparkType = SparkSchemaUtil.convert(schema);
return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
}

private void setupSpark() {
this.spark =
SparkSession.builder()
