From aef13dce4043a97b251fe2467183b4948c3a858d Mon Sep 17 00:00:00 2001 From: aokolnychyi Date: Thu, 21 Sep 2023 11:03:08 -0700 Subject: [PATCH] Spark 3.4: Fix metrics reporting in distributed planning --- .../iceberg/SparkDistributedDataScan.java | 27 +++++- ...TestSparkDistributedDataScanReporting.java | 85 +++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java index d4c2848b45e6..43ce2a303e2b 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java @@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.ClosingIterator; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan { private Broadcast tableBroadcast = null; public SparkDistributedDataScan(SparkSession spark, Table table, SparkReadConf readConf) { - this(spark, table, readConf, table.schema(), TableScanContext.empty()); + this(spark, table, readConf, table.schema(), newTableScanContext(table)); } private SparkDistributedDataScan( @@ -134,6 +135,10 @@ private Iterable> doPlanDataRemotely( .flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats)); List> dataFileGroups = collectPartitions(dataFileRDD); + int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum(); + int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount; + scanMetrics().skippedDataFiles().increment(skippedFilesCount); + return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose); } @@ -157,6 +162,9 @@ private DeleteFileIndex doPlanDeletesRemotely(List deleteManifests .flatMap(new ReadDeleteManifest(tableBroadcast(), context())) .collect(); + int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size(); + scanMetrics().skippedDeleteFiles().increment(skippedFilesCount); + return DeleteFileIndex.builderFor(deleteFiles) .specsById(table().specs()) .caseSensitive(isCaseSensitive()) @@ -193,6 +201,23 @@ private List> collectPartitions(JavaRDD rdd) { return Arrays.asList(rdd.collectPartitions(partitionIds)); } + private int liveFilesCount(List manifests) { + return manifests.stream().mapToInt(this::liveFilesCount).sum(); + } + + private int liveFilesCount(ManifestFile manifest) { + return manifest.existingFilesCount() + manifest.addedFilesCount(); + } + + private static TableScanContext newTableScanContext(Table table) { + if (table instanceof BaseTable) { + MetricsReporter reporter = ((BaseTable) table).reporter(); + return ImmutableTableScanContext.builder().metricsReporter(reporter).build(); + } else { + return TableScanContext.empty(); + } + } + private static class ReadDataManifest implements FlatMapFunction { private final Broadcast
table; diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java new file mode 100644 index 000000000000..1ea4f990b272 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.PlanningMode.DISTRIBUTED; +import static org.apache.iceberg.PlanningMode.LOCAL; + +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSparkDistributedDataScanReporting + extends ScanPlanningAndReportingTestBase> { + + @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}") + public static Object[] parameters() { + return new Object[][] { + new Object[] {LOCAL, LOCAL}, + new Object[] {LOCAL, DISTRIBUTED}, + new Object[] {DISTRIBUTED, LOCAL}, + new Object[] {DISTRIBUTED, DISTRIBUTED} + }; + } + + private static SparkSession spark = null; + + private final PlanningMode dataMode; + private final PlanningMode deleteMode; + + public TestSparkDistributedDataScanReporting( + PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) { + this.dataMode = dataPlanningMode; + this.deleteMode = deletePlanningMode; + } + + @BeforeClass + public static void startSpark() { + TestSparkDistributedDataScanReporting.spark = + SparkSession.builder() + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") + .getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark; + TestSparkDistributedDataScanReporting.spark = null; + currentSpark.stop(); + } + + @Override + protected BatchScan newScan(Table table) { + table + .updateProperties() + .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName()) + .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName()) + .commit(); + SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + return new SparkDistributedDataScan(spark, table, readConf); + } +}