Skip to content

Commit

Permalink
consolidate createPrunedColumnsMetadata and ParquetMetadata#buildBlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
jinyangli34 committed Dec 12, 2024
1 parent e7ef014 commit 041670a
Show file tree
Hide file tree
Showing 40 changed files with 330 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.trino.parquet.DiskRange;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.RowGroupInfo;
import io.trino.parquet.reader.TrinoColumnIndexStore;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.ColumnChunk;
Expand All @@ -30,14 +34,13 @@
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -47,7 +50,7 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats;
Expand All @@ -65,21 +68,19 @@ public class ParquetMetadata
private static final Logger log = Logger.get(ParquetMetadata.class);

private final FileMetaData fileMetaData;
private final MessageType messageType;
private final ParquetDataSourceId dataSourceId;
private final FileMetadata parquetMetadata;
private final Optional<Long> offset;
private final Optional<Long> length;
private final Optional<DiskRange> diskRange;

public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional<Long> offset, Optional<Long> length)
public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional<DiskRange> diskRange)
throws ParquetCorruptionException
{
this.fileMetaData = requireNonNull(fileMetaData, "fileMetaData is null");
this.messageType = readMessageType();
this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
this.offset = requireNonNull(offset, "offset is null");
this.length = requireNonNull(length, "length is null");
checkArgument(offset.isEmpty() && length.isEmpty() || (offset.isPresent() && length.isPresent()),
"Both offset and length must be present or absent");
this.parquetMetadata = new FileMetadata(readMessageType(), keyValueMetaData(fileMetaData), fileMetaData.getCreated_by());
this.diskRange = requireNonNull(diskRange, "range is null");
this.parquetMetadata = new FileMetadata(messageType, keyValueMetaData(fileMetaData), fileMetaData.getCreated_by());
}

public FileMetadata getFileMetaData()
Expand All @@ -90,93 +91,141 @@ public FileMetadata getFileMetaData()
@Override
public String toString()
{
return "ParquetMetaData{" + fileMetaData + "}";
return toStringHelper(this)
.add("dataSourceId", dataSourceId)
.add("fileMetaData", fileMetaData)
.add("diskRange", diskRange)
.toString();
}

public List<BlockMetadata> getBlocks(Collection<ColumnDescriptor> columnDescriptors)
private List<RowGroupOffset> getRowGroups()
{
Set<ColumnPath> paths = columnDescriptors.stream()
.map(ColumnDescriptor::getPath)
.map(ColumnPath::get)
.collect(toImmutableSet());
List<RowGroup> rowGroups = fileMetaData.getRow_groups();
if (rowGroups == null) {
return ImmutableList.of();
}
ImmutableList.Builder<RowGroupOffset> builder = ImmutableList.builder();
long lastRowCount = 0;
long fileRowCount = 0;
for (int i = 0; i < rowGroups.size(); i++) {
RowGroup rowGroup = rowGroups.get(i);
fileRowCount += lastRowCount;
lastRowCount = rowGroup.getNum_rows();
if (diskRange.isPresent() && rowGroup.isSetFile_offset()) {
if (rowGroup.file_offset >= diskRange.get().getOffset() + diskRange.get().getLength()) {
break;
}
if (i < rowGroups.size() - 1 && rowGroups.get(i + 1).isSetFile_offset() && diskRange.get().getOffset() >= rowGroups.get(i + 1).file_offset) {
continue;
}
}
builder.add(new RowGroupOffset(rowGroup, fileRowCount));
}

return buildBlocks(paths);
return builder.build();
}

public List<BlockMetadata> getBlocks()
private ColumnChunkMetadata toColumnChunkMetadata(MessageType messageType, ColumnChunk columnChunk, ColumnPath columnPath)
{
return buildBlocks(ImmutableSet.of());
ColumnMetaData metaData = columnChunk.meta_data;
PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType();
ColumnChunkMetadata column = ColumnChunkMetadata.get(
columnPath,
primitiveType,
CompressionCodecName.fromParquet(metaData.codec),
convertEncodingStats(metaData.encoding_stats),
readEncodings(metaData.encodings),
MetadataReader.readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
column.setBloomFilterOffset(metaData.bloom_filter_offset);

return column;
}

private List<BlockMetadata> buildBlocks(Set<ColumnPath> paths)
public List<RowGroupInfo> getRowGroupInfo()
throws ParquetCorruptionException
{
List<SchemaElement> schema = fileMetaData.getSchema();
MessageType messageType = readParquetSchema(schema);
List<BlockMetadata> blocks = new ArrayList<>();
List<RowGroup> rowGroups = fileMetaData.getRow_groups();
if (rowGroups != null) {
for (int i = 0; i < rowGroups.size(); i++) {
RowGroup rowGroup = rowGroups.get(i);
List<ColumnChunk> columns = rowGroup.getColumns();
checkState(!columns.isEmpty(), "No columns in row group: %s [%s]", rowGroup, dataSourceId);
String filePath = columns.get(0).getFile_path();
return getRowGroupInfo(Optional.empty(), Optional.empty());
}

if (offset.isPresent() && length.isPresent() && rowGroup.isSetFile_offset()) {
if (rowGroup.file_offset >= offset.get() + length.get()) {
break;
}
if (i < rowGroups.size() - 1 && rowGroups.get(i + 1).isSetFile_offset() && offset.get() >= rowGroups.get(i + 1).file_offset) {
continue;
}
public List<RowGroupInfo> getRowGroupInfo(Optional<ParquetDataSource> dataSource, Optional<Map<List<String>, ColumnDescriptor>> descriptorsByPath)
throws ParquetCorruptionException
{
Optional<Set<ColumnPath>> filterColumnPaths = descriptorsByPath.map(dp ->
dp.keySet().stream()
.map(p -> p.toArray(new String[0]))
.map(ColumnPath::get)
.collect(toImmutableSet()));
ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
for (RowGroupOffset rowGroupOffset : getRowGroups()) {
List<ColumnChunk> columns = rowGroupOffset.rowGroup.getColumns();
validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroupOffset.rowGroup);
String filePath = columns.getFirst().getFile_path();

ImmutableMap.Builder<ColumnPath, ColumnChunkMetadata> columnMetadataBuilder = ImmutableMap.builderWithExpectedSize(columns.size());

for (ColumnChunk columnChunk : columns) {
checkState((filePath == null && columnChunk.getFile_path() == null)
|| (filePath != null && filePath.equals(columnChunk.getFile_path())),
"all column chunks of the same row group must be in the same file [%s]", dataSourceId);
ColumnPath columnPath = toColumnPath(columnChunk);
if (filterColumnPaths.isEmpty() || filterColumnPaths.get().contains(columnPath)) {
ColumnChunkMetadata chunkMetadata = toColumnChunkMetadata(messageType, columnChunk, columnPath);
columnMetadataBuilder.put(columnPath, chunkMetadata);
}
ImmutableList.Builder<ColumnChunkMetadata> columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size());
for (ColumnChunk columnChunk : columns) {
checkState(
(filePath == null && columnChunk.getFile_path() == null)
|| (filePath != null && filePath.equals(columnChunk.getFile_path())),
"all column chunks of the same row group must be in the same file [%s]", dataSourceId);
ColumnMetaData metaData = columnChunk.meta_data;
String[] path = metaData.path_in_schema.stream()
.map(value -> value.toLowerCase(Locale.ENGLISH))
.toArray(String[]::new);
ColumnPath columnPath = ColumnPath.get(path);
if (!paths.isEmpty() && !paths.contains(columnPath)) {
continue;
}
Map<ColumnPath, ColumnChunkMetadata> columnChunkMetadata = columnMetadataBuilder.buildOrThrow();

if (filterColumnPaths.isPresent() && filterColumnPaths.get().size() != columnChunkMetadata.size()) {
Set<List<String>> existingPaths = columns.stream()
.map(ParquetMetadata::toColumnPath)
.map(p -> ImmutableList.copyOf(p.toArray()))
.collect(toImmutableSet());
for (Map.Entry<List<String>, ColumnDescriptor> entry : descriptorsByPath.get().entrySet()) {
if (!existingPaths.contains(entry.getKey())) {
throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", entry.getValue());
}
PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType();
ColumnChunkMetadata column = ColumnChunkMetadata.get(
columnPath,
primitiveType,
CompressionCodecName.fromParquet(metaData.codec),
convertEncodingStats(metaData.encoding_stats),
readEncodings(metaData.encodings),
MetadataReader.readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
column.setBloomFilterOffset(metaData.bloom_filter_offset);
columnMetadataBuilder.add(column);
}
blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build()));
}

PrunedBlockMetadata columnsMetadata = new PrunedBlockMetadata(rowGroupOffset.rowGroup.getNum_rows(), dataSourceId, columnChunkMetadata);
Optional<ColumnIndexStore> indexStore = Optional.empty();
if (filterColumnPaths.isPresent() && dataSource.isPresent()) {
indexStore = Optional.of(new TrinoColumnIndexStore(dataSource.get(), columnsMetadata.getBlockMetadata(), filterColumnPaths.get(), ImmutableSet.of()));
}
rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, rowGroupOffset.offset, indexStore));
}

return blocks;
return rowGroupInfoBuilder.build();
}

private static MessageType readParquetSchema(List<SchemaElement> schema)
private MessageType readMessageType()
throws ParquetCorruptionException
{
List<SchemaElement> schema = fileMetaData.getSchema();
validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");

Iterator<SchemaElement> schemaIterator = schema.iterator();
SchemaElement rootSchema = schemaIterator.next();
Types.MessageTypeBuilder builder = Types.buildMessage();
readTypeSchema(builder, schemaIterator, rootSchema.getNum_children());
return builder.named(rootSchema.name);
}

private static ColumnPath toColumnPath(ColumnChunk columnChunk)
{
String[] paths = columnChunk.meta_data.path_in_schema.stream()
.map(value -> value.toLowerCase(Locale.ENGLISH))
.toArray(String[]::new);
return ColumnPath.get(paths);
}

private static void readTypeSchema(Types.GroupBuilder<?> builder, Iterator<SchemaElement> schemaIterator, int typeCount)
{
for (int i = 0; i < typeCount; i++) {
Expand Down Expand Up @@ -248,24 +297,15 @@ private static Set<Encoding> readEncodings(List<org.apache.parquet.format.Encodi
return Collections.unmodifiableSet(columnEncodings);
}

private MessageType readMessageType()
throws ParquetCorruptionException
{
List<SchemaElement> schema = fileMetaData.getSchema();
validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");

Iterator<SchemaElement> schemaIterator = schema.iterator();
SchemaElement rootSchema = schemaIterator.next();
Types.MessageTypeBuilder builder = Types.buildMessage();
readTypeSchema(builder, schemaIterator, rootSchema.getNum_children());
return builder.named(rootSchema.name);
}

private static Map<String, String> keyValueMetaData(FileMetaData fileMetaData)
{
if (fileMetaData.getKey_value_metadata() == null) {
return ImmutableMap.of();
}
return fileMetaData.getKey_value_metadata().stream().collect(toMap(KeyValue::getKey, KeyValue::getValue));
}

private record RowGroupOffset(RowGroup rowGroup, long offset)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,29 @@
package io.trino.parquet.metadata;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.ColumnPath;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Arrays.asList;
import static java.util.function.Function.identity;

public final class PrunedBlockMetadata
{
/**
* Stores only the necessary columns metadata from BlockMetadata and indexes them by path for efficient look-ups
*/
public static PrunedBlockMetadata createPrunedColumnsMetadata(BlockMetadata blockMetadata, ParquetDataSourceId dataSourceId, Map<List<String>, ColumnDescriptor> descriptorsByPath)
throws ParquetCorruptionException
{
Set<List<String>> requiredPaths = descriptorsByPath.keySet();
Map<List<String>, ColumnChunkMetadata> columnMetadataByPath = blockMetadata.columns().stream()
.collect(toImmutableMap(
column -> asList(column.getPath().toArray()),
identity(),
// Same column name may occur more than once when the file is written by case-sensitive tools
(oldValue, _) -> oldValue));
ImmutableMap.Builder<List<String>, ColumnChunkMetadata> columnMetadataByPathBuilder = ImmutableMap.builderWithExpectedSize(requiredPaths.size());
for (Map.Entry<List<String>, ColumnDescriptor> entry : descriptorsByPath.entrySet()) {
List<String> requiredPath = entry.getKey();
ColumnDescriptor columnDescriptor = entry.getValue();
ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(requiredPath);
if (columnChunkMetadata == null) {
throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", columnDescriptor);
}
columnMetadataByPathBuilder.put(requiredPath, columnChunkMetadata);
}
return new PrunedBlockMetadata(blockMetadata.rowCount(), dataSourceId, columnMetadataByPathBuilder.buildOrThrow());
}

private final long rowCount;
private final ParquetDataSourceId dataSourceId;
private final Map<List<String>, ColumnChunkMetadata> columnMetadataByPath;
private final Map<ColumnPath, ColumnChunkMetadata> columnMetadataByPath;
private final BlockMetadata blockMetadata;

private PrunedBlockMetadata(long rowCount, ParquetDataSourceId dataSourceId, Map<List<String>, ColumnChunkMetadata> columnMetadataByPath)
public PrunedBlockMetadata(long rowCount, ParquetDataSourceId dataSourceId, Map<ColumnPath, ColumnChunkMetadata> columnMetadataByPath)
{
this.rowCount = rowCount;
this.dataSourceId = dataSourceId;
this.columnMetadataByPath = columnMetadataByPath;
this.blockMetadata = new BlockMetadata(rowCount, ImmutableList.copyOf(columnMetadataByPath.values()));
}

public long getRowCount()
Expand All @@ -77,10 +49,15 @@ public List<ColumnChunkMetadata> getColumns()
return ImmutableList.copyOf(columnMetadataByPath.values());
}

public BlockMetadata getBlockMetadata()
{
return blockMetadata;
}

public ColumnChunkMetadata getColumnChunkMetaData(ColumnDescriptor columnDescriptor)
throws ParquetCorruptionException
{
ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(asList(columnDescriptor.getPath()));
ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(ColumnPath.get(columnDescriptor.getPath()));
if (columnChunkMetadata == null) {
throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", columnDescriptor);
}
Expand All @@ -93,6 +70,7 @@ public String toString()
return toStringHelper(this)
.add("rowCount", rowCount)
.add("columnMetadataByPath", columnMetadataByPath)
.add("blockMetadata", blockMetadata)
.toString();
}
}
Loading

0 comments on commit 041670a

Please sign in to comment.