Spark 3.4: Fix metrics reporting in distributed planning (#8613)
aokolnychyi authored Sep 21, 2023
1 parent f826cf4 commit 3ca14b8
Showing 2 changed files with 111 additions and 1 deletion.
@@ -29,6 +29,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
  private Broadcast<Table> tableBroadcast = null;

  public SparkDistributedDataScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    this(spark, table, readConf, table.schema(), TableScanContext.empty());
+    this(spark, table, readConf, table.schema(), newTableScanContext(table));
  }

  private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
            .flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats));
    List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);

    int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum();
    int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
    scanMetrics().skippedDataFiles().increment(skippedFilesCount);

    return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose);
  }

@@ -157,6 +162,9 @@ private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests
            .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
            .collect();

    int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size();
    scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);

    return DeleteFileIndex.builderFor(deleteFiles)
        .specsById(table().specs())
        .caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ private <T> List<List<T>> collectPartitions(JavaRDD<T> rdd) {
    return Arrays.asList(rdd.collectPartitions(partitionIds));
  }

  private int liveFilesCount(List<ManifestFile> manifests) {
    return manifests.stream().mapToInt(this::liveFilesCount).sum();
  }

  private int liveFilesCount(ManifestFile manifest) {
    return manifest.existingFilesCount() + manifest.addedFilesCount();
  }

  private static TableScanContext newTableScanContext(Table table) {
    if (table instanceof BaseTable) {
      MetricsReporter reporter = ((BaseTable) table).reporter();
      return ImmutableTableScanContext.builder().metricsReporter(reporter).build();
    } else {
      return TableScanContext.empty();
    }
  }

  private static class ReadDataManifest implements FlatMapFunction<ManifestFileBean, DataFile> {

    private final Broadcast<Table> table;
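With the scan context now carrying the table's MetricsReporter (via newTableScanContext above), distributed planning populates skippedDataFiles and skippedDeleteFiles the same way local planning does: every live manifest entry (existing + added) that the executors did not return was skipped by the scan filter. A minimal sketch of a reporter that surfaces those counters; the class name SkippedFilesLogger and its package are illustrative assumptions and not part of this commit, while MetricsReporter, MetricsReport, ScanReport, and CounterResult are existing Iceberg types:

package org.example;

import org.apache.iceberg.metrics.CounterResult;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanReport;

public class SkippedFilesLogger implements MetricsReporter {

  @Override
  public void report(MetricsReport report) {
    // Only scan reports carry the skipped-file counters.
    if (!(report instanceof ScanReport)) {
      return;
    }

    ScanReport scanReport = (ScanReport) report;
    CounterResult skippedData = scanReport.scanMetrics().skippedDataFiles();
    CounterResult skippedDeletes = scanReport.scanMetrics().skippedDeleteFiles();

    // Counters may be absent on reports that did not record the metric.
    System.out.printf(
        "table=%s skippedDataFiles=%d skippedDeleteFiles=%d%n",
        scanReport.tableName(),
        skippedData == null ? 0 : skippedData.value(),
        skippedDeletes == null ? 0 : skippedDeletes.value());
  }
}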
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestSparkDistributedDataScanReporting
    extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {

  @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
  public static Object[] parameters() {
    return new Object[][] {
      new Object[] {LOCAL, LOCAL},
      new Object[] {LOCAL, DISTRIBUTED},
      new Object[] {DISTRIBUTED, LOCAL},
      new Object[] {DISTRIBUTED, DISTRIBUTED}
    };
  }

  private static SparkSession spark = null;

  private final PlanningMode dataMode;
  private final PlanningMode deleteMode;

  public TestSparkDistributedDataScanReporting(
      PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
    this.dataMode = dataPlanningMode;
    this.deleteMode = deletePlanningMode;
  }

  @BeforeClass
  public static void startSpark() {
    TestSparkDistributedDataScanReporting.spark =
        SparkSession.builder()
            .master("local[2]")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
            .getOrCreate();
  }

  @AfterClass
  public static void stopSpark() {
    SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
    TestSparkDistributedDataScanReporting.spark = null;
    currentSpark.stop();
  }

  @Override
  protected BatchScan newScan(Table table) {
    table
        .updateProperties()
        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
        .commit();
    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
    return new SparkDistributedDataScan(spark, table, readConf);
  }
}
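The parameterized test above reuses the shared ScanPlanningAndReportingTestBase assertions for every combination of local and distributed planning of data and delete manifests. Outside the test harness, scan reports go to whichever reporter the table was loaded with; a hedged wiring sketch using the metrics-reporter-impl catalog property, where the catalog type, warehouse path, and reporter class are illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ReporterWiring {
  public static void main(String[] args) {
    HadoopCatalog catalog = new HadoopCatalog();
    catalog.setConf(new Configuration());
    catalog.initialize(
        "demo",
        ImmutableMap.of(
            CatalogProperties.WAREHOUSE_LOCATION, "file:///tmp/iceberg-warehouse",
            CatalogProperties.METRICS_REPORTER_IMPL, "org.example.SkippedFilesLogger"));
    // Tables loaded through this catalog are expected to attach the configured
    // reporter, so scans over them (including SparkDistributedDataScan) can
    // deliver the skipped-file counters populated by this commit.
  }
}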
