From 6138c93b4be4c7a2e37bdb01a4f03e1271eac21e Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 14 Nov 2023 13:30:00 +0900 Subject: [PATCH] Extract fields by names in CheckpointEntryIterator --- .../main/java/io/trino/parquet/Column.java | 25 ++ .../trino/parquet/reader/ParquetReader.java | 17 +- .../trino/parquet/writer/ParquetWriter.java | 14 +- .../io/trino/parquet/ParquetTestUtils.java | 12 +- .../checkpoint/CheckpointEntryIterator.java | 266 +++++++----------- .../checkpoint/CheckpointFieldReader.java | 172 +++++++++++ .../hive/parquet/ParquetPageSource.java | 6 + .../parquet/ParquetPageSourceFactory.java | 7 +- .../iceberg/IcebergPageSourceProvider.java | 5 +- 9 files changed, 339 insertions(+), 185 deletions(-) create mode 100644 lib/trino-parquet/src/main/java/io/trino/parquet/Column.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointFieldReader.java diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/Column.java b/lib/trino-parquet/src/main/java/io/trino/parquet/Column.java new file mode 100644 index 000000000000..a6c703cafb90 --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/Column.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet; + +import static java.util.Objects.requireNonNull; + +public record Column(String name, Field field) +{ + public Column + { + requireNonNull(name, "name is null"); + requireNonNull(field, "field is null"); + } +} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java index e650f7f31f50..914942f5c77b 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java @@ -21,6 +21,7 @@ import io.airlift.log.Logger; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.ChunkKey; +import io.trino.parquet.Column; import io.trino.parquet.DiskRange; import io.trino.parquet.Field; import io.trino.parquet.GroupField; @@ -66,6 +67,7 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation; @@ -94,7 +96,7 @@ public class ParquetReader private final Optional fileCreatedBy; private final List blocks; private final List firstRowsOfBlocks; - private final List columnFields; + private final List columnFields; private final List primitiveFields; private final ParquetDataSource dataSource; private final ColumnReaderFactory columnReaderFactory; @@ -133,7 +135,7 @@ public class ParquetReader public ParquetReader( Optional fileCreatedBy, - List columnFields, + List columnFields, List blocks, List firstRowsOfBlocks, ParquetDataSource dataSource, @@ -148,7 +150,7 @@ public ParquetReader( public ParquetReader( Optional fileCreatedBy, - List columnFields, + List columnFields, List blocks, List firstRowsOfBlocks, ParquetDataSource dataSource, @@ -164,7 +166,7 @@ public ParquetReader( this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null"); requireNonNull(columnFields, "columnFields is null"); this.columnFields = ImmutableList.copyOf(columnFields); - this.primitiveFields = getPrimitiveFields(columnFields); + this.primitiveFields = getPrimitiveFields(columnFields.stream().map(Column::field).collect(toImmutableList())); this.blocks = requireNonNull(blocks, "blocks is null"); this.firstRowsOfBlocks = requireNonNull(firstRowsOfBlocks, "firstRowsOfBlocks is null"); this.dataSource = requireNonNull(dataSource, "dataSource is null"); @@ -269,7 +271,7 @@ public Page nextPage() blockFactory.nextPage(); Block[] blocks = new Block[columnFields.size()]; for (int channel = 0; channel < columnFields.size(); channel++) { - Field field = columnFields.get(channel); + Field field = columnFields.get(channel).field(); blocks[channel] = blockFactory.createBlock(batchSize, () -> readBlock(field)); } Page page = new Page(batchSize, blocks); @@ -493,6 +495,11 @@ private ColumnChunk readPrimitive(PrimitiveField field) return columnChunk; } + public List getColumnFields() + { + return columnFields; + } + public Metrics getMetrics() { ImmutableMap.Builder> metrics = ImmutableMap.>builder() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java index 1f7acbac194d..c9ddb8e7f94b 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java @@ -19,7 +19,7 @@ import io.airlift.slice.OutputStreamSliceOutput; import io.airlift.slice.Slice; import io.airlift.slice.Slices; -import io.trino.parquet.Field; +import io.trino.parquet.Column; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; @@ -242,12 +242,14 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada { org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageColumnIO messageColumnIO = getColumnIO(fileMetaData.getSchema(), fileMetaData.getSchema()); - ImmutableList.Builder columnFields = ImmutableList.builder(); + ImmutableList.Builder columnFields = ImmutableList.builder(); for (int i = 0; i < writeValidation.getTypes().size(); i++) { - columnFields.add(constructField( - writeValidation.getTypes().get(i), - lookupColumnByName(messageColumnIO, writeValidation.getColumnNames().get(i))) - .orElseThrow()); + columnFields.add(new Column( + messageColumnIO.getName(), + constructField( + writeValidation.getTypes().get(i), + lookupColumnByName(messageColumnIO, writeValidation.getColumnNames().get(i))) + .orElseThrow())); } long nextStart = 0; ImmutableList.Builder blockStartsBuilder = ImmutableList.builder(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java index 6f6b5e70b8ca..cceb67158af1 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java @@ -104,12 +104,14 @@ public static ParquetReader createParquetReader( { org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageColumnIO messageColumnIO = getColumnIO(fileMetaData.getSchema(), fileMetaData.getSchema()); - ImmutableList.Builder columnFields = ImmutableList.builder(); + ImmutableList.Builder columnFields = ImmutableList.builder(); for (int i = 0; i < types.size(); i++) { - columnFields.add(constructField( - types.get(i), - lookupColumnByName(messageColumnIO, columnNames.get(i))) - .orElseThrow()); + columnFields.add(new Column( + messageColumnIO.getName(), + constructField( + types.get(i), + lookupColumnByName(messageColumnIO, columnNames.get(i))) + .orElseThrow())); } long nextStart = 0; ImmutableList.Builder blockStartsBuilder = ImmutableList.builder(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index c2dca1f4e295..ebf150012df4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -17,10 +17,10 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.math.LongMath; import io.airlift.log.Logger; import io.trino.filesystem.TrinoInputFile; +import io.trino.parquet.Column; import io.trino.parquet.ParquetReaderOptions; import io.trino.plugin.deltalake.DeltaHiveTypeTranslator; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; @@ -40,20 +40,14 @@ import io.trino.plugin.hive.HiveColumnProjectionInfo; import io.trino.plugin.hive.HiveType; import io.trino.plugin.hive.ReaderPageSource; +import io.trino.plugin.hive.parquet.ParquetPageSource; import io.trino.plugin.hive.parquet.ParquetPageSourceFactory; import io.trino.spi.Page; import io.trino.spi.TrinoException; -import io.trino.spi.block.ArrayBlock; import io.trino.spi.block.Block; -import io.trino.spi.block.ByteArrayBlock; -import io.trino.spi.block.IntArrayBlock; import io.trino.spi.block.LongArrayBlock; -import io.trino.spi.block.MapBlock; -import io.trino.spi.block.RowBlock; import io.trino.spi.block.SqlRow; import io.trino.spi.block.ValueBlock; -import io.trino.spi.block.VariableWidthBlock; -import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; @@ -67,8 +61,6 @@ import jakarta.annotation.Nullable; import org.joda.time.DateTimeZone; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayDeque; import java.util.List; import java.util.Map; @@ -81,6 +73,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate; @@ -136,7 +130,8 @@ public String getColumnName() private final String checkpointPath; private final ConnectorSession session; - private final ConnectorPageSource pageSource; + private final ParquetPageSource pageSource; + private final Map parquetFields; private final MapType stringMap; private final ArrayType stringList; private final Queue nextEntries; @@ -215,8 +210,11 @@ public CheckpointEntryIterator( verify(pageSource.getReaderColumns().isEmpty(), "All columns expected to be base columns"); - this.pageSource = pageSource.get(); + this.pageSource = (ParquetPageSource) pageSource.get(); this.nextEntries = new ArrayDeque<>(); + // The size between parquetFields and extractors may not match when the requested field doesn't exist in Parquet file + this.parquetFields = this.pageSource.getColumnFields().stream() + .collect(toImmutableMap(Column::name, e -> e.field().getType())); this.extractors = fields.stream() .map(field -> requireNonNull(extractors.get(field), "No extractor found for field " + field)) .collect(toImmutableList()); @@ -309,44 +307,52 @@ private DeltaLakeTransactionLogEntry buildCommitInfoEntry(ConnectorSession sessi if (block.isNull(pagePosition)) { return null; } + RowType type = (RowType) parquetFields.get("commitinfo"); int commitInfoFields = 12; int jobFields = 5; int notebookFields = 1; SqlRow commitInfoRow = block.getObject(pagePosition, SqlRow.class); + CheckpointFieldReader commitInfo = new CheckpointFieldReader(session, commitInfoRow, type); log.debug("Block %s has %s fields", block, commitInfoRow.getFieldCount()); if (commitInfoRow.getFieldCount() != commitInfoFields) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", block, commitInfoFields, commitInfoRow.getFieldCount())); } - SqlRow jobRow = getRowField(commitInfoRow, 9); + SqlRow jobRow = commitInfo.getRow("job"); if (jobRow.getFieldCount() != jobFields) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", jobRow, jobFields, jobRow.getFieldCount())); } - SqlRow notebookRow = getRowField(commitInfoRow, 7); + RowType.Field jobField = type.getFields().stream().filter(field -> field.getName().orElseThrow().equals("job")).collect(onlyElement()); + CheckpointFieldReader job = new CheckpointFieldReader(session, jobRow, (RowType) jobField.getType()); + + SqlRow notebookRow = commitInfo.getRow("notebook"); if (notebookRow.getFieldCount() != notebookFields) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", notebookRow, notebookFields, notebookRow.getFieldCount())); } + RowType.Field notebookField = type.getFields().stream().filter(field -> field.getName().orElseThrow().equals("notebook")).collect(onlyElement()); + CheckpointFieldReader notebook = new CheckpointFieldReader(session, notebookRow, (RowType) notebookField.getType()); + CommitInfoEntry result = new CommitInfoEntry( - getLongField(commitInfoRow, 0), - getLongField(commitInfoRow, 1), - getStringField(commitInfoRow, 2), - getStringField(commitInfoRow, 3), - getStringField(commitInfoRow, 4), - getMapField(commitInfoRow, 5), + commitInfo.getLong("version"), + commitInfo.getLong("timestamp"), + commitInfo.getString("userId"), + commitInfo.getString("userName"), + commitInfo.getString("operation"), + commitInfo.getMap(stringMap, "operationParameters"), new CommitInfoEntry.Job( - getStringField(jobRow, 0), - getStringField(jobRow, 1), - getStringField(jobRow, 2), - getStringField(jobRow, 3), - getStringField(jobRow, 4)), + job.getString("jobId"), + job.getString("jobName"), + job.getString("runId"), + job.getString("jobOwnerId"), + job.getString("triggerType")), new CommitInfoEntry.Notebook( - getStringField(notebookRow, 0)), - getStringField(commitInfoRow, 8), - getLongField(commitInfoRow, 9), - getStringField(commitInfoRow, 10), - Optional.of(getBooleanField(commitInfoRow, 11))); + notebook.getString("notebookId")), + commitInfo.getString("clusterId"), + commitInfo.getInt("readVersion"), + commitInfo.getString("isolationLevel"), + Optional.of(commitInfo.getBoolean("isBlindAppend"))); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.commitInfoEntry(result); } @@ -357,6 +363,7 @@ private DeltaLakeTransactionLogEntry buildProtocolEntry(ConnectorSession session if (block.isNull(pagePosition)) { return null; } + RowType type = (RowType) parquetFields.get("protocol"); int minProtocolFields = 2; int maxProtocolFields = 4; SqlRow protocolEntryRow = block.getObject(pagePosition, SqlRow.class); @@ -366,14 +373,13 @@ private DeltaLakeTransactionLogEntry buildProtocolEntry(ConnectorSession session throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have between %d and %d children, but found %s", block, minProtocolFields, maxProtocolFields, fieldCount)); } - Optional> readerFeatures = getOptionalSetField(protocolEntryRow, 2); - // The last entry should be writer feature when protocol entry size is 3 https://github.com/delta-io/delta/blob/master/PROTOCOL.md#disabled-features - Optional> writerFeatures = fieldCount != 4 ? readerFeatures : getOptionalSetField(protocolEntryRow, 3); + + CheckpointFieldReader protocol = new CheckpointFieldReader(session, protocolEntryRow, type); ProtocolEntry result = new ProtocolEntry( - getIntField(protocolEntryRow, 0), - getIntField(protocolEntryRow, 1), - readerFeatures, - writerFeatures); + protocol.getInt("minReaderVersion"), + protocol.getInt("minWriterVersion"), + protocol.getOptionalSet(stringList, "readerFeatures"), + protocol.getOptionalSet(stringList, "writerFeatures")); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.protocolEntry(result); } @@ -384,30 +390,35 @@ private DeltaLakeTransactionLogEntry buildMetadataEntry(ConnectorSession session if (block.isNull(pagePosition)) { return null; } + RowType type = (RowType) parquetFields.get("metadata"); int metadataFields = 8; int formatFields = 2; SqlRow metadataEntryRow = block.getObject(pagePosition, SqlRow.class); + CheckpointFieldReader metadata = new CheckpointFieldReader(session, metadataEntryRow, type); log.debug("Block %s has %s fields", block, metadataEntryRow.getFieldCount()); if (metadataEntryRow.getFieldCount() != metadataFields) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", block, metadataFields, metadataEntryRow.getFieldCount())); } - SqlRow formatRow = getRowField(metadataEntryRow, 3); + SqlRow formatRow = metadata.getRow("format"); if (formatRow.getFieldCount() != formatFields) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", formatRow, formatFields, formatRow.getFieldCount())); } + + RowType.Field formatField = type.getFields().stream().filter(field -> field.getName().orElseThrow().equals("format")).collect(onlyElement()); + CheckpointFieldReader format = new CheckpointFieldReader(session, formatRow, (RowType) formatField.getType()); MetadataEntry result = new MetadataEntry( - getStringField(metadataEntryRow, 0), - getStringField(metadataEntryRow, 1), - getStringField(metadataEntryRow, 2), + metadata.getString("id"), + metadata.getString("name"), + metadata.getString("description"), new MetadataEntry.Format( - getStringField(formatRow, 0), - getMapField(formatRow, 1)), - getStringField(metadataEntryRow, 4), - getListField(metadataEntryRow, 5), - getMapField(metadataEntryRow, 6), - getLongField(metadataEntryRow, 7)); + format.getString("provider"), + format.getMap(stringMap, "options")), + metadata.getString("schemaString"), + metadata.getList(stringList, "partitionColumns"), + metadata.getMap(stringMap, "configuration"), + metadata.getLong("createdTime")); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.metadataEntry(result); } @@ -418,6 +429,7 @@ private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session, if (block.isNull(pagePosition)) { return null; } + RowType type = (RowType) parquetFields.get("remove"); int removeFields = 3; SqlRow removeEntryRow = block.getObject(pagePosition, SqlRow.class); log.debug("Block %s has %s fields", block, removeEntryRow.getFieldCount()); @@ -425,10 +437,11 @@ private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session, throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", block, removeFields, removeEntryRow.getFieldCount())); } + CheckpointFieldReader remove = new CheckpointFieldReader(session, removeEntryRow, type); RemoveFileEntry result = new RemoveFileEntry( - getStringField(removeEntryRow, 0), - getLongField(removeEntryRow, 1), - getBooleanField(removeEntryRow, 2)); + remove.getString("path"), + remove.getLong("deletionTimestamp"), + remove.getBoolean("dataChange")); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.removeFileEntry(result); } @@ -439,36 +452,37 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo if (block.isNull(pagePosition)) { return null; } + RowType type = (RowType) parquetFields.get("add"); boolean deletionVectorsEnabled = isDeletionVectorEnabled(metadataEntry, protocolEntry); SqlRow addEntryRow = block.getObject(pagePosition, SqlRow.class); log.debug("Block %s has %s fields", block, addEntryRow.getFieldCount()); + CheckpointFieldReader add = new CheckpointFieldReader(session, addEntryRow, type); - String path = getStringField(addEntryRow, 0); - Map partitionValues = getMapField(addEntryRow, 1); - long size = getLongField(addEntryRow, 2); - long modificationTime = getLongField(addEntryRow, 3); - boolean dataChange = getBooleanField(addEntryRow, 4); + String path = add.getString("path"); + Map partitionValues = add.getMap(stringMap, "partitionValues"); + long size = add.getLong("size"); + long modificationTime = add.getLong("modificationTime"); + boolean dataChange = add.getBoolean("dataChange"); Optional deletionVector = Optional.empty(); - int statsFieldIndex; if (deletionVectorsEnabled) { - deletionVector = Optional.ofNullable(getRowField(addEntryRow, 5)).map(CheckpointEntryIterator::parseDeletionVectorFromParquet); - statsFieldIndex = 6; - } - else { - statsFieldIndex = 5; + deletionVector = Optional.ofNullable(add.getRow("deletionVector")) + .map(row -> { + RowType.Field deletionVectorField = type.getFields().stream().filter(field -> field.getName().orElseThrow().equals("deletionVector")).collect(onlyElement()); + return parseDeletionVectorFromParquet(session, row, (RowType) deletionVectorField.getType()); + }); } - boolean partitionValuesParsedExists = addEntryRow.getUnderlyingFieldBlock(statsFieldIndex + 1) instanceof RowBlock && // partitionValues_parsed - addEntryRow.getUnderlyingFieldBlock(statsFieldIndex + 2) instanceof RowBlock; // stats_parsed - int parsedStatsIndex = partitionValuesParsedExists ? statsFieldIndex + 1 : statsFieldIndex; - Optional parsedStats = Optional.ofNullable(getRowField(addEntryRow, parsedStatsIndex + 1)).map(this::parseStatisticsFromParquet); + Optional parsedStats = Optional.ofNullable(add.getRow("stats_parsed")).map(row -> { + RowType.Field parsedStatsField = type.getFields().stream().filter(field -> field.getName().orElseThrow().equals("stats_parsed")).collect(onlyElement()); + return parseStatisticsFromParquet(session, row, (RowType) parsedStatsField.getType()); + }); Optional stats = Optional.empty(); if (parsedStats.isEmpty()) { - stats = Optional.ofNullable(getStringField(addEntryRow, statsFieldIndex)); + stats = Optional.ofNullable(add.getString("stats")); } - Map tags = getMapField(addEntryRow, parsedStatsIndex + 2); + Map tags = add.getMap(stringMap, "tags"); AddFileEntry result = new AddFileEntry( path, partitionValues, @@ -484,32 +498,34 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo return DeltaLakeTransactionLogEntry.addFileEntry(result); } - private static DeletionVectorEntry parseDeletionVectorFromParquet(SqlRow row) + private DeletionVectorEntry parseDeletionVectorFromParquet(ConnectorSession session, SqlRow row, RowType type) { checkArgument(row.getFieldCount() == 5, "Deletion vector entry must have 5 fields"); - String storageType = getStringField(row, 0); - String pathOrInlineDv = getStringField(row, 1); - OptionalInt offset = getOptionalIntField(row, 2); - int sizeInBytes = getIntField(row, 3); - long cardinality = getLongField(row, 4); + CheckpointFieldReader deletionVector = new CheckpointFieldReader(session, row, type); + String storageType = deletionVector.getString("storageType"); + String pathOrInlineDv = deletionVector.getString("pathOrInlineDv"); + OptionalInt offset = deletionVector.getOptionalInt("offset"); + int sizeInBytes = deletionVector.getInt("sizeInBytes"); + long cardinality = deletionVector.getLong("cardinality"); return new DeletionVectorEntry(storageType, pathOrInlineDv, offset, sizeInBytes, cardinality); } - private DeltaLakeParquetFileStatistics parseStatisticsFromParquet(SqlRow statsRow) + private DeltaLakeParquetFileStatistics parseStatisticsFromParquet(ConnectorSession session, SqlRow statsRow, RowType type) { - long numRecords = getLongField(statsRow, 0); + CheckpointFieldReader stats = new CheckpointFieldReader(session, statsRow, type); + long numRecords = stats.getLong("numRecords"); Optional> minValues = Optional.empty(); Optional> maxValues = Optional.empty(); Optional> nullCount; if (!columnsWithMinMaxStats.isEmpty()) { - minValues = Optional.of(parseMinMax(getRowField(statsRow, 1), columnsWithMinMaxStats)); - maxValues = Optional.of(parseMinMax(getRowField(statsRow, 2), columnsWithMinMaxStats)); - nullCount = Optional.of(parseNullCount(getRowField(statsRow, 3), schema)); + minValues = Optional.of(parseMinMax(stats.getRow("minValues"), columnsWithMinMaxStats)); + maxValues = Optional.of(parseMinMax(stats.getRow("maxValues"), columnsWithMinMaxStats)); + nullCount = Optional.of(parseNullCount(stats.getRow("nullCount"), schema)); } else { - nullCount = Optional.of(parseNullCount(getRowField(statsRow, 1), schema)); + nullCount = Optional.of(parseNullCount(stats.getRow("nullCount"), schema)); } return new DeltaLakeParquetFileStatistics( @@ -592,6 +608,7 @@ private DeltaLakeTransactionLogEntry buildTxnEntry(ConnectorSession session, Blo if (block.isNull(pagePosition)) { return null; } + RowType type = (RowType) parquetFields.get("txn"); int txnFields = 3; SqlRow txnEntryRow = block.getObject(pagePosition, SqlRow.class); log.debug("Block %s has %s fields", block, txnEntryRow.getFieldCount()); @@ -599,90 +616,21 @@ private DeltaLakeTransactionLogEntry buildTxnEntry(ConnectorSession session, Blo throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Expected block %s to have %d children, but found %s", block, txnFields, txnEntryRow.getFieldCount())); } + CheckpointFieldReader txn = new CheckpointFieldReader(session, txnEntryRow, type); TransactionEntry result = new TransactionEntry( - getStringField(txnEntryRow, 0), - getLongField(txnEntryRow, 1), - getLongField(txnEntryRow, 2)); + txn.getString("appId"), + txn.getLong("version"), + txn.getLong("lastUpdated")); log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.transactionEntry(result); } - @Nullable - private static SqlRow getRowField(SqlRow row, int field) - { - RowBlock valueBlock = (RowBlock) row.getUnderlyingFieldBlock(field); - int index = row.getUnderlyingFieldPosition(field); - if (valueBlock.isNull(index)) { - return null; - } - return valueBlock.getRow(index); - } - - @Nullable - private static String getStringField(SqlRow row, int field) - { - VariableWidthBlock valueBlock = (VariableWidthBlock) row.getUnderlyingFieldBlock(field); - int index = row.getUnderlyingFieldPosition(field); - if (valueBlock.isNull(index)) { - return null; - } - return valueBlock.getSlice(index).toStringUtf8(); - } - private static long getLongField(SqlRow row, int field) { LongArrayBlock valueBlock = (LongArrayBlock) row.getUnderlyingFieldBlock(field); return valueBlock.getLong(row.getUnderlyingFieldPosition(field)); } - private static int getIntField(SqlRow row, int field) - { - IntArrayBlock valueBlock = (IntArrayBlock) row.getUnderlyingFieldBlock(field); - return valueBlock.getInt(row.getUnderlyingFieldPosition(field)); - } - - private static OptionalInt getOptionalIntField(SqlRow row, int field) - { - IntArrayBlock valueBlock = (IntArrayBlock) row.getUnderlyingFieldBlock(field); - int index = row.getUnderlyingFieldPosition(field); - if (valueBlock.isNull(index)) { - return OptionalInt.empty(); - } - return OptionalInt.of(valueBlock.getInt(index)); - } - - private static boolean getBooleanField(SqlRow row, int field) - { - ByteArrayBlock valueBlock = (ByteArrayBlock) row.getUnderlyingFieldBlock(field); - return valueBlock.getByte(row.getUnderlyingFieldPosition(field)) != 0; - } - - @SuppressWarnings("unchecked") - private Map getMapField(SqlRow row, int field) - { - MapBlock valueBlock = (MapBlock) row.getUnderlyingFieldBlock(field); - return (Map) stringMap.getObjectValue(session, valueBlock, row.getUnderlyingFieldPosition(field)); - } - - @SuppressWarnings("unchecked") - private List getListField(SqlRow row, int field) - { - ArrayBlock valueBlock = (ArrayBlock) row.getUnderlyingFieldBlock(field); - return (List) stringList.getObjectValue(session, valueBlock, row.getUnderlyingFieldPosition(field)); - } - - @SuppressWarnings("unchecked") - private Optional> getOptionalSetField(SqlRow row, int field) - { - ArrayBlock valueBlock = (ArrayBlock) row.getUnderlyingFieldBlock(field); - int index = row.getUnderlyingFieldPosition(field); - if (valueBlock.isNull(index)) { - return Optional.empty(); - } - List list = (List) stringList.getObjectValue(session, valueBlock, index); - return Optional.of(ImmutableSet.copyOf(list)); - } - @Override protected DeltaLakeTransactionLogEntry computeNext() { @@ -692,24 +640,14 @@ protected DeltaLakeTransactionLogEntry computeNext() if (!nextEntries.isEmpty()) { return nextEntries.remove(); } - try { - pageSource.close(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + pageSource.close(); return endOfData(); } private boolean tryAdvancePage() { if (pageSource.isFinished()) { - try { - pageSource.close(); - } - catch (IOException ioe) { - throw new UncheckedIOException(ioe); - } + pageSource.close(); return false; } page = pageSource.getNextPage(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointFieldReader.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointFieldReader.java new file mode 100644 index 000000000000..d306e8edc16a --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointFieldReader.java @@ -0,0 +1,172 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog.checkpoint; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.spi.block.ArrayBlock; +import io.trino.spi.block.ByteArrayBlock; +import io.trino.spi.block.IntArrayBlock; +import io.trino.spi.block.LongArrayBlock; +import io.trino.spi.block.MapBlock; +import io.trino.spi.block.RowBlock; +import io.trino.spi.block.SqlRow; +import io.trino.spi.block.VariableWidthBlock; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; +import jakarta.annotation.Nullable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class CheckpointFieldReader +{ + private final ConnectorSession session; + private final SqlRow row; + private final Map fieldNameToIndex; + + public CheckpointFieldReader(ConnectorSession session, SqlRow row, RowType type) + { + this.session = requireNonNull(session, "session is null"); + this.row = requireNonNull(row, "row is null"); + checkArgument(row.getFieldCount() == type.getFields().size(), "row and type sizes don't match"); + Map fieldNames = new HashMap<>(); + for (int i = 0; i < type.getFields().size(); i++) { + String fieldName = type.getFields().get(i).getName().orElseThrow(); + checkState(!fieldNames.containsKey(fieldName), "Duplicated field '%s' exists in %s", fieldName, type); + fieldNames.put(fieldName, i); + } + this.fieldNameToIndex = ImmutableMap.copyOf(fieldNames); + } + + public boolean getBoolean(String fieldName) + { + int field = requireField(fieldName); + ByteArrayBlock valueBlock = (ByteArrayBlock) row.getUnderlyingFieldBlock(field); + return valueBlock.getByte(row.getUnderlyingFieldPosition(field)) != 0; + } + + public int getInt(String fieldName) + { + int field = requireField(fieldName); + IntArrayBlock valueBlock = (IntArrayBlock) row.getUnderlyingFieldBlock(field); + return valueBlock.getInt(row.getUnderlyingFieldPosition(field)); + } + + public OptionalInt getOptionalInt(String fieldName) + { + OptionalInt index = findField(fieldName); + if (index.isEmpty()) { + return OptionalInt.empty(); + } + + IntArrayBlock valueBlock = (IntArrayBlock) row.getUnderlyingFieldBlock(index.getAsInt()); + int position = row.getUnderlyingFieldPosition(index.getAsInt()); + if (valueBlock.isNull(position)) { + return OptionalInt.empty(); + } + return OptionalInt.of(valueBlock.getInt(position)); + } + + public long getLong(String fieldName) + { + int field = requireField(fieldName); + LongArrayBlock valueBlock = (LongArrayBlock) row.getUnderlyingFieldBlock(field); + return valueBlock.getLong(row.getUnderlyingFieldPosition(field)); + } + + @Nullable + public String getString(String fieldName) + { + int field = requireField(fieldName); + VariableWidthBlock valueBlock = (VariableWidthBlock) row.getUnderlyingFieldBlock(field); + int index = row.getUnderlyingFieldPosition(field); + if (valueBlock.isNull(index)) { + return null; + } + return valueBlock.getSlice(index).toStringUtf8(); + } + + @SuppressWarnings("unchecked") + public List getList(ArrayType stringList, String fieldName) + { + int field = requireField(fieldName); + ArrayBlock valueBlock = (ArrayBlock) row.getUnderlyingFieldBlock(field); + return (List) stringList.getObjectValue(session, valueBlock, row.getUnderlyingFieldPosition(field)); + } + + @SuppressWarnings("unchecked") + public Optional> getOptionalSet(ArrayType stringList, String fieldName) + { + OptionalInt index = findField(fieldName); + if (index.isEmpty()) { + return Optional.empty(); + } + ArrayBlock valueBlock = (ArrayBlock) row.getUnderlyingFieldBlock(index.getAsInt()); + int position = row.getUnderlyingFieldPosition(index.getAsInt()); + if (valueBlock.isNull(position)) { + return Optional.empty(); + } + List list = (List) stringList.getObjectValue(session, valueBlock, position); + return Optional.of(ImmutableSet.copyOf(list)); + } + + @SuppressWarnings("unchecked") + public Map getMap(MapType stringMap, String fieldName) + { + int field = requireField(fieldName); + MapBlock valueBlock = (MapBlock) row.getUnderlyingFieldBlock(field); + return (Map) stringMap.getObjectValue(session, valueBlock, row.getUnderlyingFieldPosition(field)); + } + + @Nullable + public SqlRow getRow(String fieldName) + { + OptionalInt index = findField(fieldName); + if (index.isEmpty()) { + return null; + } + RowBlock valueBlock = (RowBlock) row.getUnderlyingFieldBlock(index.getAsInt()); + int position = row.getUnderlyingFieldPosition(index.getAsInt()); + if (valueBlock.isNull(position)) { + return null; + } + return valueBlock.getRow(position); + } + + private int requireField(String fieldName) + { + return findField(fieldName) + .orElseThrow(() -> new IllegalArgumentException("Field '%s' doesn't exist in %s".formatted(fieldName, fieldNameToIndex.keySet()))); + } + + private OptionalInt findField(String fieldName) + { + Integer index = fieldNameToIndex.get(fieldName); + if (index == null) { + return OptionalInt.empty(); + } + return OptionalInt.of(index); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java index 283eab238afa..c3aac320db62 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive.parquet; import com.google.common.collect.ImmutableList; +import io.trino.parquet.Column; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.reader.ParquetReader; @@ -58,6 +59,11 @@ private ParquetPageSource( this.isColumnAdaptationRequired = isColumnAdaptationRequired(columnAdaptations); } + public List getColumnFields() + { + return parquetReader.getColumnFields(); + } + @Override public long getCompletedBytes() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index 6c2ce5cde12a..f7919c4da1bf 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -23,6 +23,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.parquet.BloomFilterStore; +import io.trino.parquet.Column; import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; @@ -473,7 +474,7 @@ public static TupleDomain getParquetTupleDomain( public interface ParquetReaderProvider { - ParquetReader createParquetReader(List fields) + ParquetReader createParquetReader(List fields) throws IOException; } @@ -486,7 +487,7 @@ public static ConnectorPageSource createParquetPageSource( throws IOException { ParquetPageSource.Builder pageSourceBuilder = ParquetPageSource.builder(); - ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); + ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); int sourceChannel = 0; for (HiveColumnHandle column : baseColumns) { if (column == PARQUET_ROW_INDEX_COLUMN) { @@ -505,7 +506,7 @@ public static ConnectorPageSource createParquetPageSource( pageSourceBuilder.addNullColumn(column.getBaseType()); continue; } - parquetColumnFieldsBuilder.add(field.get()); + parquetColumnFieldsBuilder.add(new Column(columnName, field.get())); pageSourceBuilder.addSourceColumn(sourceChannel); sourceChannel++; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 1342adb698af..cd49d7f11366 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -39,6 +39,7 @@ import io.trino.orc.TupleDomainOrcPredicate; import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder; import io.trino.parquet.BloomFilterStore; +import io.trino.parquet.Column; import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; @@ -985,7 +986,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( ParquetPageSource.Builder pageSourceBuilder = ParquetPageSource.builder(); int parquetSourceChannel = 0; - ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); + ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); for (int columnIndex = 0; columnIndex < readBaseColumns.size(); columnIndex++) { IcebergColumnHandle column = readBaseColumns.get(columnIndex); if (column.isIsDeletedColumn()) { @@ -1030,7 +1031,7 @@ else if (column.getId() == TRINO_MERGE_PARTITION_DATA) { pageSourceBuilder.addNullColumn(trinoType); continue; } - parquetColumnFieldsBuilder.add(field.get()); + parquetColumnFieldsBuilder.add(new Column(parquetField.getName(), field.get())); pageSourceBuilder.addSourceColumn(parquetSourceChannel); parquetSourceChannel++; }