From b9bc338c479f0117843fb244e5aff9eb36539c22 Mon Sep 17 00:00:00 2001
From: Raunaq Morarka
Date: Thu, 14 Sep 2023 16:27:18 +0530
Subject: [PATCH] Write min/max statistics with truncation for strings in parquet

The current logic skips writing min/max statistics if they are longer than 4KB.
This is changed to write truncated statistics so that readers can still perform filtering.
---
 .../parquet/writer/ParquetMetadataUtils.java  | 67 +++++++++++++++++++
 .../parquet/writer/PrimitiveColumnWriter.java |  5 +-
 .../parquet/writer/TestParquetWriter.java     | 51 ++++++++++++++
 3 files changed, 122 insertions(+), 1 deletion(-)
 create mode 100644 lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetMetadataUtils.java

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetMetadataUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetMetadataUtils.java
new file mode 100644
index 000000000000..422688a80cde
--- /dev/null
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetMetadataUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.parquet.writer;
+
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.format.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.io.api.Binary;
+
+import static com.google.common.base.Verify.verify;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+
+public final class ParquetMetadataUtils
+{
+    private ParquetMetadataUtils() {}
+
+    public static <T extends Comparable<T>> Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics<T> stats, int truncateLength)
+    {
+        // TODO Utilize https://github.com/apache/parquet-format/pull/216 when available to populate is_max_value_exact/is_min_value_exact
+        if (isTruncationPossible(stats, truncateLength)) {
+            // parquet-mr drops statistics larger than MAX_STATS_SIZE rather than truncating them.
+            // In order to ensure truncation rather than no stats, we need to use a truncateLength which would never exceed ParquetMetadataConverter.MAX_STATS_SIZE
+            verify(
+                    2L * truncateLength < MAX_STATS_SIZE,
+                    "Twice of truncateLength %s must be less than MAX_STATS_SIZE %s",
+                    truncateLength,
+                    MAX_STATS_SIZE);
+            // We need to take a lock here because CharsetValidator inside BinaryTruncator modifies a reusable dummyBuffer in-place
+            // and DEFAULT_UTF8_TRUNCATOR is a static instance, which makes this method thread unsafe.
+            // isTruncationPossible should ensure that locking is used only when we expect truncation, which is an uncommon scenario.
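+            // (Truncation is attempted only for BINARY/FIXED_LEN_BYTE_ARRAY columns whose min or max exceeds truncateLength,
+            // so most columns never reach this synchronized block.)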
+            // TODO remove synchronization when we use a release with the fix https://github.com/apache/parquet-mr/pull/1154
+            synchronized (ParquetMetadataUtils.class) {
+                return ParquetMetadataConverter.toParquetStatistics(stats, truncateLength);
+            }
+        }
+        return ParquetMetadataConverter.toParquetStatistics(stats);
+    }
+
+    private static <T extends Comparable<T>> boolean isTruncationPossible(org.apache.parquet.column.statistics.Statistics<T> stats, int truncateLength)
+    {
+        PrimitiveTypeName primitiveType = stats.type().getPrimitiveTypeName();
+        if (!primitiveType.equals(BINARY) && !primitiveType.equals(FIXED_LEN_BYTE_ARRAY)) {
+            return false;
+        }
+        if (stats.isEmpty() || !stats.hasNonNullValue() || !(stats instanceof BinaryStatistics binaryStatistics)) {
+            return false;
+        }
+        // non-null value exists, so min and max can't be null
+        Binary min = binaryStatistics.genericGetMin();
+        Binary max = binaryStatistics.genericGetMax();
+        return min.length() > truncateLength || max.length() > truncateLength;
+    }
+}
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
index 6935edcfefbb..d925d690288c 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
@@ -60,6 +60,9 @@ public class PrimitiveColumnWriter
     private static final int INSTANCE_SIZE = instanceSize(PrimitiveColumnWriter.class);
     private static final int MINIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 8 * 1024;
     private static final int MAXIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 2 * 1024 * 1024;
+    // ParquetMetadataConverter.MAX_STATS_SIZE is 4096, we need a value which would guarantee that min and max
+    // don't add up to 4096 (so less than 2048). Using 1K as that is big enough for most use cases.
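+    // For example, a max value of 3200 'y' bytes is written as 1023 'y's followed by 'z' (truncated to 1024 bytes
+    // with the last code point incremented), so it still bounds the actual value; see testLargeStringTruncation.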
+    private static final int MAX_STATISTICS_LENGTH_IN_BYTES = 1024;
 
     private final ColumnDescriptor columnDescriptor;
     private final CompressionCodec compressionCodec;
@@ -180,7 +183,7 @@ private ColumnMetaData getColumnMetaData()
                 totalUnCompressedSize,
                 totalCompressedSize,
                 -1);
-        columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
+        columnMetaData.setStatistics(ParquetMetadataUtils.toParquetStatistics(columnStatistics, MAX_STATISTICS_LENGTH_IN_BYTES));
         ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
         dataPagesWithEncoding.entrySet().stream()
                 .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java
index dc190818d88f..42fee9542a81 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableListMultimap;
 import io.airlift.slice.Slice;
 import io.airlift.slice.SliceInput;
+import io.airlift.slice.Slices;
 import io.airlift.units.DataSize;
 import io.trino.parquet.DataPage;
 import io.trino.parquet.DiskRange;
@@ -26,6 +27,8 @@
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.PageReader;
 import io.trino.parquet.reader.TestingParquetDataSource;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Type;
 import org.apache.parquet.VersionParser;
@@ -49,6 +52,7 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.trino.operator.scalar.CharacterStringCasts.varcharToVarcharSaturatedFloorCast;
 import static io.trino.parquet.ParquetCompressionUtils.decompress;
 import static io.trino.parquet.ParquetTestUtils.createParquetWriter;
 import static io.trino.parquet.ParquetTestUtils.generateInputPages;
@@ -56,6 +60,7 @@
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.lang.Math.toIntExact;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
@@ -126,6 +131,52 @@ public void testWrittenPageSize()
         assertThat(pagesRead).isGreaterThan(10);
     }
 
+    @Test
+    public void testLargeStringTruncation()
+            throws IOException
+    {
+        List<String> columnNames = ImmutableList.of("columnA", "columnB");
+        List<Type> types = ImmutableList.of(VARCHAR, VARCHAR);
+
+        Slice minA = Slices.utf8Slice("abc".repeat(300)); // within truncation threshold
+        Block blockA = VARCHAR.createBlockBuilder(null, 2)
+                .writeEntry(minA)
+                .writeEntry(Slices.utf8Slice("y".repeat(3200))) // bigger than truncation threshold
+                .build();
+
+        String threeByteCodePoint = new String(Character.toChars(0x20AC));
+        String maxCodePoint = new String(Character.toChars(Character.MAX_CODE_POINT));
+        Slice minB = Slices.utf8Slice(threeByteCodePoint.repeat(300)); // truncation in middle of unicode bytes
+        Block blockB = VARCHAR.createBlockBuilder(null, 2)
+                .writeEntry(minB)
+                // start with maxCodePoint to make it max value in stats
+                // last character for truncation is maxCodePoint
+                .writeEntry(Slices.utf8Slice(maxCodePoint + "d".repeat(1017) + maxCodePoint))
+                .build();
+
+        ParquetDataSource dataSource = new TestingParquetDataSource(
+                writeParquetFile(
+                        ParquetWriterOptions.builder().build(),
+                        types,
+                        columnNames,
+                        ImmutableList.of(new Page(2, blockA, blockB))),
+                new ParquetReaderOptions());
+
+        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+        BlockMetaData blockMetaData = getOnlyElement(parquetMetadata.getBlocks());
+
+        ColumnChunkMetaData chunkMetaData = blockMetaData.getColumns().get(0);
+        assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(minA.getBytes());
+        Slice truncatedMax = Slices.utf8Slice("y".repeat(1023) + "z");
+        assertThat(chunkMetaData.getStatistics().getMaxBytes()).isEqualTo(truncatedMax.getBytes());
+
+        chunkMetaData = blockMetaData.getColumns().get(1);
+        Slice truncatedMin = varcharToVarcharSaturatedFloorCast(1024, minB);
+        assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(truncatedMin.getBytes());
+        truncatedMax = Slices.utf8Slice(maxCodePoint + "d".repeat(1016) + "e");
+        assertThat(chunkMetaData.getStatistics().getMaxBytes()).isEqualTo(truncatedMax.getBytes());
+    }
+
     @Test
     public void testColumnReordering()
             throws IOException