Write min/max statistics with truncation for strings in parquet
The current logic skips writing min/max statistics if they are longer than 4KB.
This is changed to write stats with truncation, allowing readers to perform filtering.
raunaqmorarka committed Oct 28, 2023
1 parent 1e03f76 commit b9bc338
Showing 3 changed files with 122 additions and 1 deletion.
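
Why this is safe for readers: a truncated min sorts no higher than the true min, and a truncated max is rounded up so it sorts no lower than the true max, so the stored range always contains the true range. The sketch below (illustration only, not part of this commit; class and method names are hypothetical) shows row-group pruning against such bounds, assuming the unsigned lexicographic byte order parquet uses for string statistics.

// Illustration: pruning a row group with possibly-truncated min/max bounds.
// Soundness requires only that storedMin <= trueMin and storedMax >= trueMax.
final class StatsPruningSketch
{
    // For an equality predicate, the row group cannot contain `value`
    // if value < storedMin or value > storedMax.
    static boolean canSkipRowGroup(byte[] storedMin, byte[] storedMax, byte[] value)
    {
        return unsignedCompare(value, storedMin) < 0 || unsignedCompare(value, storedMax) > 0;
    }

    // Unsigned lexicographic comparison of byte arrays
    private static int unsignedCompare(byte[] a, byte[] b)
    {
        int limit = Math.min(a.length, b.length);
        for (int i = 0; i < limit; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}

With these bounds, a reader evaluating an equality predicate can skip any row group whose stored [min, max] range excludes the predicate value, even when both bounds are inexact.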
ParquetMetadataUtils.java (new file)
@@ -0,0 +1,67 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.parquet.writer;

import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.io.api.Binary;

import static com.google.common.base.Verify.verify;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;

public final class ParquetMetadataUtils
{
    private ParquetMetadataUtils() {}

    public static <T extends Comparable<T>> Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics<T> stats, int truncateLength)
    {
        // TODO Utilize https://github.com/apache/parquet-format/pull/216 when available to populate is_max_value_exact/is_min_value_exact
        if (isTruncationPossible(stats, truncateLength)) {
            // parquet-mr drops statistics larger than MAX_STATS_SIZE rather than truncating them.
            // To get truncated stats rather than no stats, truncateLength must be small enough that the truncated
            // min and max together can never exceed ParquetMetadataConverter.MAX_STATS_SIZE
            verify(
                    2L * truncateLength < MAX_STATS_SIZE,
                    "Twice the truncateLength %s must be less than MAX_STATS_SIZE %s",
                    truncateLength,
                    MAX_STATS_SIZE);
            // We need to take a lock here because CharsetValidator inside BinaryTruncator modifies a reusable dummyBuffer in-place
            // and DEFAULT_UTF8_TRUNCATOR is a static instance, which makes this method not thread-safe.
            // isTruncationPossible should ensure that locking is used only when we expect truncation, which is an uncommon scenario.
            // TODO remove synchronization when we use a release with the fix https://github.com/apache/parquet-mr/pull/1154
            synchronized (ParquetMetadataUtils.class) {
                return ParquetMetadataConverter.toParquetStatistics(stats, truncateLength);
            }
        }
        return ParquetMetadataConverter.toParquetStatistics(stats);
    }

    private static <T extends Comparable<T>> boolean isTruncationPossible(org.apache.parquet.column.statistics.Statistics<T> stats, int truncateLength)
    {
        PrimitiveTypeName primitiveType = stats.type().getPrimitiveTypeName();
        if (!primitiveType.equals(BINARY) && !primitiveType.equals(FIXED_LEN_BYTE_ARRAY)) {
            return false;
        }
        if (stats.isEmpty() || !stats.hasNonNullValue() || !(stats instanceof BinaryStatistics binaryStatistics)) {
            return false;
        }
        // non-null value exists, so min and max can't be null
        Binary min = binaryStatistics.genericGetMin();
        Binary max = binaryStatistics.genericGetMax();
        return min.length() > truncateLength || max.length() > truncateLength;
    }
}
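
For context, the two-argument ParquetMetadataConverter.toParquetStatistics overload used above delegates truncation to parquet-mr's BinaryTruncator (the class referenced in the thread-safety comment). A simplified byte-level sketch of those truncation semantics, ignoring the UTF-8 boundary handling the real truncator also performs (illustration only, not part of this commit):

import java.util.Arrays;

final class TruncationSketch
{
    // A min bound can simply be cut: any prefix sorts <= the original value.
    static byte[] truncateMin(byte[] min, int length)
    {
        return min.length <= length ? min : Arrays.copyOf(min, length);
    }

    // A max bound must be rounded up after cutting so it still sorts >= the
    // original value: increment the last byte that is not 0xFF and drop the
    // rest; if every byte is 0xFF, no shorter upper bound exists.
    static byte[] truncateMax(byte[] max, int length)
    {
        if (max.length <= length) {
            return max;
        }
        byte[] truncated = Arrays.copyOf(max, length);
        for (int i = truncated.length - 1; i >= 0; i--) {
            if ((truncated[i] & 0xFF) != 0xFF) {
                truncated[i]++;
                return Arrays.copyOf(truncated, i + 1);
            }
        }
        return max; // fall back to the untruncated value
    }
}

For example, truncating a max of 3200 'y' bytes to 1024 bytes yields 1023 'y' bytes followed by 'z', which is exactly what the test below asserts.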
PrimitiveColumnWriter.java
@@ -60,6 +60,9 @@ public class PrimitiveColumnWriter
    private static final int INSTANCE_SIZE = instanceSize(PrimitiveColumnWriter.class);
    private static final int MINIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 8 * 1024;
    private static final int MAXIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 2 * 1024 * 1024;
    // ParquetMetadataConverter.MAX_STATS_SIZE is 4096; min and max must together stay below that,
    // so the limit per value has to be under 2048. 1024 is used as it is big enough for most use cases.
    private static final int MAX_STATISTICS_LENGTH_IN_BYTES = 1024;

    private final ColumnDescriptor columnDescriptor;
    private final CompressionCodec compressionCodec;
@@ -180,7 +183,7 @@ private ColumnMetaData getColumnMetaData()
                totalUnCompressedSize,
                totalCompressedSize,
                -1);
-        columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
+        columnMetaData.setStatistics(ParquetMetadataUtils.toParquetStatistics(columnStatistics, MAX_STATISTICS_LENGTH_IN_BYTES));
        ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
        dataPagesWithEncoding.entrySet().stream()
                .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
Parquet writer test
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableListMultimap;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.parquet.DataPage;
import io.trino.parquet.DiskRange;
@@ -26,6 +27,8 @@
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.PageReader;
import io.trino.parquet.reader.TestingParquetDataSource;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Type;
import org.apache.parquet.VersionParser;
@@ -49,13 +52,15 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.operator.scalar.CharacterStringCasts.varcharToVarcharSaturatedFloorCast;
import static io.trino.parquet.ParquetCompressionUtils.decompress;
import static io.trino.parquet.ParquetTestUtils.createParquetWriter;
import static io.trino.parquet.ParquetTestUtils.generateInputPages;
import static io.trino.parquet.ParquetTestUtils.writeParquetFile;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.toIntExact;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
@@ -126,6 +131,52 @@ public void testWrittenPageSize()
        assertThat(pagesRead).isGreaterThan(10);
    }

    @Test
    public void testLargeStringTruncation()
            throws IOException
    {
        List<String> columnNames = ImmutableList.of("columnA", "columnB");
        List<Type> types = ImmutableList.of(VARCHAR, VARCHAR);

        Slice minA = Slices.utf8Slice("abc".repeat(300)); // within truncation threshold
        Block blockA = VARCHAR.createBlockBuilder(null, 2)
                .writeEntry(minA)
                .writeEntry(Slices.utf8Slice("y".repeat(3200))) // bigger than truncation threshold
                .build();

        String threeByteCodePoint = new String(Character.toChars(0x20AC));
        String maxCodePoint = new String(Character.toChars(Character.MAX_CODE_POINT));
        Slice minB = Slices.utf8Slice(threeByteCodePoint.repeat(300)); // truncation in middle of unicode bytes
        Block blockB = VARCHAR.createBlockBuilder(null, 2)
                .writeEntry(minB)
                // start with maxCodePoint to make this entry the max value in stats;
                // the value also ends with maxCodePoint so that the truncation point falls in the middle of its bytes
                .writeEntry(Slices.utf8Slice(maxCodePoint + "d".repeat(1017) + maxCodePoint))
                .build();

        ParquetDataSource dataSource = new TestingParquetDataSource(
                writeParquetFile(
                        ParquetWriterOptions.builder().build(),
                        types,
                        columnNames,
                        ImmutableList.of(new Page(2, blockA, blockB))),
                new ParquetReaderOptions());

        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
        BlockMetaData blockMetaData = getOnlyElement(parquetMetadata.getBlocks());

        ColumnChunkMetaData chunkMetaData = blockMetaData.getColumns().get(0);
        assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(minA.getBytes());
        Slice truncatedMax = Slices.utf8Slice("y".repeat(1023) + "z");
        assertThat(chunkMetaData.getStatistics().getMaxBytes()).isEqualTo(truncatedMax.getBytes());

        chunkMetaData = blockMetaData.getColumns().get(1);
        Slice truncatedMin = varcharToVarcharSaturatedFloorCast(1024, minB);
        assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(truncatedMin.getBytes());
        truncatedMax = Slices.utf8Slice(maxCodePoint + "d".repeat(1016) + "e");
        assertThat(chunkMetaData.getStatistics().getMaxBytes()).isEqualTo(truncatedMax.getBytes());
    }

    @Test
    public void testColumnReordering()
            throws IOException
