diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index a879fc5f51d1..2de423146ac6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -133,9 +133,7 @@ public static Metrics footerMetrics(
         increment(valueCounts, fieldId, column.getValueCount());
 
         Statistics stats = column.getStatistics();
-        if (stats == null) {
-          missingStats.add(fieldId);
-        } else if (!stats.isEmpty()) {
+        if (stats != null && !stats.isEmpty()) {
           increment(nullValueCounts, fieldId, stats.getNumNulls());
 
           // when there are metrics gathered by Iceberg for a column, we should use those instead
@@ -153,6 +151,8 @@ public static Metrics footerMetrics(
             updateMax(upperBounds, fieldId, field.type(), max, metricsMode);
           }
         }
+      } else {
+        missingStats.add(fieldId);
       }
     }
   }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index b21e234a5d3a..ae0a822d3464 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iceberg.parquet;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
 import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write;
+import static org.apache.iceberg.relocated.com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -38,8 +40,12 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -47,8 +53,10 @@
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.util.Pair;
 import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -86,6 +94,60 @@ public void testRowGroupSizeConfigurableWithWriter() throws IOException {
     }
   }
 
+  @Test
+  public void testMetricsMissingColumnStatisticsInRowGroups() throws IOException {
+    Schema schema = new Schema(optional(1, "stringCol", Types.StringType.get()));
+
+    File file = createTempFile(temp);
+
+    List<GenericData.Record> records = Lists.newArrayListWithCapacity(1);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+
+    GenericData.Record smallRecord = new GenericData.Record(avroSchema);
+    smallRecord.put("stringCol", "test");
+    records.add(smallRecord);
+
+    GenericData.Record largeRecord = new GenericData.Record(avroSchema);
+    largeRecord.put("stringCol", Strings.repeat("a", 2048));
+    records.add(largeRecord);
+
+    write(
+        file,
+        schema,
+        ImmutableMap.<String, String>builder()
+            .put(PARQUET_ROW_GROUP_SIZE_BYTES, "1")
+            .put(PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, "1")
+            .put(PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, "1")
+            .buildOrThrow(),
+        ParquetAvroWriter::buildWriter,
+        records.toArray(new GenericData.Record[] {}));
+
+    InputFile inputFile = Files.localInput(file);
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
+      assertThat(reader.getRowGroups()).hasSize(2);
+      List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+      assertThat(blocks).hasSize(2);
+
+      Statistics smallStatistics = getOnlyElement(blocks.get(0).getColumns()).getStatistics();
+      assertThat(smallStatistics.hasNonNullValue()).isTrue();
+      assertThat(smallStatistics.getMinBytes()).isEqualTo("test".getBytes(UTF_8));
+      assertThat(smallStatistics.getMaxBytes()).isEqualTo("test".getBytes(UTF_8));
+
+      // parquet-mr omits stats larger than the max size rather than truncating them
+      Statistics largeStatistics = getOnlyElement(blocks.get(1).getColumns()).getStatistics();
+      assertThat(largeStatistics.hasNonNullValue()).isFalse();
+      assertThat(largeStatistics.getMinBytes()).isNull();
+      assertThat(largeStatistics.getMaxBytes()).isNull();
+    }
+
+    // Null counts and lower/upper bounds should be empty because column
+    // statistics are missing for one of the row groups
+    Metrics metrics = ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault());
+    assertThat(metrics.nullValueCounts()).isEmpty();
+    assertThat(metrics.lowerBounds()).isEmpty();
+    assertThat(metrics.upperBounds()).isEmpty();
+  }
+
   @Test
   public void testNumberOfBytesWritten() throws IOException {
     Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));