Skip to content

Commit

Permalink
Extract fields by names in CheckpointEntryIterator
Browse files: browse the repository at this point in the history
  • Loading branch information
ebyhr committed Nov 17, 2023
1 parent d6b63af commit 6138c93
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 185 deletions.
25 changes: 25 additions & 0 deletions lib/trino-parquet/src/main/java/io/trino/parquet/Column.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet;

import static java.util.Objects.requireNonNull;

/**
 * Immutable pairing of a name with the Parquet {@link Field} it refers to.
 *
 * @param name the name associated with {@code field}; never null
 * @param field the Parquet field; never null
 */
public record Column(String name, Field field)
{
    /**
     * Canonical constructor; rejects null components.
     *
     * @throws NullPointerException if {@code name} or {@code field} is null
     */
    public Column(String name, Field field)
    {
        // name is validated first so that it is the one reported when both are null
        this.name = requireNonNull(name, "name is null");
        this.field = requireNonNull(field, "field is null");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.log.Logger;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.parquet.ChunkKey;
import io.trino.parquet.Column;
import io.trino.parquet.DiskRange;
import io.trino.parquet.Field;
import io.trino.parquet.GroupField;
Expand Down Expand Up @@ -66,6 +67,7 @@
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.parquet.ParquetValidationUtils.validateParquet;
import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation;
Expand Down Expand Up @@ -94,7 +96,7 @@ public class ParquetReader
private final Optional<String> fileCreatedBy;
private final List<BlockMetaData> blocks;
private final List<Long> firstRowsOfBlocks;
private final List<Field> columnFields;
private final List<Column> columnFields;
private final List<PrimitiveField> primitiveFields;
private final ParquetDataSource dataSource;
private final ColumnReaderFactory columnReaderFactory;
Expand Down Expand Up @@ -133,7 +135,7 @@ public class ParquetReader

public ParquetReader(
Optional<String> fileCreatedBy,
List<Field> columnFields,
List<Column> columnFields,
List<BlockMetaData> blocks,
List<Long> firstRowsOfBlocks,
ParquetDataSource dataSource,
Expand All @@ -148,7 +150,7 @@ public ParquetReader(

public ParquetReader(
Optional<String> fileCreatedBy,
List<Field> columnFields,
List<Column> columnFields,
List<BlockMetaData> blocks,
List<Long> firstRowsOfBlocks,
ParquetDataSource dataSource,
Expand All @@ -164,7 +166,7 @@ public ParquetReader(
this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
requireNonNull(columnFields, "columnFields is null");
this.columnFields = ImmutableList.copyOf(columnFields);
this.primitiveFields = getPrimitiveFields(columnFields);
this.primitiveFields = getPrimitiveFields(columnFields.stream().map(Column::field).collect(toImmutableList()));
this.blocks = requireNonNull(blocks, "blocks is null");
this.firstRowsOfBlocks = requireNonNull(firstRowsOfBlocks, "firstRowsOfBlocks is null");
this.dataSource = requireNonNull(dataSource, "dataSource is null");
Expand Down Expand Up @@ -269,7 +271,7 @@ public Page nextPage()
blockFactory.nextPage();
Block[] blocks = new Block[columnFields.size()];
for (int channel = 0; channel < columnFields.size(); channel++) {
Field field = columnFields.get(channel);
Field field = columnFields.get(channel).field();
blocks[channel] = blockFactory.createBlock(batchSize, () -> readBlock(field));
}
Page page = new Page(batchSize, blocks);
Expand Down Expand Up @@ -493,6 +495,11 @@ private ColumnChunk readPrimitive(PrimitiveField field)
return columnChunk;
}

/**
 * Returns the columns this reader was constructed with.
 * <p>
 * The list is the immutable copy taken in the constructor; its order is the
 * channel order used when pages are assembled in {@code nextPage()}.
 *
 * @return the reader's columns, each carrying a name and its Parquet field
 */
public List<Column> getColumnFields()
{
    return this.columnFields;
}

public Metrics getMetrics()
{
ImmutableMap.Builder<String, Metric<?>> metrics = ImmutableMap.<String, Metric<?>>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.airlift.slice.OutputStreamSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.Field;
import io.trino.parquet.Column;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
Expand Down Expand Up @@ -242,12 +242,14 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada
{
org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageColumnIO messageColumnIO = getColumnIO(fileMetaData.getSchema(), fileMetaData.getSchema());
ImmutableList.Builder<Field> columnFields = ImmutableList.builder();
ImmutableList.Builder<Column> columnFields = ImmutableList.builder();
for (int i = 0; i < writeValidation.getTypes().size(); i++) {
columnFields.add(constructField(
writeValidation.getTypes().get(i),
lookupColumnByName(messageColumnIO, writeValidation.getColumnNames().get(i)))
.orElseThrow());
columnFields.add(new Column(
messageColumnIO.getName(),
constructField(
writeValidation.getTypes().get(i),
lookupColumnByName(messageColumnIO, writeValidation.getColumnNames().get(i)))
.orElseThrow()));
}
long nextStart = 0;
ImmutableList.Builder<Long> blockStartsBuilder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ public static ParquetReader createParquetReader(
{
org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageColumnIO messageColumnIO = getColumnIO(fileMetaData.getSchema(), fileMetaData.getSchema());
ImmutableList.Builder<Field> columnFields = ImmutableList.builder();
ImmutableList.Builder<Column> columnFields = ImmutableList.builder();
for (int i = 0; i < types.size(); i++) {
columnFields.add(constructField(
types.get(i),
lookupColumnByName(messageColumnIO, columnNames.get(i)))
.orElseThrow());
columnFields.add(new Column(
messageColumnIO.getName(),
constructField(
types.get(i),
lookupColumnByName(messageColumnIO, columnNames.get(i)))
.orElseThrow()));
}
long nextStart = 0;
ImmutableList.Builder<Long> blockStartsBuilder = ImmutableList.builder();
Expand Down
Loading

0 comments on commit 6138c93

Please sign in to comment.