diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
index d4c2848b45e6..43ce2a303e2b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
private Broadcast<Table> tableBroadcast = null;
public SparkDistributedDataScan(SparkSession spark, Table table, SparkReadConf readConf) {
- this(spark, table, readConf, table.schema(), TableScanContext.empty());
+ this(spark, table, readConf, table.schema(), newTableScanContext(table));
}
private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
.flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats));
List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
+ int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum();
+ int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
+ scanMetrics().skippedDataFiles().increment(skippedFilesCount);
+
return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose);
}
@@ -157,6 +162,9 @@ private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests) {
.flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
.collect();
+ int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size();
+ scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);
+
return DeleteFileIndex.builderFor(deleteFiles)
.specsById(table().specs())
.caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ private <T> List<List<T>> collectPartitions(JavaRDD<T> rdd) {
return Arrays.asList(rdd.collectPartitions(partitionIds));
}
+ private int liveFilesCount(List<ManifestFile> manifests) {
+ return manifests.stream().mapToInt(this::liveFilesCount).sum();
+ }
+
+ private int liveFilesCount(ManifestFile manifest) {
+ return manifest.existingFilesCount() + manifest.addedFilesCount();
+ }
+
+ private static TableScanContext newTableScanContext(Table table) {
+ if (table instanceof BaseTable) {
+ MetricsReporter reporter = ((BaseTable) table).reporter();
+ return ImmutableTableScanContext.builder().metricsReporter(reporter).build();
+ } else {
+ return TableScanContext.empty();
+ }
+ }
+
private static class ReadDataManifest implements FlatMapFunction<ManifestFile, DataFile> {
private final Broadcast<Table> table;
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
new file mode 100644
index 000000000000..1ea4f990b272
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanReporting
+ extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
+
+ @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[] {LOCAL, LOCAL},
+ new Object[] {LOCAL, DISTRIBUTED},
+ new Object[] {DISTRIBUTED, LOCAL},
+ new Object[] {DISTRIBUTED, DISTRIBUTED}
+ };
+ }
+
+ private static SparkSession spark = null;
+
+ private final PlanningMode dataMode;
+ private final PlanningMode deleteMode;
+
+ public TestSparkDistributedDataScanReporting(
+ PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+ this.dataMode = dataPlanningMode;
+ this.deleteMode = deletePlanningMode;
+ }
+
+ @BeforeClass
+ public static void startSpark() {
+ TestSparkDistributedDataScanReporting.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
+ TestSparkDistributedDataScanReporting.spark = null;
+ currentSpark.stop();
+ }
+
+ @Override
+ protected BatchScan newScan(Table table) {
+ table
+ .updateProperties()
+ .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+ .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+ .commit();
+ SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ return new SparkDistributedDataScan(spark, table, readConf);
+ }
+}