From 041670a32447aec4066068f5e4f82be25d72ba1a Mon Sep 17 00:00:00 2001 From: Jinyang Li Date: Thu, 12 Dec 2024 11:04:34 -0800 Subject: [PATCH] consolidate createPrunedColumnsMetadata and ParquetMetadata#buildBlocks --- .../parquet/metadata/ParquetMetadata.java | 208 +++++++++++------- .../parquet/metadata/PrunedBlockMetadata.java | 46 +--- .../parquet/predicate/PredicateUtils.java | 60 ++--- .../trino/parquet/reader/MetadataReader.java | 5 +- .../parquet/reader/TrinoColumnIndexStore.java | 8 +- .../trino/parquet/writer/ParquetWriter.java | 13 +- .../BenchmarkColumnarFilterParquetData.java | 2 +- .../io/trino/parquet/ParquetTestUtils.java | 5 +- .../reader/TestByteStreamSplitEncoding.java | 2 +- .../parquet/reader/TestInt96Timestamp.java | 2 +- .../parquet/reader/TestParquetReader.java | 52 +++-- .../trino/parquet/reader/TestTimeMillis.java | 2 +- .../parquet/writer/TestParquetWriter.java | 50 +++-- .../plugin/deltalake/DeltaLakeMergeSink.java | 10 +- .../DeltaLakePageSourceProvider.java | 2 +- .../plugin/deltalake/DeltaLakeWriter.java | 6 +- .../plugin/deltalake/TestDeltaLakeBasic.java | 2 +- .../parquet/ParquetPageSourceFactory.java | 6 +- .../hive/parquet/TestBloomFilterStore.java | 8 +- .../plugin/hudi/HudiPageSourceProvider.java | 7 +- .../iceberg/IcebergPageSourceProvider.java | 7 +- .../iceberg/IcebergParquetFileWriter.java | 6 +- .../iceberg/procedure/MigrationUtils.java | 4 +- .../plugin/iceberg/util/ParquetUtil.java | 32 ++- .../BaseIcebergConnectorSmokeTest.java | 6 +- .../iceberg/BaseIcebergConnectorTest.java | 8 +- .../plugin/iceberg/IcebergTestUtils.java | 10 +- ...MinioParquetCachingConnectorSmokeTest.java | 2 + ...IcebergMinioParquetConnectorSmokeTest.java | 2 + .../TestIcebergParquetConnectorTest.java | 2 + ...tIcebergGlueCatalogConnectorSmokeTest.java | 2 + ...tIcebergJdbcCatalogConnectorSmokeTest.java | 2 + ...cebergNessieCatalogConnectorSmokeTest.java | 2 + ...ebergPolarisCatalogConnectorSmokeTest.java | 2 + ...alogNestedNamespaceConnectorSmokeTest.java | 2 + ...ergTrinoRestCatalogConnectorSmokeTest.java | 2 + ...gVendingRestCatalogConnectorSmokeTest.java | 2 + ...ergSnowflakeCatalogConnectorSmokeTest.java | 2 + testing/trino-faulttolerant-tests/pom.xml | 6 + ...etFaultTolerantExecutionConnectorTest.java | 2 + 40 files changed, 330 insertions(+), 269 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java index 2a3e51c6c3f4..3d4a96d36fbd 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java @@ -17,9 +17,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.log.Logger; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetCorruptionException; +import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; +import io.trino.parquet.reader.TrinoColumnIndexStore; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.format.ColumnChunk; @@ -30,14 +34,13 @@ import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import 
org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -47,7 +50,7 @@ import java.util.Optional; import java.util.Set; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; @@ -65,21 +68,19 @@ public class ParquetMetadata private static final Logger log = Logger.get(ParquetMetadata.class); private final FileMetaData fileMetaData; + private final MessageType messageType; private final ParquetDataSourceId dataSourceId; private final FileMetadata parquetMetadata; - private final Optional offset; - private final Optional length; + private final Optional diskRange; - public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional offset, Optional length) + public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional diskRange) throws ParquetCorruptionException { this.fileMetaData = requireNonNull(fileMetaData, "fileMetaData is null"); + this.messageType = readMessageType(); this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null"); - this.offset = requireNonNull(offset, "offset is null"); - this.length = requireNonNull(length, "length is null"); - checkArgument(offset.isEmpty() && length.isEmpty() || (offset.isPresent() && length.isPresent()), - "Both offset and length must be present or absent"); - this.parquetMetadata = new FileMetadata(readMessageType(), keyValueMetaData(fileMetaData), fileMetaData.getCreated_by()); + this.diskRange = requireNonNull(diskRange, "range is null"); + this.parquetMetadata = new FileMetadata(messageType, keyValueMetaData(fileMetaData), fileMetaData.getCreated_by()); } public FileMetadata getFileMetaData() @@ -90,86 +91,126 @@ public FileMetadata getFileMetaData() @Override public String toString() { - return "ParquetMetaData{" + fileMetaData + "}"; + return toStringHelper(this) + .add("dataSourceId", dataSourceId) + .add("fileMetaData", fileMetaData) + .add("diskRange", diskRange) + .toString(); } - public List getBlocks(Collection columnDescriptors) + private List getRowGroups() { - Set paths = columnDescriptors.stream() - .map(ColumnDescriptor::getPath) - .map(ColumnPath::get) - .collect(toImmutableSet()); + List rowGroups = fileMetaData.getRow_groups(); + if (rowGroups == null) { + return ImmutableList.of(); + } + ImmutableList.Builder builder = ImmutableList.builder(); + long lastRowCount = 0; + long fileRowCount = 0; + for (int i = 0; i < rowGroups.size(); i++) { + RowGroup rowGroup = rowGroups.get(i); + fileRowCount += lastRowCount; + lastRowCount = rowGroup.getNum_rows(); + if (diskRange.isPresent() && rowGroup.isSetFile_offset()) { + if (rowGroup.file_offset >= diskRange.get().getOffset() + diskRange.get().getLength()) { + break; + } + if (i < rowGroups.size() - 1 && rowGroups.get(i + 1).isSetFile_offset() && diskRange.get().getOffset() >= rowGroups.get(i + 1).file_offset) { + continue; + } + } + builder.add(new 
RowGroupOffset(rowGroup, fileRowCount)); + } - return buildBlocks(paths); + return builder.build(); } - public List getBlocks() + private ColumnChunkMetadata toColumnChunkMetadata(MessageType messageType, ColumnChunk columnChunk, ColumnPath columnPath) { - return buildBlocks(ImmutableSet.of()); + ColumnMetaData metaData = columnChunk.meta_data; + PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); + ColumnChunkMetadata column = ColumnChunkMetadata.get( + columnPath, + primitiveType, + CompressionCodecName.fromParquet(metaData.codec), + convertEncodingStats(metaData.encoding_stats), + readEncodings(metaData.encodings), + MetadataReader.readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType), + metaData.data_page_offset, + metaData.dictionary_page_offset, + metaData.num_values, + metaData.total_compressed_size, + metaData.total_uncompressed_size); + column.setColumnIndexReference(toColumnIndexReference(columnChunk)); + column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); + column.setBloomFilterOffset(metaData.bloom_filter_offset); + + return column; } - private List buildBlocks(Set paths) + public List getRowGroupInfo() + throws ParquetCorruptionException { - List schema = fileMetaData.getSchema(); - MessageType messageType = readParquetSchema(schema); - List blocks = new ArrayList<>(); - List rowGroups = fileMetaData.getRow_groups(); - if (rowGroups != null) { - for (int i = 0; i < rowGroups.size(); i++) { - RowGroup rowGroup = rowGroups.get(i); - List columns = rowGroup.getColumns(); - checkState(!columns.isEmpty(), "No columns in row group: %s [%s]", rowGroup, dataSourceId); - String filePath = columns.get(0).getFile_path(); + return getRowGroupInfo(Optional.empty(), Optional.empty()); + } - if (offset.isPresent() && length.isPresent() && rowGroup.isSetFile_offset()) { - if (rowGroup.file_offset >= offset.get() + length.get()) { - break; - } - if (i < rowGroups.size() - 1 && rowGroups.get(i + 1).isSetFile_offset() && offset.get() >= rowGroups.get(i + 1).file_offset) { - continue; - } + public List getRowGroupInfo(Optional dataSource, Optional, ColumnDescriptor>> descriptorsByPath) + throws ParquetCorruptionException + { + Optional> filterColumnPaths = descriptorsByPath.map(dp -> + dp.keySet().stream() + .map(p -> p.toArray(new String[0])) + .map(ColumnPath::get) + .collect(toImmutableSet())); + ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); + for (RowGroupOffset rowGroupOffset : getRowGroups()) { + List columns = rowGroupOffset.rowGroup.getColumns(); + validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroupOffset.rowGroup); + String filePath = columns.getFirst().getFile_path(); + + ImmutableMap.Builder columnMetadataBuilder = ImmutableMap.builderWithExpectedSize(columns.size()); + + for (ColumnChunk columnChunk : columns) { + checkState((filePath == null && columnChunk.getFile_path() == null) + || (filePath != null && filePath.equals(columnChunk.getFile_path())), + "all column chunks of the same row group must be in the same file [%s]", dataSourceId); + ColumnPath columnPath = toColumnPath(columnChunk); + if (filterColumnPaths.isEmpty() || filterColumnPaths.get().contains(columnPath)) { + ColumnChunkMetadata chunkMetadata = toColumnChunkMetadata(messageType, columnChunk, columnPath); + columnMetadataBuilder.put(columnPath, chunkMetadata); } - ImmutableList.Builder columnMetadataBuilder = 
ImmutableList.builderWithExpectedSize(columns.size()); - for (ColumnChunk columnChunk : columns) { - checkState( - (filePath == null && columnChunk.getFile_path() == null) - || (filePath != null && filePath.equals(columnChunk.getFile_path())), - "all column chunks of the same row group must be in the same file [%s]", dataSourceId); - ColumnMetaData metaData = columnChunk.meta_data; - String[] path = metaData.path_in_schema.stream() - .map(value -> value.toLowerCase(Locale.ENGLISH)) - .toArray(String[]::new); - ColumnPath columnPath = ColumnPath.get(path); - if (!paths.isEmpty() && !paths.contains(columnPath)) { - continue; + } + Map columnChunkMetadata = columnMetadataBuilder.buildOrThrow(); + + if (filterColumnPaths.isPresent() && filterColumnPaths.get().size() != columnChunkMetadata.size()) { + Set> existingPaths = columns.stream() + .map(ParquetMetadata::toColumnPath) + .map(p -> ImmutableList.copyOf(p.toArray())) + .collect(toImmutableSet()); + for (Map.Entry, ColumnDescriptor> entry : descriptorsByPath.get().entrySet()) { + if (!existingPaths.contains(entry.getKey())) { + throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", entry.getValue()); } - PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); - ColumnChunkMetadata column = ColumnChunkMetadata.get( - columnPath, - primitiveType, - CompressionCodecName.fromParquet(metaData.codec), - convertEncodingStats(metaData.encoding_stats), - readEncodings(metaData.encodings), - MetadataReader.readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType), - metaData.data_page_offset, - metaData.dictionary_page_offset, - metaData.num_values, - metaData.total_compressed_size, - metaData.total_uncompressed_size); - column.setColumnIndexReference(toColumnIndexReference(columnChunk)); - column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); - column.setBloomFilterOffset(metaData.bloom_filter_offset); - columnMetadataBuilder.add(column); } - blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build())); } + + PrunedBlockMetadata columnsMetadata = new PrunedBlockMetadata(rowGroupOffset.rowGroup.getNum_rows(), dataSourceId, columnChunkMetadata); + Optional indexStore = Optional.empty(); + if (filterColumnPaths.isPresent() && dataSource.isPresent()) { + indexStore = Optional.of(new TrinoColumnIndexStore(dataSource.get(), columnsMetadata.getBlockMetadata(), filterColumnPaths.get(), ImmutableSet.of())); + } + rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, rowGroupOffset.offset, indexStore)); } - return blocks; + return rowGroupInfoBuilder.build(); } - private static MessageType readParquetSchema(List schema) + private MessageType readMessageType() + throws ParquetCorruptionException { + List schema = fileMetaData.getSchema(); + validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); + Iterator schemaIterator = schema.iterator(); SchemaElement rootSchema = schemaIterator.next(); Types.MessageTypeBuilder builder = Types.buildMessage(); @@ -177,6 +218,14 @@ private static MessageType readParquetSchema(List schema) return builder.named(rootSchema.name); } + private static ColumnPath toColumnPath(ColumnChunk columnChunk) + { + String[] paths = columnChunk.meta_data.path_in_schema.stream() + .map(value -> value.toLowerCase(Locale.ENGLISH)) + .toArray(String[]::new); + return ColumnPath.get(paths); + } + private static void readTypeSchema(Types.GroupBuilder builder, 
Iterator schemaIterator, int typeCount) { for (int i = 0; i < typeCount; i++) { @@ -248,19 +297,6 @@ private static Set readEncodings(List schema = fileMetaData.getSchema(); - validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); - - Iterator schemaIterator = schema.iterator(); - SchemaElement rootSchema = schemaIterator.next(); - Types.MessageTypeBuilder builder = Types.buildMessage(); - readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); - return builder.named(rootSchema.name); - } - private static Map keyValueMetaData(FileMetaData fileMetaData) { if (fileMetaData.getKey_value_metadata() == null) { @@ -268,4 +304,8 @@ private static Map keyValueMetaData(FileMetaData fileMetaData) } return fileMetaData.getKey_value_metadata().stream().collect(toMap(KeyValue::getKey, KeyValue::getValue)); } + + private record RowGroupOffset(RowGroup rowGroup, long offset) + { + } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java index 63004cc2ca9f..78c30443cb29 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java @@ -14,57 +14,29 @@ package io.trino.parquet.metadata; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSourceId; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnPath; import java.util.List; import java.util.Map; -import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static java.util.Arrays.asList; -import static java.util.function.Function.identity; public final class PrunedBlockMetadata { - /** - * Stores only the necessary columns metadata from BlockMetadata and indexes them by path for efficient look-ups - */ - public static PrunedBlockMetadata createPrunedColumnsMetadata(BlockMetadata blockMetadata, ParquetDataSourceId dataSourceId, Map, ColumnDescriptor> descriptorsByPath) - throws ParquetCorruptionException - { - Set> requiredPaths = descriptorsByPath.keySet(); - Map, ColumnChunkMetadata> columnMetadataByPath = blockMetadata.columns().stream() - .collect(toImmutableMap( - column -> asList(column.getPath().toArray()), - identity(), - // Same column name may occur more than once when the file is written by case-sensitive tools - (oldValue, _) -> oldValue)); - ImmutableMap.Builder, ColumnChunkMetadata> columnMetadataByPathBuilder = ImmutableMap.builderWithExpectedSize(requiredPaths.size()); - for (Map.Entry, ColumnDescriptor> entry : descriptorsByPath.entrySet()) { - List requiredPath = entry.getKey(); - ColumnDescriptor columnDescriptor = entry.getValue(); - ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(requiredPath); - if (columnChunkMetadata == null) { - throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", columnDescriptor); - } - columnMetadataByPathBuilder.put(requiredPath, columnChunkMetadata); - } - return new PrunedBlockMetadata(blockMetadata.rowCount(), dataSourceId, columnMetadataByPathBuilder.buildOrThrow()); - } - private final long rowCount; private final ParquetDataSourceId dataSourceId; - private final Map, ColumnChunkMetadata> 
columnMetadataByPath; + private final Map columnMetadataByPath; + private final BlockMetadata blockMetadata; - private PrunedBlockMetadata(long rowCount, ParquetDataSourceId dataSourceId, Map, ColumnChunkMetadata> columnMetadataByPath) + public PrunedBlockMetadata(long rowCount, ParquetDataSourceId dataSourceId, Map columnMetadataByPath) { this.rowCount = rowCount; this.dataSourceId = dataSourceId; this.columnMetadataByPath = columnMetadataByPath; + this.blockMetadata = new BlockMetadata(rowCount, ImmutableList.copyOf(columnMetadataByPath.values())); } public long getRowCount() @@ -77,10 +49,15 @@ public List getColumns() return ImmutableList.copyOf(columnMetadataByPath.values()); } + public BlockMetadata getBlockMetadata() + { + return blockMetadata; + } + public ColumnChunkMetadata getColumnChunkMetaData(ColumnDescriptor columnDescriptor) throws ParquetCorruptionException { - ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(asList(columnDescriptor.getPath())); + ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(ColumnPath.get(columnDescriptor.getPath())); if (columnChunkMetadata == null) { throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", columnDescriptor); } @@ -93,6 +70,7 @@ public String toString() return toStringHelper(this) .add("rowCount", rowCount) .add("columnMetadataByPath", columnMetadataByPath) + .add("blockMetadata", blockMetadata) .toString(); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index 6901bb23a4e6..8f1a655576ca 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -27,6 +27,7 @@ import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; +import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.reader.RowGroupInfo; import io.trino.spi.predicate.TupleDomain; @@ -39,6 +40,7 @@ import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.PageType; import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; import org.apache.parquet.io.ParquetDecodingException; @@ -54,11 +56,11 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.BloomFilterStore.getBloomFilterStore; import static io.trino.parquet.ParquetCompressionUtils.decompress; import static io.trino.parquet.ParquetReaderUtils.isOnlyDictionaryEncodingPages; import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding; -import static io.trino.parquet.metadata.PrunedBlockMetadata.createPrunedColumnsMetadata; import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateType.DATE; @@ -180,10 +182,8 @@ public static boolean predicateMatches( } public static List getFilteredRowGroups( - long splitStart, - long splitLength, + ParquetMetadata parquetMetadata, ParquetDataSource dataSource, - List blocksMetaData, List> parquetTupleDomains, List 
parquetPredicates, Map, ColumnDescriptor> descriptorsByPath, @@ -192,35 +192,37 @@ public static List getFilteredRowGroups( ParquetReaderOptions options) throws IOException { - long fileRowCount = 0; + Set columnPaths = descriptorsByPath.keySet().stream() + .map(p -> p.toArray(new String[0])) + .map(ColumnPath::get) + .collect(toImmutableSet()); + + List rowGroupInfos = parquetMetadata.getRowGroupInfo(Optional.of(dataSource), Optional.of(descriptorsByPath)); ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); - for (BlockMetadata block : blocksMetaData) { - long blockStart = block.getStartingPos(); - boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength; - if (splitContainsBlock) { - for (int i = 0; i < parquetTupleDomains.size(); i++) { - TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); - TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); - Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); - Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); - PrunedBlockMetadata columnsMetadata = createPrunedColumnsMetadata(block, dataSource.getId(), descriptorsByPath); - if (predicateMatches( - parquetPredicate, - columnsMetadata, - dataSource, - descriptorsByPath, - parquetTupleDomain, - columnIndex, - bloomFilterStore, - timeZone, - domainCompactionThreshold)) { - rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, fileRowCount, columnIndex)); - break; - } + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); + + for (int i = 0; i < parquetTupleDomains.size(); i++) { + TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); + TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); + Optional columnIndex = getColumnIndexStore(dataSource, block, columnPaths, parquetTupleDomain, options); + Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); + if (predicateMatches( + parquetPredicate, + rowGroupInfo.prunedBlockMetadata(), + dataSource, + descriptorsByPath, + parquetTupleDomain, + columnIndex, + bloomFilterStore, + timeZone, + domainCompactionThreshold)) { + rowGroupInfoBuilder.add(new RowGroupInfo(rowGroupInfo.prunedBlockMetadata(), block.rowCount(), columnIndex)); + break; } } - fileRowCount += block.rowCount(); } + return rowGroupInfoBuilder.build(); } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java index f63aea8fcc8c..5ed5cad8e0e9 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java @@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; @@ -52,7 +53,7 @@ public final class MetadataReader private MetadataReader() {} - public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional parquetWriteValidation, Optional offset, Optional length) + public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional parquetWriteValidation, Optional 
diskRange) throws IOException { // Parquet File Layout: @@ -89,7 +90,7 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional< InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput(); FileMetaData fileMetaData = readFileMetaData(metadataStream); - ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId(), offset, length); + ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId(), diskRange); validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation); return parquetMetadata; } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index fa9b7ae142d5..88383d07dd09 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.io.InputStream; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -140,7 +139,7 @@ public OffsetIndex getOffsetIndex(ColumnPath column) public static Optional getColumnIndexStore( ParquetDataSource dataSource, BlockMetadata blockMetadata, - Map, ColumnDescriptor> descriptorsByPath, + Set columnsReadPaths, TupleDomain parquetTupleDomain, ParquetReaderOptions options) { @@ -160,11 +159,6 @@ public static Optional getColumnIndexStore( return Optional.empty(); } - Set columnsReadPaths = new HashSet<>(descriptorsByPath.size()); - for (List path : descriptorsByPath.keySet()) { - columnsReadPaths.add(ColumnPath.get(path.toArray(new String[0]))); - } - Map parquetDomains = parquetTupleDomain.getDomains() .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains")); Set columnsFilteredPaths = parquetDomains.keySet().stream() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java index 39047357cdf7..55ec3236fe70 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java @@ -24,12 +24,10 @@ import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.ParquetWriteValidation; -import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.FileMetadata; import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.writer.ColumnWriter.BufferData; import io.trino.spi.Page; import io.trino.spi.type.Type; @@ -77,7 +75,6 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName; import static io.trino.parquet.ParquetWriteValidation.ParquetWriteValidationBuilder; -import static io.trino.parquet.metadata.PrunedBlockMetadata.createPrunedColumnsMetadata; import static io.trino.parquet.writer.ParquetDataOutput.createDataOutput; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DoubleType.DOUBLE; @@ -240,7 +237,7 @@ public void validate(ParquetDataSource input) 
checkState(validationBuilder.isPresent(), "validation is not enabled"); ParquetWriteValidation writeValidation = validationBuilder.get().build(); try { - ParquetMetadata parquetMetadata = MetadataReader.readFooter(input, Optional.of(writeValidation), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(input, Optional.of(writeValidation), Optional.empty()); try (ParquetReader parquetReader = createParquetReader(input, parquetMetadata, writeValidation)) { for (Page page = parquetReader.nextPage(); page != null; page = parquetReader.nextPage()) { // fully load the page @@ -277,16 +274,10 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada .orElseThrow())); } Map, ColumnDescriptor> descriptorsByPath = getDescriptors(fileMetaData.getSchema(), fileMetaData.getSchema()); - long nextStart = 0; - ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); - for (BlockMetadata block : parquetMetadata.getBlocks()) { - rowGroupInfoBuilder.add(new RowGroupInfo(createPrunedColumnsMetadata(block, input.getId(), descriptorsByPath), nextStart, Optional.empty())); - nextStart += block.rowCount(); - } return new ParquetReader( Optional.ofNullable(fileMetaData.getCreatedBy()), columnFields.build(), - rowGroupInfoBuilder.build(), + parquetMetadata.getRowGroupInfo(Optional.of(input), Optional.of(descriptorsByPath)), input, parquetTimeZone.orElseThrow(), newSimpleAggregatedMemoryContext(), diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java b/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java index f7228cf4580d..e6cdd9825e77 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java @@ -225,7 +225,7 @@ public void setup() testData.getColumnNames(), testData.getPages()), new ParquetReaderOptions()); - parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); columnNames = columns.stream() .map(TpchColumn::getColumnName) .collect(toImmutableList()); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java index febdaccf617b..7f012a8a2b26 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java @@ -141,16 +141,15 @@ public static ParquetReader createParquetReader( columnName -> descriptorsByPath.get(ImmutableList.of(columnName.toLowerCase(ENGLISH)))); TupleDomainParquetPredicate parquetPredicate = buildPredicate(fileSchema, parquetTupleDomain, descriptorsByPath, UTC); List rowGroups = getFilteredRowGroups( - 0, - input.getEstimatedSize(), + parquetMetadata, input, - parquetMetadata.getBlocks(), ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, UTC, 1000, options); + return new ParquetReader( Optional.ofNullable(fileMetaData.getCreatedBy()), columnFields.build(), diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java index 47028cbb5e91..7f448bdbed2d 100644 --- 
a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java @@ -50,7 +50,7 @@ public void testReadFloatDouble() ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("byte_stream_split_float_and_double.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); readAndCompare(reader, getExpectedValues()); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java index ead68a944880..d9ca113e5625 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java @@ -112,7 +112,7 @@ public void testNanosOutsideDayRange() ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("int96_timestamps_nanos_outside_day_range.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); Page page = reader.nextPage(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java index 62022dece9e7..f7fda503028c 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java @@ -15,14 +15,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; import io.airlift.units.DataSize; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -85,13 +86,13 @@ public void testColumnReaderMemoryUsage() columnNames, generateInputPages(types, 100, 5)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(parquetMetadata.getBlocks().size()).isGreaterThan(1); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo().size()).isGreaterThan(1); // Verify file has only non-dictionary encodings as dictionary memory usage is already tested in 
TestFlatColumnReader#testMemoryUsage - parquetMetadata.getBlocks().forEach(block -> { - block.columns() + parquetMetadata.getRowGroupInfo().forEach(rowGroupInfo -> { + rowGroupInfo.prunedBlockMetadata().getBlockMetadata().columns() .forEach(columnChunkMetaData -> assertThat(columnChunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isFalse()); - assertThat(block.rowCount()).isEqualTo(100); + assertThat(rowGroupInfo.prunedBlockMetadata().getBlockMetadata().rowCount()).isEqualTo(100); }); AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); @@ -111,7 +112,7 @@ public void testColumnReaderMemoryUsage() assertThat(currentMemoryUsage).isGreaterThan(initialMemoryUsage); // Memory usage does not change until next row group (1 page per row-group) - long rowGroupRowCount = parquetMetadata.getBlocks().get(0).rowCount(); + long rowGroupRowCount = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getBlockMetadata().rowCount(); int rowsRead = page.getPositionCount(); while (rowsRead < rowGroupRowCount) { rowsRead += reader.nextPage().getPositionCount(); @@ -138,8 +139,8 @@ public void testEmptyRowRangesWithColumnIndex() ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("lineitem_sorted_by_shipdate/data.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(parquetMetadata.getBlocks()).hasSize(2); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo()).hasSize(2); // The predicate and the file are prepared so that page indexes will result in non-overlapping row ranges and eliminate the entire first row group // while the second row group still has to be read TupleDomain predicate = TupleDomain.withColumnDomains( @@ -159,7 +160,7 @@ public void testEmptyRowRangesWithColumnIndex() assertThat(metrics).containsKey(COLUMN_INDEX_ROWS_FILTERED); // Column index should filter at least the first row group assertThat(((Count) metrics.get(COLUMN_INDEX_ROWS_FILTERED)).getTotal()) - .isGreaterThanOrEqualTo(parquetMetadata.getBlocks().get(0).rowCount()); + .isGreaterThanOrEqualTo(parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getBlockMetadata().rowCount()); } } @@ -215,22 +216,27 @@ public void testOffsetColumnFilter() // Read single column, 1 row group ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"columnb"}, Types.optional(INT64).named(""), 0, 0); - ParquetMetadata parquetMetadata1 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(100L), Optional.of(101L)); - List columnBlocks1 = parquetMetadata1.getBlocks(ImmutableSet.of(columnDescriptor)); - assertThat(columnBlocks1.stream().allMatch(block -> block.columns().size() == 1)).isTrue(); - assertThat(columnBlocks1.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(100); + ParquetMetadata parquetMetadata1 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(100, 101))); + List rowGroupInfo1 = parquetMetadata1.getRowGroupInfo(Optional.of(dataSource), Optional.of(ImmutableMap.of(ImmutableList.of("columnb"), columnDescriptor))); + + assertThat(rowGroupInfo1.stream().allMatch(rg -> rg.prunedBlockMetadata().getColumns().size() == 1)).isTrue(); + 
assertThat(rowGroupInfo1.stream().map(RowGroupInfo::prunedBlockMetadata).mapToLong(PrunedBlockMetadata::getRowCount).sum()).isEqualTo(100); // Read both columns, half row groups - ParquetMetadata parquetMetadata2 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(0L), Optional.of(estimatedDataSize / 2)); - List columnBlocks2 = parquetMetadata2.getBlocks(); - assertThat(columnBlocks2.stream().allMatch(block -> block.columns().size() == 2)).isTrue(); - assertThat(columnBlocks2.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(300); + ParquetMetadata parquetMetadata2 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(0, estimatedDataSize / 2))); + List rowGroupInfo2 = parquetMetadata2.getRowGroupInfo(); + + assertThat(rowGroupInfo2.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getColumns).allMatch(c -> c.size() == 2)).isTrue(); + assertThat(rowGroupInfo2.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getBlockMetadata).mapToLong(BlockMetadata::rowCount).sum()) + .isEqualTo(300); // Read both columns, all row groups - ParquetMetadata parquetMetadata3 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - List columnBlocks3 = parquetMetadata3.getBlocks(); - assertThat(columnBlocks3.stream().allMatch(block -> block.columns().size() == 2)).isTrue(); - assertThat(columnBlocks3.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(500); + ParquetMetadata parquetMetadata3 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + List rowGroupInfo3 = parquetMetadata3.getRowGroupInfo(); + + assertThat(rowGroupInfo2.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getColumns).allMatch(c -> c.size() == 2)).isTrue(); + assertThat(rowGroupInfo3.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getBlockMetadata).mapToLong(BlockMetadata::rowCount).sum()) + .isEqualTo(500); } private void testReadingOldParquetFiles(File file, List columnNames, Type columnType, List expectedValues) @@ -240,7 +246,7 @@ private void testReadingOldParquetFiles(File file, List columnNames, Typ file, new ParquetReaderOptions()); ConnectorSession session = TestingConnectorSession.builder().build(); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), ImmutableList.of(columnType), columnNames)) { Page page = reader.nextPage(); Iterator expected = expectedValues.iterator(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java index 08a2fd5952e3..99ae226bca08 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java @@ -60,7 +60,7 @@ private void testTimeMillsInt32(TimeType timeType) ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("time_millis_int32.snappy.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata 
parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); Page page = reader.nextPage(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java index ef995ce4b753..5a248553afd2 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java @@ -33,6 +33,7 @@ import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.PageReader; import io.trino.parquet.reader.ParquetReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.reader.TestingParquetDataSource; import io.trino.spi.Page; import io.trino.spi.block.Block; @@ -128,11 +129,11 @@ public void testWrittenPageSize() columnNames, generateInputPages(types, 100, 1000)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(parquetMetadata.getBlocks()).hasSize(1); - assertThat(parquetMetadata.getBlocks().get(0).rowCount()).isEqualTo(100 * 1000); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo()).hasSize(1); + assertThat(parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getBlockMetadata().rowCount()).isEqualTo(100 * 1000); - ColumnChunkMetadata chunkMetaData = parquetMetadata.getBlocks().get(0).columns().get(0); + ColumnChunkMetadata chunkMetaData = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getColumns().getFirst(); DiskRange range = new DiskRange(chunkMetaData.getStartingPos(), chunkMetaData.getTotalSize()); Map chunkReader = dataSource.planRead(ImmutableListMultimap.of(0, range), newSimpleAggregatedMemoryContext()); @@ -177,12 +178,12 @@ public void testWrittenPageValueCount() columnNames, generateInputPages(types, 100, 1000)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(parquetMetadata.getBlocks()).hasSize(1); - assertThat(parquetMetadata.getBlocks().get(0).rowCount()).isEqualTo(100 * 1000); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo()).hasSize(1); + assertThat(parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getRowCount()).isEqualTo(100 * 1000); - ColumnChunkMetadata columnAMetaData = parquetMetadata.getBlocks().get(0).columns().get(0); - ColumnChunkMetadata columnBMetaData = parquetMetadata.getBlocks().get(0).columns().get(1); + ColumnChunkMetadata columnAMetaData = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getColumns().getFirst(); + ColumnChunkMetadata columnBMetaData = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getColumns().get(1); Map chunkReader = dataSource.planRead( ImmutableListMultimap.of( 0, new DiskRange(columnAMetaData.getStartingPos(), columnAMetaData.getTotalSize()), @@ -258,8 +259,8 @@ public void testLargeStringTruncation() ImmutableList.of(new Page(2, blockA, blockB))), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = 
MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getBlocks()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getRowGroupInfo()).prunedBlockMetadata().getBlockMetadata(); ColumnChunkMetadata chunkMetaData = blockMetaData.columns().get(0); assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(minA.getBytes()); @@ -291,11 +292,11 @@ public void testColumnReordering() generateInputPages(types, 100, 100)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(10); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo().size()).isGreaterThanOrEqualTo(10); + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { // Verify that the columns are stored in the same order as the metadata - List offsets = blockMetaData.columns().stream() + List offsets = rowGroupInfo.prunedBlockMetadata().getColumns().stream() .map(ColumnChunkMetadata::getFirstDataPageOffset) .collect(toImmutableList()); assertThat(offsets).isSorted(); @@ -348,10 +349,10 @@ public void testDictionaryPageOffset() generateInputPages(types, 100, 100)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(1); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { - ColumnChunkMetadata chunkMetaData = getOnlyElement(blockMetaData.columns()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo().size()).isGreaterThanOrEqualTo(1); + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + ColumnChunkMetadata chunkMetaData = getOnlyElement(rowGroupInfo.prunedBlockMetadata().getColumns()); assertThat(chunkMetaData.getDictionaryPageOffset()).isGreaterThan(0); int dictionaryPageSize = toIntExact(chunkMetaData.getFirstDataPageOffset() - chunkMetaData.getDictionaryPageOffset()); assertThat(dictionaryPageSize).isGreaterThan(0); @@ -397,10 +398,11 @@ public void testWriteBloomFilters(Type type, List data) generateInputPages(types, 100, data)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); // Check that bloom filters are right after each other int bloomFilterSize = Integer.highestOneBit(BlockSplitBloomFilter.optimalNumOfBits(BLOOM_FILTER_EXPECTED_ENTRIES, DEFAULT_BLOOM_FILTER_FPP) / 8) << 1; - for (BlockMetadata block : parquetMetadata.getBlocks()) { + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); for (int i = 0; i < block.columns().size(); i++) { ColumnChunkMetadata chunkMetaData = block.columns().get(i); 
assertThat(hasBloomFilter(chunkMetaData)).isTrue(); @@ -414,7 +416,7 @@ public void testWriteBloomFilters(Type type, List data) } } } - int rowGroupCount = parquetMetadata.getBlocks().size(); + int rowGroupCount = parquetMetadata.getRowGroupInfo().size(); assertThat(rowGroupCount).isGreaterThanOrEqualTo(2); TupleDomain predicate = TupleDomain.withColumnDomains( @@ -462,8 +464,8 @@ void testBloomFilterWithDictionaryFallback() .build()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getBlocks()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getRowGroupInfo()).prunedBlockMetadata().getBlockMetadata(); ColumnChunkMetadata chunkMetaData = getOnlyElement(blockMetaData.columns()); assertThat(chunkMetaData.getEncodingStats().hasDictionaryPages()).isTrue(); assertThat(chunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isTrue(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 05bc022eacf2..35ead8cdc410 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -25,9 +25,10 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; -import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.plugin.deltalake.delete.RoaringBitmapArray; import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; @@ -378,8 +379,11 @@ private Slice writeMergeResult(Slice path, FileDeletion deletion) } TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8())); try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) { - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum(); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + long rowCount = parquetMetadata.getRowGroupInfo(Optional.of(dataSource), Optional.empty()).stream() + .map(RowGroupInfo::prunedBlockMetadata) + .mapToLong(PrunedBlockMetadata::getRowCount) + .sum(); RoaringBitmapArray rowsRetained = new RoaringBitmapArray(); rowsRetained.addRange(0, rowCount - 1); rowsRetained.andNot(deletedRows); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 2f02ff987d5e..6ae9b850f5a4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -303,7 +303,7 @@ private PositionDeleteFilter readDeletes( public Map loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options) { try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats)) { - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index 5d7e28a3d445..82f752f97221 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -22,6 +22,7 @@ import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.deltalake.DataFileInfo.DataFileType; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; import io.trino.plugin.hive.FileWriter; @@ -183,7 +184,7 @@ public DataFileInfo getDataFileInfo() { Location path = rootTableLocation.appendPath(relativeFilePath); FileMetaData fileMetaData = fileWriter.getFileMetadata(); - ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString()), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString()), Optional.empty()); return new DataFileInfo( relativeFilePath, @@ -203,7 +204,8 @@ public static DeltaLakeJsonFileStatistics readStatistics(ParquetMetadata parquet .collect(toImmutableMap(column -> column.basePhysicalColumnName().toLowerCase(ENGLISH), DeltaLakeColumnHandle::basePhysicalType)); ImmutableMultimap.Builder metadataForColumn = ImmutableMultimap.builder(); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + BlockMetadata blockMetaData = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); for (ColumnChunkMetadata columnChunkMetaData : blockMetaData.columns()) { if (columnChunkMetaData.getPath().size() != 1) { continue; // Only base column stats are supported diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 9f376ce0f4c7..351feb720cd3 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -329,7 +329,7 @@ private void testOptimizeWithColumnMappingMode(String columnMappingMode) TrinoInputFile inputFile = new LocalInputFile(tableLocation.resolve(addFileEntry.getPath()).toFile()); ParquetMetadata parquetMetadata = MetadataReader.readFooter( new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()), - Optional.empty(), 
Optional.empty(), Optional.empty()); + Optional.empty(), Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); PrimitiveType physicalType = getOnlyElement(fileMetaData.getSchema().getColumns().iterator()).getPrimitiveType(); assertThat(physicalType.getName()).isEqualTo(physicalName); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index 2c69a6851157..c8115af76a4c 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -221,7 +221,7 @@ public static ReaderPageSource createPageSource( AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); dataSource = createDataSource(inputFile, estimatedFileSize, options, memoryContext, stats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation, Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation, Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); fileSchema = fileMetaData.getSchema(); @@ -250,10 +250,8 @@ public static ReaderPageSource createPageSource( } List rowGroups = getFilteredRowGroups( - start, - length, + parquetMetadata, dataSource, - parquetMetadata.getBlocks(descriptorsByPath.values()), parquetTupleDomains, parquetPredicates, descriptorsByPath, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java index 443ae8fea106..2265c0373425 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java @@ -22,6 +22,7 @@ import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.predicate.TupleDomainParquetPredicate; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; @@ -308,10 +309,11 @@ private static BloomFilterStore generateBloomFilterStore(ParquetTester.TempFile TrinoInputFile inputFile = new LocalInputFile(tempFile.getFile()); TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - ColumnChunkMetadata columnChunkMetaData = getOnlyElement(getOnlyElement(parquetMetadata.getBlocks()).columns()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + RowGroupInfo rowGroupInfo = getOnlyElement(parquetMetadata.getRowGroupInfo()); + ColumnChunkMetadata columnChunkMetaData = getOnlyElement(rowGroupInfo.prunedBlockMetadata().getBlockMetadata().columns()); - return new BloomFilterStore(dataSource, getOnlyElement(parquetMetadata.getBlocks()), Set.of(columnChunkMetaData.getPath())); + return new BloomFilterStore(dataSource, rowGroupInfo.prunedBlockMetadata().getBlockMetadata(), Set.of(columnChunkMetaData.getPath())); } private static class 
BloomFilterTypeTestCase diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 8860248ab208..c0ebe56e94ac 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -20,6 +20,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; @@ -198,7 +199,7 @@ private static ConnectorPageSource createPageSource( try { AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); dataSource = createDataSource(inputFile, OptionalLong.of(hudiSplit.getFileSize()), options, memoryContext, dataSourceStats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(start, length))); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); @@ -215,10 +216,8 @@ private static ConnectorPageSource createPageSource( TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone); List rowGroups = getFilteredRowGroups( - start, - length, + parquetMetadata, dataSource, - parquetMetadata.getBlocks(), ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index ca1a21661c21..7a0cb6a334a8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -34,6 +34,7 @@ import io.trino.orc.TupleDomainOrcPredicate; import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder; import io.trino.parquet.Column; +import io.trino.parquet.DiskRange; import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; @@ -912,7 +913,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( ParquetDataSource dataSource = null; try { dataSource = createDataSource(inputFile, OptionalLong.of(fileSize), options, memoryContext, fileFormatDataSourceStats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(start), Optional.of(length)); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(start, length))); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); if (nameMapping.isPresent() && !ParquetSchemaUtil.hasIds(fileSchema)) { @@ -938,10 +939,8 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); List rowGroups = 
getFilteredRowGroups( - start, - length, + parquetMetadata, dataSource, - parquetMetadata.getBlocks(), ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java index 22107bf26ee4..f9b2d303648d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java @@ -17,6 +17,7 @@ import io.trino.filesystem.TrinoOutputFile; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.plugin.hive.parquet.ParquetFileWriter; import io.trino.spi.Page; @@ -82,8 +83,9 @@ public FileMetrics getFileMetrics() { ParquetMetadata parquetMetadata; try { - parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()), Optional.empty(), Optional.empty()); - return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata))); + parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()), Optional.empty()); + List rowGroupInfos = parquetMetadata.getRowGroupInfo(); + return new FileMetrics(footerMetrics(parquetMetadata, rowGroupInfos, Stream.empty(), metricsConfig, null), Optional.of(getSplitOffsets(rowGroupInfos))); } catch (IOException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error creating metadata for Parquet file %s", location), e); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java index c5eff09ece1b..a2a4b75b78d9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java @@ -148,8 +148,8 @@ public static Metrics loadMetrics(TrinoInputFile file, HiveStorageFormat storage private static Metrics parquetMetrics(TrinoInputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) { try (ParquetDataSource dataSource = new TrinoParquetDataSource(file, new ParquetReaderOptions(), new FileFormatDataSourceStats())) { - ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty(), Optional.empty()); - return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig, nameMapping); + ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + return ParquetUtil.footerMetrics(metadata, metadata.getRowGroupInfo(), Stream.empty(), metricsConfig, nameMapping); } catch (IOException e) { throw new UncheckedIOException("Failed to read file footer: " + file.location(), e); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java index 0a676ca339ca..06209a1ecac9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java 
@@ -14,10 +14,12 @@ package io.trino.plugin.iceberg.util; -import com.google.common.collect.ImmutableList; +import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.metadata.PrunedBlockMetadata; +import io.trino.parquet.reader.RowGroupInfo; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; @@ -45,8 +47,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -57,6 +57,7 @@ import java.util.function.Function; import java.util.stream.Stream; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toMap; @@ -68,13 +69,9 @@ public final class ParquetUtil // based on org.apache.iceberg.parquet.ParquetUtil and on org.apache.iceberg.parquet.ParquetConversions private ParquetUtil() {} - public static Metrics footerMetrics(ParquetMetadata metadata, Stream> fieldMetrics, MetricsConfig metricsConfig) - { - return footerMetrics(metadata, fieldMetrics, metricsConfig, null); - } - public static Metrics footerMetrics( ParquetMetadata metadata, + List rowGroupInfos, Stream> fieldMetrics, MetricsConfig metricsConfig, NameMapping nameMapping) @@ -95,8 +92,8 @@ public static Metrics footerMetrics( Map> fieldMetricsMap = fieldMetrics.collect(toMap(FieldMetrics::id, identity())); - List blocks = metadata.getBlocks(); - for (BlockMetadata block : blocks) { + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); rowCount += block.rowCount(); for (ColumnChunkMetadata column : block.columns()) { Integer fieldId = fileSchema.aliasToId(column.getPath().toDotString()); @@ -155,14 +152,15 @@ public static Metrics footerMetrics( toBufferMap(fileSchema, upperBounds)); } - public static List getSplitOffsets(ParquetMetadata metadata) + public static List getSplitOffsets(List rowGroupInfos) + throws ParquetCorruptionException { - List splitOffsets = new ArrayList<>(metadata.getBlocks().size()); - for (BlockMetadata blockMetaData : metadata.getBlocks()) { - splitOffsets.add(blockMetaData.getStartingPos()); - } - Collections.sort(splitOffsets); - return ImmutableList.copyOf(splitOffsets); + return rowGroupInfos.stream() + .map(RowGroupInfo::prunedBlockMetadata) + .map(PrunedBlockMetadata::getBlockMetadata) + .map(BlockMetadata::getStartingPos) + .sorted() + .collect(toImmutableList()); } private static void updateFromFieldMetrics( diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 8a77a946f14c..aceeb31b8b96 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -20,6 +20,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.parquet.ParquetCorruptionException; import 
io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.testing.BaseConnectorSmokeTest; import io.trino.testing.TestingConnectorBehavior; @@ -522,6 +523,7 @@ public void testCreateTableWithNonExistingSchemaVerifyLocation() @Test public void testSortedNationTable() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -538,6 +540,7 @@ public void testSortedNationTable() @Test public void testFileSortingWithLargerTable() + throws ParquetCorruptionException { // Using a larger table forces buffered data to be written to disk Session withSmallRowGroups = Session.builder(getSession()) @@ -734,7 +737,8 @@ public void testPartitionFilterRequired() assertUpdate(session, "DROP TABLE " + tableName); } - protected abstract boolean isFileSorted(Location path, String sortColumnName); + protected abstract boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException; @Test public void testTableChangesFunction() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 266d35e96136..aed5e600ba22 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -30,6 +30,7 @@ import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.TableHandle; import io.trino.operator.OperatorStats; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.hive.HiveCompressionCodec; import io.trino.plugin.hive.TestingHivePlugin; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; @@ -1506,6 +1507,7 @@ private void testCreateSortedTableWithSortTransform(String columnName, String so @Test public void testSortOrderChange() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -1535,6 +1537,7 @@ public void testSortOrderChange() @Test public void testSortingDisabled() + throws ParquetCorruptionException { Session withSortingDisabled = Session.builder(withSmallRowGroups(getSession())) .setCatalogSessionProperty(ICEBERG_CATALOG, "sorted_writing_enabled", "false") @@ -1553,6 +1556,7 @@ public void testSortingDisabled() @Test public void testOptimizeWithSortOrder() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -1575,6 +1579,7 @@ public void testOptimizeWithSortOrder() @Test public void testUpdateWithSortOrder() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -1595,7 +1600,8 @@ public void testUpdateWithSortOrder() } } - protected abstract boolean isFileSorted(String path, String sortColumnName); + protected abstract boolean isFileSorted(String path, String sortColumnName) + throws ParquetCorruptionException; @Test public void testSortingOnNestedField() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java index 493a6ddce569..26ae68b3334f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java @@ -26,11 +26,13 @@ import io.trino.orc.metadata.OrcColumnId; import io.trino.orc.metadata.statistics.StringStatistics; import io.trino.orc.metadata.statistics.StripeStatistics; +import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; @@ -126,20 +128,22 @@ private static boolean checkOrcFileSorting(Supplier dataSourceSup @SuppressWarnings({"unchecked", "rawtypes"}) public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName) + throws ParquetCorruptionException { ParquetMetadata parquetMetadata; try { parquetMetadata = MetadataReader.readFooter( new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()), - Optional.empty(), Optional.empty(), Optional.empty()); + Optional.empty(), Optional.empty()); } catch (IOException e) { throw new UncheckedIOException(e); } Comparable previousMax = null; - verify(parquetMetadata.getBlocks().size() > 1, "Test must produce at least two row groups"); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + verify(parquetMetadata.getRowGroupInfo().size() > 1, "Test must produce at least two row groups"); + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + BlockMetadata blockMetaData = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); ColumnChunkMetadata columnMetadata = blockMetaData.columns().stream() .filter(column -> getOnlyElement(column.getPath().iterator()).equalsIgnoreCase(sortColumnName)) .collect(onlyElement()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java index addd4954ab4b..5ba0726f08a3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import org.apache.iceberg.FileFormat; import org.junit.jupiter.api.AfterAll; @@ -60,6 +61,7 @@ public ImmutableMap getAdditionalIcebergProperties() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java index 06073417e602..b681c6ed3019 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; import static org.apache.iceberg.FileFormat.PARQUET; @@ -28,6 +29,7 @@ public TestIcebergMinioParquetConnectorSmokeTest() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java index 99a2b2220dee..b992d39a9df0 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java @@ -16,6 +16,7 @@ import io.trino.Session; import io.trino.filesystem.Location; import io.trino.operator.OperatorStats; +import io.trino.parquet.ParquetCorruptionException; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import io.trino.testing.QueryRunner.MaterializedResultWithPlan; @@ -151,6 +152,7 @@ public void testPushdownPredicateToParquetAfterColumnRename() @Override protected boolean isFileSorted(String path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(Location.of(path)), sortColumnName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index 9dec58090118..a4d79d5ccd19 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -34,6 +34,7 @@ import io.trino.hdfs.HdfsEnvironment; import io.trino.hdfs.TrinoHdfsFileSystemStats; import io.trino.hdfs.authentication.NoHdfsAuthentication; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.plugin.iceberg.SchemaInitializer; @@ -248,6 +249,7 @@ protected void deleteDirectory(String location) @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { TrinoFileSystem fileSystem = fileSystemFactory.create(SESSION); return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java index 7d2de165809e..a8907a56009d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java @@ -15,6 +15,7 @@ import 
com.google.common.collect.ImmutableMap; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -241,6 +242,7 @@ protected void deleteDirectory(String location) @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java index 5ad8d045335c..6d215e079f40 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -366,6 +367,7 @@ public void testDropTableWithMissingDataFile() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java index 97a5a939dec8..bc4f44982e89 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.rest; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergConnector; @@ -129,6 +130,7 @@ protected boolean locationExists(String location) @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java index 28ff8ffe54ef..a621021ff2bf 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java @@ -15,6 +15,7 @@ import 
io.airlift.http.server.testing.TestingHttpServer; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -200,6 +201,7 @@ public void testDropTableWithNonExistentTableLocation() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java index 831e945240fa..aa3f71483081 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.http.server.testing.TestingHttpServer; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -249,6 +250,7 @@ public void testDropTableWithNonExistentTableLocation() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java index c2efa55daa8d..ee355ebd86bb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java @@ -19,6 +19,7 @@ import io.trino.filesystem.s3.S3FileSystemConfig; import io.trino.filesystem.s3.S3FileSystemFactory; import io.trino.filesystem.s3.S3FileSystemStats; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -309,6 +310,7 @@ public void testDropTableWithNonExistentTableLocation() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java index 394e4c0b32a9..24b22253aba5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.plugin.iceberg.SchemaInitializer; @@ -686,6 +687,7 @@ public void testSnowflakeNativeTable() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/testing/trino-faulttolerant-tests/pom.xml b/testing/trino-faulttolerant-tests/pom.xml index b17540be8734..7727e628af1d 100644 --- a/testing/trino-faulttolerant-tests/pom.xml +++ b/testing/trino-faulttolerant-tests/pom.xml @@ -256,6 +256,12 @@ test + + io.trino + trino-parquet + test + + io.trino trino-plugin-toolkit diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java index c4a6e5a38f6e..4e1cf34a0e17 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java @@ -14,6 +14,7 @@ package io.trino.faulttolerant.iceberg; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -80,6 +81,7 @@ public void testStatsBasedRepartitionDataOnInsert() @Override protected boolean isFileSorted(String path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(Location.of(path)), sortColumnName); }
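Note on the consolidated API (illustrative, not part of the patch): the hunks above replace ParquetMetadata#getBlocks() call sites with ParquetMetadata#getRowGroupInfo(), collapse the separate footer offset/length arguments into a single Optional<DiskRange>, and reach block metadata through PrunedBlockMetadata. The sketch below pieces those updated call sites together into one hypothetical helper; the class, method name, variables, and import set are assumptions made for the example, not code taken from this change.

import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.DiskRange;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.RowGroupInfo;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.plugin.iceberg.util.ParquetUtil;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

// Hypothetical usage sketch assembled from the call sites updated in this patch.
final class RowGroupInfoUsageSketch
{
    private RowGroupInfoUsageSketch() {}

    static List<Long> readSortedSplitOffsets(TrinoInputFile inputFile, long start, long length)
            throws IOException, ParquetCorruptionException
    {
        try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats())) {
            // The footer reader now takes a single Optional<DiskRange> instead of separate offset and length arguments.
            ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(start, length)));

            // Row groups are exposed as RowGroupInfo; block metadata is reached through prunedBlockMetadata(),
            // mirroring the per-row-group walk in the footerMetrics hunk above.
            List<RowGroupInfo> rowGroupInfos = parquetMetadata.getRowGroupInfo();
            long totalRows = 0;
            for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
                BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata();
                totalRows += block.rowCount();
            }

            // getSplitOffsets now operates on the row-group infos and declares ParquetCorruptionException.
            return ParquetUtil.getSplitOffsets(rowGroupInfos);
        }
    }
}

The practical effect, as exercised by the updated isFileSorted/checkParquetFileSorting tests, is that callers no longer thread start/length and raw block lists through getFilteredRowGroups; they hand over the ParquetMetadata itself and work with the pruned row-group view, propagating ParquetCorruptionException where the footer turns out to be inconsistent.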