diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index b9ed4f8d67ce..2093753bf755 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -53,6 +53,10 @@ public BaseTable(TableOperations ops, String name, MetricsReporter reporter) {
     this.reporter = reporter;
   }
 
+  MetricsReporter reporter() {
+    return reporter;
+  }
+
   @Override
   public TableOperations operations() {
     return ops;
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
similarity index 93%
rename from core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
rename to core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
index 106c236f59b1..a8f98f82cc81 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
@@ -36,14 +36,18 @@
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.Test;
 
-public class TestScanPlanningAndReporting extends TableTestBase {
+public abstract class ScanPlanningAndReportingTestBase<
+        ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends TableTestBase {
 
   private final TestMetricsReporter reporter = new TestMetricsReporter();
 
-  public TestScanPlanningAndReporting() {
+  public ScanPlanningAndReportingTestBase() {
     super(2);
   }
 
+  protected abstract ScanT newScan(Table table);
+
   @Test
   public void noDuplicatesInScanContext() {
     TableScanContext context = TableScanContext.empty();
@@ -82,12 +86,11 @@ public void scanningWithMultipleReporters() throws IOException {
     table.refresh();
 
     AtomicInteger reportedCount = new AtomicInteger();
-    TableScan tableScan =
-        table
-            .newScan()
+    ScanT tableScan =
+        newScan(table)
             .metricsReporter((MetricsReporter) -> reportedCount.getAndIncrement())
             .metricsReporter((MetricsReporter) -> reportedCount.getAndIncrement());
 
-    try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+    try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
@@ -113,10 +116,10 @@ public void scanningWithMultipleDataManifests() throws IOException {
     table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
     table.newAppend().appendFile(FILE_D).commit();
     table.refresh();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
     // should be 3 files
-    try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+    try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
 
@@ -180,9 +183,9 @@ public void scanningWithDeletes() throws IOException {
 
     table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
     table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
-    try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+    try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
 
@@ -218,12 +221,12 @@ public void scanningWithSkippedDataFiles() throws IOException {
     table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
     table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
     table.newAppend().appendFile(FILE_C).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
     List<FileScanTask> fileTasks = Lists.newArrayList();
-    try (CloseableIterable<FileScanTask> fileScanTasks =
+    try (CloseableIterable<T> scanTasks =
         tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
-      fileScanTasks.forEach(fileTasks::add);
+      scanTasks.forEach(task -> fileTasks.add((FileScanTask) task));
     }
 
     assertThat(fileTasks)
         .singleElement()
@@ -259,12 +262,12 @@ public void scanningWithSkippedDeleteFiles() throws IOException {
     table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
     table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
     table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
     List<FileScanTask> fileTasks = Lists.newArrayList();
-    try (CloseableIterable<FileScanTask> fileScanTasks =
+    try (CloseableIterable<T> scanTasks =
        tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
-      fileScanTasks.forEach(fileTasks::add);
+      scanTasks.forEach(task -> fileTasks.add((FileScanTask) task));
     }
 
     assertThat(fileTasks)
         .singleElement()
@@ -302,9 +305,9 @@ public void scanningWithEqualityAndPositionalDeleteFiles() throws IOException {
     table.newAppend().appendFile(FILE_A).commit();
     // FILE_A_DELETES = positionalDelete / FILE_A2_DELETES = equalityDelete
     table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
-    try (CloseableIterable<FileScanTask> fileScanTasks =
+    try (CloseableIterable<T> fileScanTasks =
         tableScan.filter(Expressions.equal("data", "6")).planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
index 9998c47ff300..08c4ac33d6fd 100644
--- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -21,7 +21,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
-import org.apache.iceberg.TestScanPlanningAndReporting.TestMetricsReporter;
+import org.apache.iceberg.ScanPlanningAndReportingTestBase.TestMetricsReporter;
 import org.apache.iceberg.metrics.CommitMetricsResult;
 import org.apache.iceberg.metrics.CommitReport;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
diff --git a/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
new file mode 100644
index 000000000000..dd8f5374f089
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public class TestLocalScanPlanningAndReporting
+    extends ScanPlanningAndReportingTestBase<TableScan, FileScanTask, CombinedScanTask> {
+
+  @Override
+  protected TableScan newScan(Table table) {
+    return table.newScan();
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
index d4c2848b45e6..43ce2a303e2b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
   private Broadcast<Table> tableBroadcast = null;
 
   public SparkDistributedDataScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    this(spark, table, readConf, table.schema(), TableScanContext.empty());
+    this(spark, table, readConf, table.schema(), newTableScanContext(table));
   }
 
   private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
             .flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats));
     List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
 
+    int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum();
+    int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
+    scanMetrics().skippedDataFiles().increment(skippedFilesCount);
+
     return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose);
   }
 
@@ -157,6 +162,9 @@ private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests
             .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
             .collect();
 
+    int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size();
+    scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);
+
     return DeleteFileIndex.builderFor(deleteFiles)
         .specsById(table().specs())
         .caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ private <T> List<List<T>> collectPartitions(JavaRDD<T> rdd) {
     return Arrays.asList(rdd.collectPartitions(partitionIds));
   }
 
+  private int liveFilesCount(List<ManifestFile> manifests) {
+    return manifests.stream().mapToInt(this::liveFilesCount).sum();
+  }
+
+  private int liveFilesCount(ManifestFile manifest) {
+    return manifest.existingFilesCount() + manifest.addedFilesCount();
+  }
+
+  private static TableScanContext newTableScanContext(Table table) {
+    if (table instanceof BaseTable) {
+      MetricsReporter reporter = ((BaseTable) table).reporter();
+      return ImmutableTableScanContext.builder().metricsReporter(reporter).build();
+    } else {
+      return TableScanContext.empty();
+    }
+  }
+
   private static class ReadDataManifest implements FlatMapFunction {
 
     private final Broadcast<Table> table;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
new file mode 100644
index 000000000000..1ea4f990b272
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanReporting
+    extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
+
+  @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {LOCAL, LOCAL},
+      new Object[] {LOCAL, DISTRIBUTED},
+      new Object[] {DISTRIBUTED, LOCAL},
+      new Object[] {DISTRIBUTED, DISTRIBUTED}
+    };
+  }
+
+  private static SparkSession spark = null;
+
+  private final PlanningMode dataMode;
+  private final PlanningMode deleteMode;
+
+  public TestSparkDistributedDataScanReporting(
+      PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+    this.dataMode = dataPlanningMode;
+    this.deleteMode = deletePlanningMode;
+  }
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSparkDistributedDataScanReporting.spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+            .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
+    TestSparkDistributedDataScanReporting.spark = null;
+    currentSpark.stop();
+  }
+
+  @Override
+  protected BatchScan newScan(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+        .commit();
+    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+    return new SparkDistributedDataScan(spark, table, readConf);
+  }
+}