From 506fee492620bc8e13d7da4f104462fb97ceef82 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Wed, 31 Jul 2024 08:41:20 -0700
Subject: [PATCH] Spark 3.5: Support Reporting Column Stats (#10659)

Co-authored-by: Karuppayya Rajendran
---
 .../apache/iceberg/spark/SparkReadConf.java   |   8 +
 .../iceberg/spark/SparkSQLProperties.java     |   4 +
 .../spark/source/SparkChangelogScan.java      |   2 +-
 .../spark/source/SparkColumnStatistics.java   |  88 +++++++++
 .../iceberg/spark/source/SparkScan.java       |  54 +++++-
 .../apache/iceberg/spark/source/Stats.java    |  12 +-
 .../iceberg/spark/source/TestSparkScan.java   | 183 ++++++++++++++++++
 7 files changed, 346 insertions(+), 5 deletions(-)
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
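Notes (commentary placed before the first diff header, so `git am` ignores it):
a minimal usage sketch of the new behavior, assuming a SparkSession and a
placeholder table name `db.tbl`. Column statistics are reported to Spark only
when CBO is enabled and the new session property is on (default: true).

    // Illustrative only; `db.tbl` stands for any Iceberg table that has a
    // Puffin statistics file with Apache DataSketches theta blobs.
    SparkSession spark = SparkSession.builder().getOrCreate();
    spark.conf().set("spark.sql.cbo.enabled", "true");
    spark.conf().set("spark.sql.iceberg.report-column-stats", "true");
    // Scans planned after this point report per-column NDVs to Spark's
    // cost-based optimizer via SupportsReportStatistics.
    spark.sql("SELECT * FROM db.tbl WHERE id = 1").show();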
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index bd29fb0d6d42..67e9d78ada4d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -347,4 +347,12 @@ private boolean executorCacheLocalityEnabledInternal() {
         .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT)
         .parse();
   }
+
+  public boolean reportColumnStats() {
+    return confParser
+        .booleanConf()
+        .sessionConf(SparkSQLProperties.REPORT_COLUMN_STATS)
+        .defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
+        .parse();
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index ea8f6fe0718b..77ae796ffb76 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -90,4 +90,8 @@ private SparkSQLProperties() {}
   public static final String EXECUTOR_CACHE_LOCALITY_ENABLED =
       "spark.sql.iceberg.executor-cache.locality.enabled";
   public static final boolean EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT = false;
+
+  // Controls whether to report available column statistics to Spark for query optimization.
+  public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats";
+  public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 7cde3e1fbe11..71b53d70262f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -88,7 +88,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
   public Statistics estimateStatistics() {
     long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
     long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
-    return new Stats(sizeInBytes, rowsCount);
+    return new Stats(sizeInBytes, rowsCount, Collections.emptyMap());
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
new file mode 100644
index 000000000000..faaff3631d7c
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnStatistics.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
+import org.apache.spark.sql.connector.read.colstats.Histogram;
+
+class SparkColumnStatistics implements ColumnStatistics {
+
+  private final OptionalLong distinctCount;
+  private final Optional<Object> min;
+  private final Optional<Object> max;
+  private final OptionalLong nullCount;
+  private final OptionalLong avgLen;
+  private final OptionalLong maxLen;
+  private final Optional<Histogram> histogram;
+
+  SparkColumnStatistics(
+      Long distinctCount,
+      Object min,
+      Object max,
+      Long nullCount,
+      Long avgLen,
+      Long maxLen,
+      Histogram histogram) {
+    this.distinctCount =
+        (distinctCount == null) ? OptionalLong.empty() : OptionalLong.of(distinctCount);
+    this.min = Optional.ofNullable(min);
+    this.max = Optional.ofNullable(max);
+    this.nullCount = (nullCount == null) ? OptionalLong.empty() : OptionalLong.of(nullCount);
+    this.avgLen = (avgLen == null) ? OptionalLong.empty() : OptionalLong.of(avgLen);
+    this.maxLen = (maxLen == null) ? OptionalLong.empty() : OptionalLong.of(maxLen);
+    this.histogram = Optional.ofNullable(histogram);
+  }
+
+  @Override
+  public OptionalLong distinctCount() {
+    return distinctCount;
+  }
+
+  @Override
+  public Optional<Object> min() {
+    return min;
+  }
+
+  @Override
+  public Optional<Object> max() {
+    return max;
+  }
+
+  @Override
+  public OptionalLong nullCount() {
+    return nullCount;
+  }
+
+  @Override
+  public OptionalLong avgLen() {
+    return avgLen;
+  }
+
+  @Override
+  public OptionalLong maxLen() {
+    return maxLen;
+  }
+
+  @Override
+  public Optional<Histogram> histogram() {
+    return histogram;
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6efe8a080bde..8b88cf49c692 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -23,15 +23,19 @@
 import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -75,22 +79,28 @@
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.metric.CustomMetric;
 import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class SparkScan implements Scan, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
+  private static final String NDV_KEY = "ndv";
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final SparkSession spark;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
   private final Schema expectedSchema;
@@ -111,6 +121,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
     SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
 
+    this.spark = spark;
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
@@ -175,7 +186,43 @@ public Statistics estimateStatistics() {
   protected Statistics estimateStatistics(Snapshot snapshot) {
     // its a fresh table, no data
     if (snapshot == null) {
-      return new Stats(0L, 0L);
+      return new Stats(0L, 0L, Collections.emptyMap());
+    }
+
+    boolean cboEnabled =
+        Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
+    Map<NamedReference, ColumnStatistics> colStatsMap = Collections.emptyMap();
+    if (readConf.reportColumnStats() && cboEnabled) {
+      colStatsMap = Maps.newHashMap();
+      List<StatisticsFile> files = table.statisticsFiles();
+      if (!files.isEmpty()) {
+        List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
+
+        for (BlobMetadata blobMetadata : metadataList) {
+          int id = blobMetadata.fields().get(0);
+          String colName = table.schema().findColumnName(id);
+          NamedReference ref = FieldReference.column(colName);
+
+          Long ndv = null;
+          if (blobMetadata
+              .type()
+              .equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) {
+            String ndvStr = blobMetadata.properties().get(NDV_KEY);
+            if (!Strings.isNullOrEmpty(ndvStr)) {
+              ndv = Long.parseLong(ndvStr);
+            } else {
+              LOG.debug("ndv is not set in BlobMetadata for column {}", colName);
+            }
+          } else {
+            LOG.debug("DataSketch blob is not available for column {}", colName);
+          }
+
+          ColumnStatistics colStats =
+              new SparkColumnStatistics(ndv, null, null, null, null, null, null);
+
+          colStatsMap.put(ref, colStats);
+        }
+      }
     }
 
     // estimate stats using snapshot summary only for partitioned tables
@@ -186,12 +233,13 @@ protected Statistics estimateStatistics(Snapshot snapshot) {
           snapshot.snapshotId(),
           table.name());
       long totalRecords = totalRecords(snapshot);
-      return new Stats(SparkSchemaUtil.estimateSize(readSchema(), totalRecords), totalRecords);
+      return new Stats(
+          SparkSchemaUtil.estimateSize(readSchema(), totalRecords), totalRecords, colStatsMap);
     }
 
     long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
     long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
-    return new Stats(sizeInBytes, rowsCount);
+    return new Stats(sizeInBytes, rowsCount, colStatsMap);
   }
 
   private long totalRecords(Snapshot snapshot) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
index ddf6ca834d9b..ccf523cb4b05 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
@@ -18,16 +18,21 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.Map;
 import java.util.OptionalLong;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
 
 class Stats implements Statistics {
   private final OptionalLong sizeInBytes;
   private final OptionalLong numRows;
+  private final Map<NamedReference, ColumnStatistics> colstats;
 
-  Stats(long sizeInBytes, long numRows) {
+  Stats(long sizeInBytes, long numRows, Map<NamedReference, ColumnStatistics> colstats) {
     this.sizeInBytes = OptionalLong.of(sizeInBytes);
     this.numRows = OptionalLong.of(numRows);
+    this.colstats = colstats;
   }
 
   @Override
@@ -39,4 +44,9 @@ public OptionalLong sizeInBytes() {
   public OptionalLong numRows() {
     return numRows;
   }
+
+  @Override
+  public Map<NamedReference, ColumnStatistics> columnStats() {
+    return colstats;
+  }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
index d539b0123951..7d5475ff919e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1;
 import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createPartitionedTable;
 import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createUnpartitionedTable;
 import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.timestampStrToDayOrdinal;
@@ -28,14 +29,22 @@
 import static org.apache.spark.sql.functions.expr;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.TestBaseWithCatalog;
 import org.apache.iceberg.spark.functions.BucketFunction;
 import org.apache.iceberg.spark.functions.DaysFunction;
@@ -44,6 +53,7 @@
 import org.apache.iceberg.spark.functions.TruncateFunction;
 import org.apache.iceberg.spark.functions.YearsFunction;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
@@ -60,6 +70,8 @@
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.jupiter.api.AfterEach;
@@ -130,6 +142,157 @@ public void testEstimatedRowCount() throws NoSuchTableException {
     assertThat(stats.numRows().getAsLong()).isEqualTo(10000L);
   }
 
+  @TestTemplate
+  public void testTableWithoutColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+    // The expected col NDVs are nulls
+    withSQLConf(
+        reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
+  }
+
+  @TestTemplate
+  public void testTableWithOneColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("ndv", "4"))));
+
+    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+    Map<String, Long> expectedOneNDV = Maps.newHashMap();
+    expectedOneNDV.put("id", 4L);
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
+  }
+
+  @TestTemplate
+  public void testTableWithTwoColStats() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    SparkScanBuilder scanBuilder =
+        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+    SparkScan scan = (SparkScan) scanBuilder.build();
+
+    Map<String, String> reportColStatsDisabled =
+        ImmutableMap.of(
+            SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");
+
+    Map<String, String> reportColStatsEnabled =
+        ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId,
+            "/test/statistics/file.puffin",
+            100,
+            42,
+            ImmutableList.of(
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(1),
+                    ImmutableMap.of("ndv", "4")),
+                new GenericBlobMetadata(
+                    APACHE_DATASKETCHES_THETA_V1,
+                    snapshotId,
+                    1,
+                    ImmutableList.of(2),
+                    ImmutableMap.of("ndv", "2"))));
+
+    table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+
+    checkColStatisticsNotReported(scan, 4L);
+    withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
+
+    Map<String, Long> expectedTwoNDVs = Maps.newHashMap();
+    expectedTwoNDVs.put("id", 4L);
+    expectedTwoNDVs.put("data", 2L);
+    withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedTwoNDVs));
+  }
+
   @TestTemplate
   public void testUnpartitionedYears() throws Exception {
     createUnpartitionedTable(spark, tableName);
@@ -734,6 +897,26 @@ private Expression[] expressions(Expression... expressions) {
     return expressions;
   }
 
+  private void checkColStatisticsNotReported(SparkScan scan, long expectedRowCount) {
+    Statistics stats = scan.estimateStatistics();
+    assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
+
+    Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
+    assertThat(columnStats).isEmpty();
+  }
+
+  private void checkColStatisticsReported(
+      SparkScan scan, long expectedRowCount, Map<String, Long> expectedNDVs) {
+    Statistics stats = scan.estimateStatistics();
+    assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
+
+    Map<NamedReference, ColumnStatistics> columnStats = stats.columnStats();
+    for (Map.Entry<String, Long> entry : expectedNDVs.entrySet()) {
+      assertThat(columnStats.get(FieldReference.column(entry.getKey())).distinctCount().getAsLong())
+          .isEqualTo(entry.getValue());
+    }
+  }
+
   private static LiteralValue<Integer> intLit(int value) {
     return LiteralValue.apply(value, DataTypes.IntegerType);
   }
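
Commentary (not part of the patch): to see what a scan reports without going
through the planner, the test helpers above can be mirrored directly. A sketch,
assuming an active SparkSession `spark` and an already-loaded Iceberg `Table`
object `table`:

    // Illustrative only; mirrors checkColStatisticsReported above.
    SparkScanBuilder scanBuilder =
        new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
    SparkScan scan = (SparkScan) scanBuilder.build();
    Statistics stats = scan.estimateStatistics();
    // With CBO on and report-column-stats enabled, distinctCount is populated
    // for every column that has a theta-sketch blob in the statistics file.
    stats.columnStats()
        .forEach((ref, colStat) -> System.out.println(ref + " ndv=" + colStat.distinctCount()));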