diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 3b1761d39f63..a978cda9db0b 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -59,6 +59,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-arrow')
     implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
+    implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")
     if (scalaVersion == '2.12') {
       // scala-collection-compat_2.12 pulls scala 2.12.17 and we need 2.12.18 for JDK 21 support
       implementation 'org.scala-lang:scala-library:2.12.18'
@@ -289,6 +290,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
     relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
     relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
+    relocate 'org.apache.datasketches', 'org.apache.iceberg.shaded.org.apache.datasketches'

     archiveClassifier.set(null)
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java
new file mode 100644
index 000000000000..a508021c1040
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ImmutableComputeTableStats;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Computes the statistics of the given columns and stores them as Puffin files. */
+public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
+    implements ComputeTableStats {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
+  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();
+
+  private final Table table;
+  private List<String> columns;
+  private Snapshot snapshot;
+
+  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.snapshot = table.currentSnapshot();
+  }
+
+  @Override
+  protected ComputeTableStatsSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats columns(String... newColumns) {
+    Preconditions.checkArgument(
+        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
+    this.columns = ImmutableList.copyOf(ImmutableSet.copyOf(newColumns));
+    return this;
+  }
+
+  @Override
+  public ComputeTableStats snapshot(long newSnapshotId) {
+    Snapshot newSnapshot = table.snapshot(newSnapshotId);
+    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
+    this.snapshot = newSnapshot;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    if (snapshot == null) {
+      LOG.info("No snapshot to compute stats for table {}", table.name());
+      return EMPTY_RESULT;
+    }
+    validateColumns();
+    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    LOG.info(
+        "Computing stats for columns {} in {} (snapshot {})",
+        columns(),
+        table.name(),
+        snapshotId());
+    List<Blob> blobs = generateNDVBlobs();
+    StatisticsFile statisticsFile = writeStatsFile(blobs);
+    table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
+    return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
+  }
+
+  private StatisticsFile writeStatsFile(List<Blob> blobs) {
+    LOG.info("Writing stats for table {} for snapshot {}", table.name(), snapshotId());
+    OutputFile outputFile = table.io().newOutputFile(outputPath());
+    try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
+      blobs.forEach(writer::add);
+      writer.finish();
+      return new GenericStatisticsFile(
+          snapshotId(),
+          outputFile.location(),
+          writer.fileSize(),
+          writer.footerSize(),
+          GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private List<Blob> generateNDVBlobs() {
+    return NDVSketchUtil.generateBlobs(spark(), table, snapshot, columns());
+  }
+
+  private List<String> columns() {
+    if (columns == null) {
+      Schema schema = table.schemas().get(snapshot.schemaId());
+      this.columns =
+          schema.columns().stream()
+              .filter(nestedField -> nestedField.type().isPrimitiveType())
+              .map(Types.NestedField::name)
+              .collect(Collectors.toList());
+    }
+    return columns;
+  }
+
+  private void validateColumns() {
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    Preconditions.checkArgument(!columns().isEmpty(), "No columns found to compute stats");
+    for (String columnName : columns()) {
+      Types.NestedField field = schema.findField(columnName);
+      Preconditions.checkArgument(field != null, "Can't find column %s in %s", columnName, schema);
+      Preconditions.checkArgument(
+          field.type().isPrimitiveType(),
+          "Can't compute stats on non-primitive type column: %s (%s)",
+          columnName,
+          field.type());
+    }
+  }
+
+  private String appIdentifier() {
+    String icebergVersion = IcebergBuild.fullVersion();
+    String sparkVersion = spark().version();
+    return String.format("Iceberg %s Spark %s", icebergVersion, sparkVersion);
+  }
+
+  private long snapshotId() {
+    return snapshot.snapshotId();
+  }
+
+  private String jobDesc() {
+    return String.format(
+        "Computing table stats for %s (snapshot_id=%s, columns=%s)",
+        table.name(), snapshotId(), columns());
+  }
+
+  private String outputPath() {
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String fileName = String.format("%s-%s.stats", snapshotId(), UUID.randomUUID());
+    return operations.metadataFileLocation(fileName);
+  }
+}
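The action plugs into the existing SparkActions entry point (see SparkActions.java further down). As a rough usage sketch, not part of this diff, assuming an already-loaded Iceberg Table and an active SparkSession; the class and column names here are illustrative:

    import org.apache.iceberg.StatisticsFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ComputeTableStats;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class ComputeStatsExample {
      static void computeNdvStats(Table table) {
        // columns(...) is optional; without it the action covers all primitive columns
        ComputeTableStats.Result result =
            SparkActions.get()
                .computeTableStats(table)
                .columns("id", "data")
                .execute();

        // The committed Puffin file is registered on the table for the target snapshot
        StatisticsFile statsFile = result.statisticsFile();
        System.out.println("stats file size: " + statsFile.fileSizeInBytes() + " bytes");
      }
    }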
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
new file mode 100644
index 000000000000..22055a161e4e
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.PuffinCompressionCodec;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.stats.ThetaSketchAgg;
+
+public class NDVSketchUtil {
+
+  private NDVSketchUtil() {}
+
+  public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
+
+  static List<Blob> generateBlobs(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> columns) {
+    Row sketches = computeNDVSketches(spark, table, snapshot, columns);
+    Schema schema = table.schemas().get(snapshot.schemaId());
+    List<Blob> blobs = Lists.newArrayList();
+    for (int i = 0; i < columns.size(); i++) {
+      Types.NestedField field = schema.findField(columns.get(i));
+      Sketch sketch = CompactSketch.wrap(Memory.wrap((byte[]) sketches.get(i)));
+      blobs.add(toBlob(field, sketch, snapshot));
+    }
+    return blobs;
+  }
+
+  private static Blob toBlob(Types.NestedField field, Sketch sketch, Snapshot snapshot) {
+    return new Blob(
+        StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
+        ImmutableList.of(field.fieldId()),
+        snapshot.snapshotId(),
+        snapshot.sequenceNumber(),
+        ByteBuffer.wrap(sketch.toByteArray()),
+        PuffinCompressionCodec.ZSTD,
+        ImmutableMap.of(
+            APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY,
+            String.valueOf((long) sketch.getEstimate())));
+  }
+
+  private static Row computeNDVSketches(
+      SparkSession spark, Table table, Snapshot snapshot, List<String> colNames) {
+    return spark
+        .read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
+        .load(table.name())
+        .select(toAggColumns(colNames))
+        .first();
+  }
+
+  private static Column[] toAggColumns(List<String> colNames) {
+    return colNames.stream().map(NDVSketchUtil::toAggColumn).toArray(Column[]::new);
+  }
+
+  private static Column toAggColumn(String colName) {
+    ThetaSketchAgg agg = new ThetaSketchAgg(colName);
+    return new Column(agg.toAggregateExpression());
+  }
+}
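Each blob written by NDVSketchUtil carries the serialized compact theta sketch plus an "ndv" property holding the rounded estimate. As an illustrative sketch, not part of this diff, a consumer could read the estimate back from the table's statistics files once the action has committed them:

    import java.util.List;
    import org.apache.iceberg.BlobMetadata;
    import org.apache.iceberg.StatisticsFile;
    import org.apache.iceberg.Table;

    public class ReadNdvExample {
      static void printNdv(Table table) {
        List<StatisticsFile> statsFiles = table.statisticsFiles();
        for (StatisticsFile statsFile : statsFiles) {
          for (BlobMetadata blob : statsFile.blobMetadata()) {
            // "ndv" is NDVSketchUtil.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY
            System.out.println("ndv = " + blob.properties().get("ndv"));
          }
        }
      }
    }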
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index fb67ded96e35..f845386d30c4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -20,6 +20,7 @@
 
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.ComputeTableStats;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.spark.sql.SparkSession;
@@ -96,4 +97,9 @@ public DeleteReachableFilesSparkAction deleteReachableFiles(String metadataLocat
   public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table) {
     return new RewritePositionDeleteFilesSparkAction(spark, table);
   }
+
+  @Override
+  public ComputeTableStats computeTableStats(Table table) {
+    return new ComputeTableStatsSparkAction(spark, table);
+  }
 }
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAgg.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAgg.scala
new file mode 100644
index 000000000000..cca16960f434
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAgg.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.stats
+
+import java.nio.ByteBuffer
+import org.apache.datasketches.common.Family
+import org.apache.datasketches.memory.Memory
+import org.apache.datasketches.theta.CompactSketch
+import org.apache.datasketches.theta.SetOperationBuilder
+import org.apache.datasketches.theta.Sketch
+import org.apache.datasketches.theta.UpdateSketch
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.iceberg.types.Conversions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
+import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.BinaryType
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * ThetaSketchAgg generates an Alpha family sketch with the default seed.
+ * The values fed to the sketch are converted to bytes using Iceberg's single-value serialization.
+ * The result is a byte array containing a compact Theta sketch from the DataSketches library,
+ * which should be deserialized back to a compact sketch before use.
+ *
+ * See [[https://iceberg.apache.org/puffin-spec/]] for more information.
+ */
+case class ThetaSketchAgg(
+    child: Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends TypedImperativeAggregate[Sketch] with UnaryLike[Expression] {
+
+  private lazy val icebergType = SparkSchemaUtil.convert(child.dataType)
+
+  def this(colName: String) = {
+    this(col(colName).expr, 0, 0)
+  }
+
+  override def dataType: DataType = BinaryType
+
+  override def nullable: Boolean = false
+
+  override def createAggregationBuffer(): Sketch = {
+    UpdateSketch.builder.setFamily(Family.ALPHA).build()
+  }
+
+  override def update(buffer: Sketch, input: InternalRow): Sketch = {
+    val value = child.eval(input)
+    if (value != null) {
+      val icebergValue = toIcebergValue(value)
+      val byteBuffer = Conversions.toByteBuffer(icebergType, icebergValue)
+      buffer.asInstanceOf[UpdateSketch].update(byteBuffer)
+    }
+    buffer
+  }
+
+  private def toIcebergValue(value: Any): Any = {
+    value match {
+      case s: UTF8String => s.toString
+      case d: Decimal => d.toJavaBigDecimal
+      case b: Array[Byte] => ByteBuffer.wrap(b)
+      case _ => value
+    }
+  }
+
+  override def merge(buffer: Sketch, input: Sketch): Sketch = {
+    new SetOperationBuilder().buildUnion.union(buffer, input)
+  }
+
+  override def eval(buffer: Sketch): Any = {
+    toBytes(buffer)
+  }
+
+  override def serialize(buffer: Sketch): Array[Byte] = {
+    toBytes(buffer)
+  }
+
+  override def deserialize(storageFormat: Array[Byte]): Sketch = {
+    CompactSketch.wrap(Memory.wrap(storageFormat))
+  }
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = {
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+  }
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = {
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+
+  private def toBytes(sketch: Sketch): Array[Byte] = {
+    val compactSketch = sketch.compact()
+    compactSketch.toByteArray
+  }
+}
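For context, the aggregate's buffer lifecycle maps directly onto the DataSketches theta API: createAggregationBuffer builds an Alpha-family UpdateSketch, merge unions two partial sketches, and eval/serialize emit compact sketch bytes. A minimal standalone sketch of those calls, not part of this diff:

    import org.apache.datasketches.common.Family;
    import org.apache.datasketches.memory.Memory;
    import org.apache.datasketches.theta.CompactSketch;
    import org.apache.datasketches.theta.SetOperationBuilder;
    import org.apache.datasketches.theta.Sketch;
    import org.apache.datasketches.theta.UpdateSketch;

    public class ThetaSketchLifecycle {
      public static void main(String[] args) {
        // two partial sketches, as Spark tasks would build per partition
        UpdateSketch partial1 = UpdateSketch.builder().setFamily(Family.ALPHA).build();
        UpdateSketch partial2 = UpdateSketch.builder().setFamily(Family.ALPHA).build();
        partial1.update("a");
        partial1.update("b");
        partial2.update("b");
        partial2.update("c");

        // merge() in ThetaSketchAgg performs the same pairwise union
        Sketch merged = new SetOperationBuilder().buildUnion().union(partial1, partial2);

        // eval()/serialize() compact the sketch to bytes; deserialize() wraps them back
        byte[] bytes = merged.compact().toByteArray();
        Sketch restored = CompactSketch.wrap(Memory.wrap(bytes));
        System.out.println("estimated NDV: " + (long) restored.getEstimate());
      }
    }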
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
new file mode 100644
index 000000000000..58703d4a90e5
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.spark.actions.NDVSketchUtil.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BlobMetadata;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestComputeTableStatsAction extends SparkCatalogTestBase {
+
+  private static final Types.StructType LEAF_STRUCT_TYPE =
+      Types.StructType.of(
+          optional(1, "leafLongCol", Types.LongType.get()),
+          optional(2, "leafDoubleCol", Types.DoubleType.get()));
+
+  private static final Types.StructType NESTED_STRUCT_TYPE =
+      Types.StructType.of(required(3, "leafStructCol", LEAF_STRUCT_TYPE));
+
+  private static final Schema NESTED_SCHEMA =
+      new Schema(required(4, "nestedStructCol", NESTED_STRUCT_TYPE));
+
+  private static final Schema SCHEMA_WITH_NESTED_COLUMN =
+      new Schema(
+          required(4, "nestedStructCol", NESTED_STRUCT_TYPE),
+          required(5, "stringCol", Types.StringType.get()));
+
+  public TestComputeTableStatsAction(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Test
+  public void testComputeTableStatsAction() throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    // To create multiple splits on the mapper
+    table
+        .updateProperties()
+        .set("read.split.target-size", "100")
+        .set("write.parquet.row-group-size-bytes", "100")
+        .commit();
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"));
+    spark.createDataset(records, Encoders.bean(SimpleRecord.class)).writeTo(tableName).append();
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result results =
+        actions.computeTableStats(table).columns("id", "data").execute();
+    assertThat(results).isNotNull();
+
+    List<StatisticsFile> statisticsFiles = table.statisticsFiles();
+    assertThat(statisticsFiles.size()).isEqualTo(1);
+
+    StatisticsFile statisticsFile = statisticsFiles.get(0);
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(2);
+
+    BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
+    assertThat(blobMetadata.properties().get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY))
+        .isEqualTo(String.valueOf(4));
+  }
+
+  @Test
+  public void testComputeTableStatsActionWithoutExplicitColumns()
+      throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result results = actions.computeTableStats(table).execute();
+    assertThat(results).isNotNull();
+
+    assertThat(table.statisticsFiles().size()).isEqualTo(1);
+    StatisticsFile statisticsFile = table.statisticsFiles().get(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(2);
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(
+            Long.parseLong(
+                statisticsFile
+                    .blobMetadata()
+                    .get(0)
+                    .properties()
+                    .get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY)))
+        .isEqualTo(4);
+    assertThat(
+            Long.parseLong(
+                statisticsFile
+                    .blobMetadata()
+                    .get(1)
+                    .properties()
+                    .get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY)))
+        .isEqualTo(4);
+  }
+
+  @Test
+  public void testComputeTableStatsForInvalidColumns() throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    // Append data to create snapshot
+    sql("INSERT into %s values(1, 'abcd')", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    SparkActions actions = SparkActions.get();
+    assertThatThrownBy(() -> actions.computeTableStats(table).columns("id1").execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Can't find column id1 in table");
+  }
+
+  @Test
+  public void testComputeTableStatsWithNoSnapshots() throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result result = actions.computeTableStats(table).columns("id").execute();
+    assertThat(result.statisticsFile()).isNull();
+  }
+
+  @Test
+  public void testComputeTableStatsWithNullValues() throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, null),
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result results = actions.computeTableStats(table).columns("data").execute();
+    assertThat(results).isNotNull();
+
+    List<StatisticsFile> statisticsFiles = table.statisticsFiles();
+    assertThat(statisticsFiles.size()).isEqualTo(1);
+
+    StatisticsFile statisticsFile = statisticsFiles.get(0);
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(1);
+
+    BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
+    assertThat(blobMetadata.properties().get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY))
+        .isEqualTo(String.valueOf(4));
+  }
+
+  @Test
+  public void testComputeTableStatsWithSnapshotHavingDifferentSchemas()
+      throws NoSuchTableException, ParseException {
+    SparkActions actions = SparkActions.get();
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    // Append data to create snapshot
+    sql("INSERT into %s values(1, 'abcd')", tableName);
+    long snapshotId1 =
+        Spark3Util.loadIcebergTable(spark, tableName).currentSnapshot().snapshotId();
+    // Snapshot id not specified
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    assertThatCode(() -> actions.computeTableStats(table).columns("data").execute())
+        .doesNotThrowAnyException();
+
+    sql("ALTER TABLE %s DROP COLUMN %s", tableName, "data");
+    // Append data to create snapshot
+    sql("INSERT into %s values(1)", tableName);
+    table.refresh();
+    long snapshotId2 =
+        Spark3Util.loadIcebergTable(spark, tableName).currentSnapshot().snapshotId();
+
+    // Snapshot id specified
+    assertThatCode(
+            () -> actions.computeTableStats(table).snapshot(snapshotId1).columns("data").execute())
+        .doesNotThrowAnyException();
+
+    assertThatThrownBy(
+            () -> actions.computeTableStats(table).snapshot(snapshotId2).columns("data").execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Can't find column data in table");
+  }
+
+  @Test
+  public void testComputeTableStatsWhenSnapshotIdNotSpecified()
+      throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    // Append data to create snapshot
+    sql("INSERT into %s values(1, 'abcd')", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result results = actions.computeTableStats(table).columns("data").execute();
+
+    assertThat(results).isNotNull();
+
+    List<StatisticsFile> statisticsFiles = table.statisticsFiles();
+    assertThat(statisticsFiles.size()).isEqualTo(1);
+
+    StatisticsFile statisticsFile = statisticsFiles.get(0);
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(1);
+
+    BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
+    assertThat(blobMetadata.properties().get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY))
+        .isEqualTo(String.valueOf(1));
+  }
+
+  @Test
+  public void testComputeTableStatsWithNestedSchema()
+      throws NoSuchTableException, ParseException, IOException {
+    List<Record> records = Lists.newArrayList(createNestedRecord());
+    Table table =
+        validationCatalog.createTable(
+            tableIdent,
+            SCHEMA_WITH_NESTED_COLUMN,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of());
+    DataFile dataFile =
+        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records);
+    table.newAppend().appendFile(dataFile).commit();
+
+    Table tbl = Spark3Util.loadIcebergTable(spark, tableName);
+    SparkActions actions = SparkActions.get();
+    actions.computeTableStats(tbl).execute();
+
+    tbl.refresh();
+    List<StatisticsFile> statisticsFiles = tbl.statisticsFiles();
+    assertThat(statisticsFiles.size()).isEqualTo(1);
+    StatisticsFile statisticsFile = statisticsFiles.get(0);
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(1);
+  }
+
+  @Test
+  public void testComputeTableStatsWithNoComputableColumns() throws IOException {
+    List<Record> records = Lists.newArrayList(createNestedRecord());
+    Table table =
+        validationCatalog.createTable(
+            tableIdent, NESTED_SCHEMA, PartitionSpec.unpartitioned(), ImmutableMap.of());
+    DataFile dataFile =
+        FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records);
+    table.newAppend().appendFile(dataFile).commit();
+
+    table.refresh();
+    SparkActions actions = SparkActions.get();
+    assertThatThrownBy(() -> actions.computeTableStats(table).execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("No columns found to compute stats");
+  }
+
+  @Test
+  public void testComputeTableStatsOnByteColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("byte_col", "TINYINT");
+  }
+
+  @Test
+  public void testComputeTableStatsOnShortColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("short_col", "SMALLINT");
+  }
+
+  @Test
+  public void testComputeTableStatsOnIntColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("int_col", "INT");
+  }
+
+  @Test
+  public void testComputeTableStatsOnLongColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("long_col", "BIGINT");
+  }
+
+  @Test
+  public void testComputeTableStatsOnTimestampColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("timestamp_col", "TIMESTAMP");
+  }
+
+  @Test
+  public void testComputeTableStatsOnTimestampNtzColumn()
+      throws NoSuchTableException, ParseException {
+    testComputeTableStats("timestamp_col", "TIMESTAMP_NTZ");
+  }
+
+  @Test
+  public void testComputeTableStatsOnDateColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("date_col", "DATE");
+  }
+
+  @Test
+  public void testComputeTableStatsOnDecimalColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("decimal_col", "DECIMAL(20, 2)");
+  }
+
+  @Test
+  public void testComputeTableStatsOnBinaryColumn() throws NoSuchTableException, ParseException {
+    testComputeTableStats("binary_col", "BINARY");
+  }
+
+  public void testComputeTableStats(String columnName, String type)
+      throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id int, %s %s) USING iceberg", tableName, columnName, type);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    Dataset<Row> dataDF = randomDataDF(table.schema());
+    append(tableName, dataDF);
+
+    SparkActions actions = SparkActions.get();
+    table.refresh();
+    ComputeTableStats.Result results =
+        actions.computeTableStats(table).columns(columnName).execute();
+    assertThat(results).isNotNull();
+
+    List<StatisticsFile> statisticsFiles = table.statisticsFiles();
+    assertThat(statisticsFiles.size()).isEqualTo(1);
+
+    StatisticsFile statisticsFile = statisticsFiles.get(0);
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(1);
+
+    BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
+    assertThat(blobMetadata.properties().get(APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY))
+        .isNotNull();
+  }
+
+  private GenericRecord createNestedRecord() {
+    GenericRecord record = GenericRecord.create(SCHEMA_WITH_NESTED_COLUMN);
+    GenericRecord nested = GenericRecord.create(NESTED_STRUCT_TYPE);
+    GenericRecord leaf = GenericRecord.create(LEAF_STRUCT_TYPE);
+    leaf.set(0, 0L);
+    leaf.set(1, 0.0);
+    nested.set(0, leaf);
+    record.set(0, nested);
+    record.set(1, "data");
+    return record;
+  }
+
+  private Dataset<Row> randomDataDF(Schema schema) {
+    Iterable<InternalRow> rows = RandomData.generateSpark(schema, 10, 0);
+    JavaRDD<InternalRow> rowRDD = sparkContext.parallelize(Lists.newArrayList(rows));
+    StructType rowSparkType = SparkSchemaUtil.convert(schema);
+    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
+  }
+
+  private void append(String table, Dataset<Row> df) throws NoSuchTableException {
+    // fanout writes are enabled as write-time clustering is not supported without Spark extensions
+    df.coalesce(1).writeTo(table).option(SparkWriteOptions.FANOUT_ENABLED, "true").append();
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+}