diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCompressionUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCompressionUtils.java
index d482f627caac..0c8cf74883e1 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCompressionUtils.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCompressionUtils.java
@@ -40,7 +40,7 @@ public final class ParquetCompressionUtils
 
     private ParquetCompressionUtils() {}
 
-    public static Slice decompress(CompressionCodec codec, Slice input, int uncompressedSize)
+    public static Slice decompress(ParquetDataSourceId dataSourceId, CompressionCodec codec, Slice input, int uncompressedSize)
             throws IOException
     {
         requireNonNull(input, "input is null");
@@ -67,7 +67,7 @@ public static Slice decompress(CompressionCodec codec, Slice input, int uncompre
                 // unsupported
                 break;
         }
-        throw new ParquetCorruptionException("Codec not supported in Parquet: " + codec);
+        throw new ParquetCorruptionException(dataSourceId, "Codec not supported in Parquet: %s", codec);
     }
 
     private static Slice decompressSnappy(Slice input, int uncompressedSize)
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCorruptionException.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCorruptionException.java
index 451c9d5c7c57..7cbddf8a07c4 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCorruptionException.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetCorruptionException.java
@@ -22,21 +22,15 @@
 public class ParquetCorruptionException
         extends IOException
 {
-    public ParquetCorruptionException(String message)
+    public ParquetCorruptionException(ParquetDataSourceId dataSourceId, String message)
     {
-        super(message);
+        this(dataSourceId, "%s", message);
     }
 
     @FormatMethod
-    public ParquetCorruptionException(String messageFormat, Object... args)
+    public ParquetCorruptionException(Throwable cause, ParquetDataSourceId dataSourceId, String messageFormat, Object... args)
     {
-        super(format(messageFormat, args));
-    }
-
-    @FormatMethod
-    public ParquetCorruptionException(Throwable cause, String messageFormat, Object... args)
-    {
-        super(format(messageFormat, args), cause);
+        super(formatMessage(dataSourceId, messageFormat, args), cause);
     }
 
     @FormatMethod
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetValidationUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetValidationUtils.java
index 9be1c4477947..5e31a08f704e 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetValidationUtils.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetValidationUtils.java
@@ -15,21 +15,10 @@
 
 import com.google.errorprone.annotations.FormatMethod;
 
-import static java.lang.String.format;
-
 public final class ParquetValidationUtils
 {
     private ParquetValidationUtils() {}
 
-    @FormatMethod
-    public static void validateParquet(boolean condition, String formatString, Object... args)
-            throws ParquetCorruptionException
-    {
-        if (!condition) {
-            throw new ParquetCorruptionException(format(formatString, args));
-        }
-    }
-
     @FormatMethod
     public static void validateParquet(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
             throws ParquetCorruptionException
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java
index 9deb275b32f1..3b507aa24685 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java
@@ -21,6 +21,7 @@
 import io.trino.parquet.BloomFilterStore;
 import io.trino.parquet.DictionaryPage;
 import io.trino.parquet.ParquetDataSource;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetEncoding;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.DecimalType;
@@ -254,7 +255,7 @@ private static Optional readDictionaryPage(
         }
         // Get the dictionary page header and the dictionary in single read
         Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), dictionaryPageSize);
-        return readPageHeaderWithData(buffer.getInput()).map(data -> decodeDictionaryPage(data, columnMetaData));
+        return readPageHeaderWithData(buffer.getInput()).map(data -> decodeDictionaryPage(dataSource.getId(), data, columnMetaData));
     }
 
     private static Optional getDictionaryPageSize(ColumnIndexStore columnIndexStore, ColumnChunkMetaData columnMetaData)
@@ -293,7 +294,7 @@ private static Optional readPageHeaderWithData(SliceInput in
                 inputStream.readSlice(pageHeader.getCompressed_page_size())));
     }
 
-    private static DictionaryPage decodeDictionaryPage(PageHeaderWithData pageHeaderWithData, ColumnChunkMetaData chunkMetaData)
+    private static DictionaryPage decodeDictionaryPage(ParquetDataSourceId dataSourceId, PageHeaderWithData pageHeaderWithData, ColumnChunkMetaData chunkMetaData)
     {
         PageHeader pageHeader = pageHeaderWithData.pageHeader();
         DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
@@ -302,7 +303,7 @@ private static DictionaryPage decodeDictionaryPage(PageHeaderWithData pageHeader
 
         Slice compressedData = pageHeaderWithData.compressedData();
         try {
-            return new DictionaryPage(decompress(chunkMetaData.getCodec().getParquetCompressionCodec(), compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding);
+            return new DictionaryPage(decompress(dataSourceId, chunkMetaData.getCodec().getParquetCompressionCodec(), compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding);
         }
         catch (IOException e) {
             throw new ParquetDecodingException("Could not decode the dictionary for " + chunkMetaData.getPath(), e);
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java
index 107bdf241b64..7f23d4a0fa7b 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java
@@ -595,12 +595,12 @@ private static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescri
 
     private static ParquetCorruptionException corruptionException(String column, ParquetDataSourceId id, Statistics statistics, Exception cause)
     {
-        return new ParquetCorruptionException(cause, "Corrupted statistics for column \"%s\" in Parquet file \"%s\": [%s]", column, id, statistics);
+        return new ParquetCorruptionException(cause, id, "Corrupted statistics for column \"%s\": [%s]", column, statistics);
     }
 
     private static ParquetCorruptionException corruptionException(String column, ParquetDataSourceId id, ColumnIndex columnIndex, Exception cause)
     {
-        return new ParquetCorruptionException(cause, "Corrupted statistics for column \"%s\" in Parquet file \"%s\". Corrupted column index: [%s]", column, id, columnIndex);
+        return new ParquetCorruptionException(cause, id, "Corrupted statistics for column \"%s\". Corrupted column index: [%s]", column, columnIndex);
     }
 
     private static boolean isCorruptedColumnIndex(
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java
index 9dd28b8614a5..3768645bf199 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java
@@ -90,7 +90,7 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
         // 4 bytes: MetadataLength
         // MAGIC
 
-        validateParquet(dataSource.getEstimatedSize() >= MAGIC.length() + POST_SCRIPT_SIZE, "%s is not a valid Parquet File", dataSource.getId());
+        validateParquet(dataSource.getEstimatedSize() >= MAGIC.length() + POST_SCRIPT_SIZE, dataSource.getId(), "%s is not a valid Parquet File", dataSource.getId());
 
         // Read the tail of the file
         long estimatedFileSize = dataSource.getEstimatedSize();
@@ -98,14 +98,14 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
         Slice buffer = dataSource.readTail(toIntExact(expectedReadSize));
 
         Slice magic = buffer.slice(buffer.length() - MAGIC.length(), MAGIC.length());
-        validateParquet(MAGIC.equals(magic), "Not valid Parquet file: %s expected magic number: %s got: %s", dataSource.getId(), MAGIC.toStringUtf8(), magic.toStringUtf8());
+        validateParquet(MAGIC.equals(magic), dataSource.getId(), "Expected magic number: %s got: %s", MAGIC.toStringUtf8(), magic.toStringUtf8());
 
         int metadataLength = buffer.getInt(buffer.length() - POST_SCRIPT_SIZE);
         long metadataIndex = estimatedFileSize - POST_SCRIPT_SIZE - metadataLength;
         validateParquet(
                 metadataIndex >= MAGIC.length() && metadataIndex < estimatedFileSize - POST_SCRIPT_SIZE,
-                "Corrupted Parquet file: %s metadata index: %s out of range",
                 dataSource.getId(),
+                "Metadata index: %s out of range",
                 metadataIndex);
 
         int completeFooterSize = metadataLength + POST_SCRIPT_SIZE;
@@ -116,16 +116,16 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
 
         InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput();
         FileMetaData fileMetaData = readFileMetaData(metadataStream);
-        ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId().toString());
+        ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId());
         validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation);
         return parquetMetadata;
     }
 
-    public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, String filename)
+    public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId)
             throws ParquetCorruptionException
     {
         List schema = fileMetaData.getSchema();
-        validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", filename);
+        validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");
 
         MessageType messageType = readParquetSchema(schema);
         List blocks = new ArrayList<>();
@@ -136,12 +136,13 @@ public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, S
                 blockMetaData.setRowCount(rowGroup.getNum_rows());
                 blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
                 List columns = rowGroup.getColumns();
-                validateParquet(!columns.isEmpty(), "No columns in row group: %s", rowGroup);
+                validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup);
                 String filePath = columns.get(0).getFile_path();
                 for (ColumnChunk columnChunk : columns) {
                     validateParquet(
                             (filePath == null && columnChunk.getFile_path() == null)
                                     || (filePath != null && filePath.equals(columnChunk.getFile_path())),
+                            dataSourceId,
                             "all column chunks of the same row group must be in the same file");
                     ColumnMetaData metaData = columnChunk.meta_data;
                     String[] path = metaData.path_in_schema.stream()
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java
index 1c19115cffdd..7dddae62af4d 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java
@@ -21,6 +21,7 @@
 import io.trino.parquet.DataPageV2;
 import io.trino.parquet.DictionaryPage;
 import io.trino.parquet.Page;
+import io.trino.parquet.ParquetDataSourceId;
 import jakarta.annotation.Nullable;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.statistics.Statistics;
@@ -36,9 +37,11 @@
 import static com.google.common.base.Preconditions.checkState;
 import static io.trino.parquet.ParquetCompressionUtils.decompress;
 import static io.trino.parquet.ParquetReaderUtils.isOnlyDictionaryEncodingPages;
+import static java.util.Objects.requireNonNull;
 
 public final class PageReader
 {
+    private final ParquetDataSourceId dataSourceId;
     private final CompressionCodec codec;
     private final boolean hasOnlyDictionaryEncodedPages;
     private final boolean hasNoNulls;
@@ -48,6 +51,7 @@ public final class PageReader
     private int dataPageReadCount;
 
     public static PageReader createPageReader(
+            ParquetDataSourceId dataSourceId,
             ChunkedInputStream columnChunk,
             ColumnChunkMetaData metadata,
             ColumnDescriptor columnDescriptor,
@@ -61,21 +65,30 @@ public static PageReader createPageReader(
         boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0;
         boolean hasOnlyDictionaryEncodedPages = isOnlyDictionaryEncodingPages(metadata);
         ParquetColumnChunkIterator compressedPages = new ParquetColumnChunkIterator(
+                dataSourceId,
                 fileCreatedBy,
                 columnDescriptor,
                 metadata,
                 columnChunk,
                 offsetIndex);
-        return new PageReader(metadata.getCodec().getParquetCompressionCodec(), compressedPages, hasOnlyDictionaryEncodedPages, hasNoNulls);
+
+        return new PageReader(
+                dataSourceId,
+                metadata.getCodec().getParquetCompressionCodec(),
+                compressedPages,
+                hasOnlyDictionaryEncodedPages,
+                hasNoNulls);
     }
 
     @VisibleForTesting
     public PageReader(
+            ParquetDataSourceId dataSourceId,
             CompressionCodec codec,
             Iterator compressedPages,
             boolean hasOnlyDictionaryEncodedPages,
             boolean hasNoNulls)
     {
+        this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
         this.codec = codec;
         this.compressedPages = Iterators.peekingIterator(compressedPages);
         this.hasOnlyDictionaryEncodedPages = hasOnlyDictionaryEncodedPages;
@@ -106,7 +119,7 @@ public DataPage readPage()
                 return dataPageV1;
             }
             return new DataPageV1(
-                    decompress(codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize()),
+                    decompress(dataSourceId, codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize()),
                     dataPageV1.getValueCount(),
                     dataPageV1.getUncompressedSize(),
                     dataPageV1.getFirstRowIndex(),
@@ -128,7 +141,7 @@ public DataPage readPage()
                     dataPageV2.getRepetitionLevels(),
                     dataPageV2.getDefinitionLevels(),
                     dataPageV2.getDataEncoding(),
-                    decompress(codec, dataPageV2.getSlice(), uncompressedSize),
+                    decompress(dataSourceId, codec, dataPageV2.getSlice(), uncompressedSize),
                     dataPageV2.getUncompressedSize(),
                     dataPageV2.getFirstRowIndex(),
                     dataPageV2.getStatistics(),
@@ -150,7 +163,7 @@ public DictionaryPage readDictionaryPage()
         try {
             DictionaryPage compressedDictionaryPage = (DictionaryPage) compressedPages.next();
             return new DictionaryPage(
-                    decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
+                    decompress(dataSourceId, codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
                     compressedDictionaryPage.getDictionarySize(),
                     compressedDictionaryPage.getEncoding());
         }
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java
index 1a14d579bc59..577e5cb602fa 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java
@@ -18,6 +18,7 @@
 import io.trino.parquet.DictionaryPage;
 import io.trino.parquet.Page;
 import io.trino.parquet.ParquetCorruptionException;
+import io.trino.parquet.ParquetDataSourceId;
 import jakarta.annotation.Nullable;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
@@ -41,6 +42,7 @@
 public final class ParquetColumnChunkIterator
         implements Iterator
 {
+    private final ParquetDataSourceId dataSourceId;
     private final Optional fileCreatedBy;
     private final ColumnDescriptor descriptor;
     private final ColumnChunkMetaData metadata;
@@ -51,12 +53,14 @@ public final class ParquetColumnChunkIterator
     private int dataPageCount;
 
     public ParquetColumnChunkIterator(
+            ParquetDataSourceId dataSourceId,
             Optional fileCreatedBy,
             ColumnDescriptor descriptor,
             ColumnChunkMetaData metadata,
             ChunkedInputStream input,
             @Nullable OffsetIndex offsetIndex)
     {
+        this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
         this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
         this.descriptor = requireNonNull(descriptor, "descriptor is null");
         this.metadata = requireNonNull(metadata, "metadata is null");
@@ -83,7 +87,7 @@ public Page next()
         switch (pageHeader.type) {
             case DICTIONARY_PAGE:
                 if (dataPageCount != 0) {
-                    throw new ParquetCorruptionException("Column (%s) has a dictionary page after the first position in column chunk", descriptor);
+                    throw new ParquetCorruptionException(dataSourceId, "Column (%s) has a dictionary page after the first position in column chunk", descriptor);
                 }
                 result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size());
                 break;
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
index f02084c451da..e650f7f31f50 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
@@ -394,7 +394,7 @@ private ColumnChunk readStruct(GroupField field)
         }
 
         if (columnChunk == null) {
-            throw new ParquetCorruptionException("Struct field does not have any children: " + field);
+            throw new ParquetCorruptionException(dataSource.getId(), "Struct field does not have any children: %s", field);
         }
 
         StructColumnReader.RowBlockPositions structIsNull = StructColumnReader.calculateStructOffsets(field, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
@@ -467,7 +467,7 @@ private ColumnChunk readPrimitive(PrimitiveField field)
         int fieldId = field.getId();
         ColumnReader columnReader = columnReaders.get(fieldId);
         if (!columnReader.hasPageReader()) {
-            validateParquet(currentBlockMetadata.getRowCount() > 0, "Row group has 0 rows");
+            validateParquet(currentBlockMetadata.getRowCount() > 0, dataSource.getId(), "Row group has 0 rows");
             ColumnChunkMetaData metadata = getColumnChunkMetaData(currentBlockMetadata, columnDescriptor);
             FilteredRowRanges rowRanges = blockRowRanges[currentRowGroup];
             OffsetIndex offsetIndex = null;
@@ -476,7 +476,7 @@ private ColumnChunk readPrimitive(PrimitiveField field)
             }
             ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
             columnReader.setPageReader(
-                    createPageReader(columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy),
+                    createPageReader(dataSource.getId(), columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy),
                     Optional.ofNullable(rowRanges));
         }
         ColumnChunk columnChunk = columnReader.readPrimitive();
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java b/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java
index bbb409273a64..43bd3959a87a 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/TestTupleDomainParquetPredicate.java
@@ -159,7 +159,7 @@ public void testBigint()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, BIGINT, 10, longColumnStats(100L, 10L), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int64 BigintColumn\" in Parquet file \"testFile\": [min: 100, max: 10, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int64 BigintColumn\": [min: 100, max: 10, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -179,7 +179,7 @@ public void testInteger()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, INTEGER, 10, longColumnStats(2147483648L, 10), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int32 IntegerColumn\" in Parquet file \"testFile\": [min: 2147483648, max: 10, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int32 IntegerColumn\": [min: 2147483648, max: 10, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -199,7 +199,7 @@ public void testSmallint()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, SMALLINT, 10, longColumnStats(2147483648L, 10), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int32 SmallintColumn\" in Parquet file \"testFile\": [min: 2147483648, max: 10, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int32 SmallintColumn\": [min: 2147483648, max: 10, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -219,7 +219,7 @@ public void testTinyint()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, TINYINT, 10, longColumnStats(2147483648L, 10), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int32 TinyintColumn\" in Parquet file \"testFile\": [min: 2147483648, max: 10, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int32 TinyintColumn\": [min: 2147483648, max: 10, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -238,7 +238,7 @@ public void testShortDecimal()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, type, 10, longColumnStats(100L, 10L), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int32 ShortDecimalColumn\" in Parquet file \"testFile\": [min: 100, max: 10, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int32 ShortDecimalColumn\": [min: 100, max: 10, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -257,7 +257,7 @@ public void testShortDecimalWithNoScale()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, type, 10, longColumnStats(100L, 10L), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int32 ShortDecimalColumnWithNoScale\" in Parquet file \"testFile\": [min: 100, max: 10, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int32 ShortDecimalColumnWithNoScale\": [min: 100, max: 10, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -281,7 +281,7 @@ public void testLongDecimal()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, type, 10, binaryColumnStats(100L, 10L), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required fixed_len_byte_array(0) LongDecimalColumn\" in Parquet file \"testFile\": [min: 0x64, max: 0x0A, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required fixed_len_byte_array(0) LongDecimalColumn\": [min: 0x64, max: 0x0A, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -300,7 +300,7 @@ public void testLongDecimalWithNoScale()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, type, 10, binaryColumnStats(100L, 10L), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required fixed_len_byte_array(0) LongDecimalColumnWithNoScale\" in Parquet file \"testFile\": [min: 0x64, max: 0x0A, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required fixed_len_byte_array(0) LongDecimalColumnWithNoScale\": [min: 0x64, max: 0x0A, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -329,7 +329,7 @@ public void testDouble()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, DOUBLE, 10, doubleColumnStats(42.24, 3.3), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required double DoubleColumn\" in Parquet file \"testFile\": [min: 42.24, max: 3.3, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required double DoubleColumn\": [min: 42.24, max: 3.3, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -348,7 +348,7 @@ public void testString()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, createUnboundedVarcharType(), 10, stringColumnStats("taco", "apple"), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required binary StringColumn\" in Parquet file \"testFile\": [min: 0x7461636F, max: 0x6170706C65, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required binary StringColumn\": [min: 0x7461636F, max: 0x6170706C65, num_nulls: 0] [testFile]");
     }
 
     private static BinaryStatistics stringColumnStats(String minimum, String maximum)
@@ -391,7 +391,7 @@ public void testFloat()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, REAL, 10, floatColumnStats(maximum, minimum), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required float FloatColumn\" in Parquet file \"testFile\": [min: 40.3, max: 4.3, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required float FloatColumn\": [min: 40.3, max: 4.3, num_nulls: 0] [testFile]");
     }
 
     @Test
@@ -406,7 +406,7 @@ public void testDate()
         // fail on corrupted statistics
         assertThatExceptionOfType(ParquetCorruptionException.class)
                 .isThrownBy(() -> getDomain(columnDescriptor, DATE, 10, intColumnStats(200, 100), ID, UTC))
-                .withMessage("Corrupted statistics for column \"[] required int32 DateColumn\" in Parquet file \"testFile\": [min: 200, max: 100, num_nulls: 0]");
+                .withMessage("Malformed Parquet file. Corrupted statistics for column \"[] required int32 DateColumn\": [min: 200, max: 100, num_nulls: 0] [testFile]");
     }
 
     @DataProvider
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java
index 3fbfa3f18161..7335d3ef19af 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java
@@ -17,6 +17,7 @@
 import io.airlift.slice.Slices;
 import io.trino.parquet.DataPage;
 import io.trino.parquet.DataPageV1;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.PrimitiveField;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -103,7 +104,8 @@ public int read()
             throws IOException
     {
         ColumnReader columnReader = columnReaderFactory.create(field, newSimpleAggregatedMemoryContext());
-        columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false), Optional.empty());
+        PageReader pageReader = new PageReader(new ParquetDataSourceId("test"), UNCOMPRESSED, dataPages.iterator(), false, false);
+        columnReader.setPageReader(pageReader, Optional.empty());
         int rowsRead = 0;
         while (rowsRead < dataPositions) {
             int remaining = dataPositions - rowsRead;
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderRowRangesTest.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderRowRangesTest.java
index 36f502a9f1b4..22c57a04dc71 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderRowRangesTest.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderRowRangesTest.java
@@ -18,6 +18,7 @@
 import io.trino.parquet.DataPage;
 import io.trino.parquet.DataPageV2;
 import io.trino.parquet.Page;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.PrimitiveField;
 import io.trino.parquet.reader.decoders.ValueDecoder;
 import io.trino.parquet.reader.decoders.ValueDecoders;
@@ -558,6 +559,7 @@ else if (dictionaryEncoding == DictionaryEncoding.MIXED) {
                 inputPages = ImmutableList.builder().add(toTrinoDictionaryPage(encoder.toDictPageAndClose())).addAll(inputPages).build();
             }
             return new PageReader(
+                    new ParquetDataSourceId("test"),
                     UNCOMPRESSED,
                     inputPages.iterator(),
                     dictionaryEncoding == DictionaryEncoding.ALL || (dictionaryEncoding == DictionaryEncoding.MIXED && testingPages.size() == 1),
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderTest.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderTest.java
index 0359a113640e..5712562b7a72 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderTest.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderTest.java
@@ -23,6 +23,7 @@
 import io.trino.parquet.DataPageV2;
 import io.trino.parquet.DictionaryPage;
 import io.trino.parquet.Page;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetEncoding;
 import io.trino.parquet.PrimitiveField;
 import io.trino.parquet.reader.TestingColumnReader.ColumnReaderFormat;
@@ -686,6 +687,7 @@ protected static PageReader getPageReaderMock(List dataPages, @Nullabl
             pagesBuilder.add(dictionaryPage);
         }
         return new PageReader(
+                new ParquetDataSourceId("test"),
                 UNCOMPRESSED,
                 pagesBuilder.addAll(dataPages).build().iterator(),
                 dataPages.stream()
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java
index 5939dd0253d1..2e2b9fe08d74 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java
@@ -17,6 +17,7 @@
 import io.airlift.slice.Slices;
 import io.trino.parquet.DataPage;
 import io.trino.parquet.DataPageV2;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.PrimitiveField;
 import io.trino.plugin.base.type.DecodedTimestamp;
 import io.trino.spi.block.Block;
@@ -108,9 +109,8 @@ public void testVariousTimestamps(TimestampType type, BiFunction
         chunkReader = dataSource.planRead(ImmutableListMultimap.of(0, range), newSimpleAggregatedMemoryContext());
         PageReader pageReader = PageReader.createPageReader(
+                new ParquetDataSourceId("test"),
                 chunkReader.get(0),
                 chunkMetaData,
                 new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
@@ -265,7 +267,11 @@ public void testDictionaryPageOffset()
         assertThat(pageHeader.getType()).isEqualTo(PageType.DICTIONARY_PAGE);
         assertThat(pageHeader.getDictionary_page_header().getNum_values()).isEqualTo(100);
         Slice compressedData = inputStream.readSlice(pageHeader.getCompressed_page_size());
-        Slice uncompressedData = decompress(chunkMetaData.getCodec().getParquetCompressionCodec(), compressedData, pageHeader.getUncompressed_page_size());
+        Slice uncompressedData = decompress(
+                new ParquetDataSourceId("test"),
+                chunkMetaData.getCodec().getParquetCompressionCodec(),
+                compressedData,
+                pageHeader.getUncompressed_page_size());
         int[] ids = new int[100];
         uncompressedData.getInts(0, ids, 0, 100);
         for (int i = 0; i < 100; i++) {
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
index 335d28ed217c..b73f58111914 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import io.trino.filesystem.Location;
+import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.plugin.deltalake.DataFileInfo.DataFileType;
 import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
@@ -196,7 +197,7 @@ public DataFileInfo getDataFileInfo()
 
     private static DeltaLakeJsonFileStatistics readStatistics(FileMetaData fileMetaData, Location path, Map typeForColumn, long rowCount)
             throws IOException
     {
-        ParquetMetadata parquetMetadata = MetadataReader.createParquetMetadata(fileMetaData, path.fileName());
+        ParquetMetadata parquetMetadata = MetadataReader.createParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString()));
         ImmutableMultimap.Builder metadataForColumn = ImmutableMultimap.builder();
         for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
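
Illustration (not part of the patch above): a minimal sketch of how the reworked validation API is called after this change. Only ParquetDataSourceId, ParquetCorruptionException, and ParquetValidationUtils.validateParquet are taken from the diff itself; the file name used below and the exact rendered message are assumptions inferred from the updated test assertions.

// Hypothetical usage sketch; the data source id value is made up for illustration.
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;

import static io.trino.parquet.ParquetValidationUtils.validateParquet;

public class ParquetCorruptionExceptionExample
{
    public static void main(String[] args)
    {
        // Every corruption check now carries the id of the file being read
        ParquetDataSourceId dataSourceId = new ParquetDataSourceId("example.parquet");
        try {
            // Condition is false, so this throws ParquetCorruptionException
            validateParquet(false, dataSourceId, "Metadata index: %s out of range", 42);
        }
        catch (ParquetCorruptionException e) {
            // Based on the updated test assertions, the message is expected to look like:
            // "Malformed Parquet file. Metadata index: 42 out of range [example.parquet]"
            System.out.println(e.getMessage());
        }
    }
}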