Require filename for Parquet corruption exceptions
electrum committed Nov 7, 2023
1 parent 0a8dca8 commit 7d664e1
Showing 18 changed files with 81 additions and 63 deletions.
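This commit threads a ParquetDataSourceId through every code path that raises ParquetCorruptionException, making the data source id (the file being read) a required constructor argument. Callers therefore stop embedding the file name in their format strings, since the exception now incorporates the id into the message itself. A minimal caller-side sketch, assuming a hypothetical S3 path and the io.trino.parquet package layout suggested by the imports in the diff below:

import org.apache.parquet.format.CompressionCodec;

import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;

class CorruptionExceptionExample
{
    static void failUnsupportedCodec(CompressionCodec codec)
            throws ParquetCorruptionException
    {
        // Hypothetical file location; ParquetDataSourceId identifies the Parquet file being read
        ParquetDataSourceId id = new ParquetDataSourceId("s3://bucket/data/part-00000.parquet");
        // The id is now a required argument, so the rendered message can name the file
        // without the caller repeating it in the format string
        throw new ParquetCorruptionException(id, "Codec not supported in Parquet: %s", codec);
    }
}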
Changed file: ParquetCompressionUtils.java
@@ -40,7 +40,7 @@ public final class ParquetCompressionUtils

private ParquetCompressionUtils() {}

public static Slice decompress(CompressionCodec codec, Slice input, int uncompressedSize)
public static Slice decompress(ParquetDataSourceId dataSourceId, CompressionCodec codec, Slice input, int uncompressedSize)
throws IOException
{
requireNonNull(input, "input is null");
@@ -67,7 +67,7 @@ public static Slice decompress(CompressionCodec codec, Slice input, int uncompre
// unsupported
break;
}
throw new ParquetCorruptionException("Codec not supported in Parquet: " + codec);
throw new ParquetCorruptionException(dataSourceId, "Codec not supported in Parquet: %s", codec);
}

private static Slice decompressSnappy(Slice input, int uncompressedSize)
Changed file: ParquetCorruptionException.java
@@ -22,21 +22,15 @@
public class ParquetCorruptionException
extends IOException
{
public ParquetCorruptionException(String message)
public ParquetCorruptionException(ParquetDataSourceId dataSourceId, String message)
{
super(message);
this(dataSourceId, "%s", message);
}

@FormatMethod
public ParquetCorruptionException(String messageFormat, Object... args)
public ParquetCorruptionException(Throwable cause, ParquetDataSourceId dataSourceId, String messageFormat, Object... args)
{
super(format(messageFormat, args));
}

@FormatMethod
public ParquetCorruptionException(Throwable cause, String messageFormat, Object... args)
{
super(format(messageFormat, args), cause);
super(formatMessage(dataSourceId, messageFormat, args), cause);
}

@FormatMethod
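Only fragments of the class appear in the hunk above. A sketch of its plausible shape after this commit — the formatMessage helper is called but not shown in the diff, so its body here is an assumption:

import java.io.IOException;

import com.google.errorprone.annotations.FormatMethod;

import static java.lang.String.format;

public class ParquetCorruptionException
        extends IOException
{
    public ParquetCorruptionException(ParquetDataSourceId dataSourceId, String message)
    {
        this(dataSourceId, "%s", message);
    }

    @FormatMethod
    public ParquetCorruptionException(ParquetDataSourceId dataSourceId, String messageFormat, Object... args)
    {
        super(formatMessage(dataSourceId, messageFormat, args));
    }

    @FormatMethod
    public ParquetCorruptionException(Throwable cause, ParquetDataSourceId dataSourceId, String messageFormat, Object... args)
    {
        super(formatMessage(dataSourceId, messageFormat, args), cause);
    }

    private static String formatMessage(ParquetDataSourceId dataSourceId, String messageFormat, Object... args)
    {
        // Assumed behavior: format the caller's message, then tag it with the data source id
        return format("%s [%s]", format(messageFormat, args), dataSourceId);
    }
}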
Changed file: ParquetValidationUtils.java
@@ -15,21 +15,10 @@

import com.google.errorprone.annotations.FormatMethod;

import static java.lang.String.format;

public final class ParquetValidationUtils
{
private ParquetValidationUtils() {}

@FormatMethod
public static void validateParquet(boolean condition, String formatString, Object... args)
throws ParquetCorruptionException
{
if (!condition) {
throw new ParquetCorruptionException(format(formatString, args));
}
}

@FormatMethod
public static void validateParquet(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
throws ParquetCorruptionException
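The hunk cuts off before the body of the retained overload; presumably it mirrors the removed one, passing the data source id through to the exception. A sketch under that assumption:

@FormatMethod
public static void validateParquet(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
        throws ParquetCorruptionException
{
    if (!condition) {
        // Same check as before, but corruption errors now carry the file's data source id
        throw new ParquetCorruptionException(dataSourceId, formatString, args);
    }
}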
Changed file (name not shown)
@@ -21,6 +21,7 @@
import io.trino.parquet.BloomFilterStore;
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetEncoding;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.DecimalType;
@@ -254,7 +255,7 @@ private static Optional<DictionaryPage> readDictionaryPage(
}
// Get the dictionary page header and the dictionary in single read
Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), dictionaryPageSize);
return readPageHeaderWithData(buffer.getInput()).map(data -> decodeDictionaryPage(data, columnMetaData));
return readPageHeaderWithData(buffer.getInput()).map(data -> decodeDictionaryPage(dataSource.getId(), data, columnMetaData));
}

private static Optional<Integer> getDictionaryPageSize(ColumnIndexStore columnIndexStore, ColumnChunkMetaData columnMetaData)
@@ -293,7 +294,7 @@ private static Optional<PageHeaderWithData> readPageHeaderWithData(SliceInput in
inputStream.readSlice(pageHeader.getCompressed_page_size())));
}

private static DictionaryPage decodeDictionaryPage(PageHeaderWithData pageHeaderWithData, ColumnChunkMetaData chunkMetaData)
private static DictionaryPage decodeDictionaryPage(ParquetDataSourceId dataSourceId, PageHeaderWithData pageHeaderWithData, ColumnChunkMetaData chunkMetaData)
{
PageHeader pageHeader = pageHeaderWithData.pageHeader();
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
@@ -302,7 +303,7 @@ private static DictionaryPage decodeDictionaryPage(PageHeaderWithData pageHeader

Slice compressedData = pageHeaderWithData.compressedData();
try {
return new DictionaryPage(decompress(chunkMetaData.getCodec().getParquetCompressionCodec(), compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding);
return new DictionaryPage(decompress(dataSourceId, chunkMetaData.getCodec().getParquetCompressionCodec(), compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding);
}
catch (IOException e) {
throw new ParquetDecodingException("Could not decode the dictionary for " + chunkMetaData.getPath(), e);
Changed file (name not shown)
@@ -595,12 +595,12 @@ private static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescri

private static ParquetCorruptionException corruptionException(String column, ParquetDataSourceId id, Statistics<?> statistics, Exception cause)
{
return new ParquetCorruptionException(cause, "Corrupted statistics for column \"%s\" in Parquet file \"%s\": [%s]", column, id, statistics);
return new ParquetCorruptionException(cause, id, "Corrupted statistics for column \"%s\": [%s]", column, statistics);
}

private static ParquetCorruptionException corruptionException(String column, ParquetDataSourceId id, ColumnIndex columnIndex, Exception cause)
{
return new ParquetCorruptionException(cause, "Corrupted statistics for column \"%s\" in Parquet file \"%s\". Corrupted column index: [%s]", column, id, columnIndex);
return new ParquetCorruptionException(cause, id, "Corrupted statistics for column \"%s\". Corrupted column index: [%s]", column, columnIndex);
}

private static boolean isCorruptedColumnIndex(
Changed file (name not shown)
@@ -90,22 +90,22 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
// 4 bytes: MetadataLength
// MAGIC

validateParquet(dataSource.getEstimatedSize() >= MAGIC.length() + POST_SCRIPT_SIZE, "%s is not a valid Parquet File", dataSource.getId());
validateParquet(dataSource.getEstimatedSize() >= MAGIC.length() + POST_SCRIPT_SIZE, dataSource.getId(), "%s is not a valid Parquet File", dataSource.getId());

// Read the tail of the file
long estimatedFileSize = dataSource.getEstimatedSize();
long expectedReadSize = min(estimatedFileSize, EXPECTED_FOOTER_SIZE);
Slice buffer = dataSource.readTail(toIntExact(expectedReadSize));

Slice magic = buffer.slice(buffer.length() - MAGIC.length(), MAGIC.length());
validateParquet(MAGIC.equals(magic), "Not valid Parquet file: %s expected magic number: %s got: %s", dataSource.getId(), MAGIC.toStringUtf8(), magic.toStringUtf8());
validateParquet(MAGIC.equals(magic), dataSource.getId(), "Expected magic number: %s got: %s", MAGIC.toStringUtf8(), magic.toStringUtf8());

int metadataLength = buffer.getInt(buffer.length() - POST_SCRIPT_SIZE);
long metadataIndex = estimatedFileSize - POST_SCRIPT_SIZE - metadataLength;
validateParquet(
metadataIndex >= MAGIC.length() && metadataIndex < estimatedFileSize - POST_SCRIPT_SIZE,
"Corrupted Parquet file: %s metadata index: %s out of range",
dataSource.getId(),
"Metadata index: %s out of range",
metadataIndex);

int completeFooterSize = metadataLength + POST_SCRIPT_SIZE;
@@ -116,16 +116,16 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional<
InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput();

FileMetaData fileMetaData = readFileMetaData(metadataStream);
ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId().toString());
ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId());
validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation);
return parquetMetadata;
}

public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, String filename)
public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId)
throws ParquetCorruptionException
{
List<SchemaElement> schema = fileMetaData.getSchema();
validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", filename);
validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");

MessageType messageType = readParquetSchema(schema);
List<BlockMetaData> blocks = new ArrayList<>();
@@ -136,12 +136,13 @@ public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, S
blockMetaData.setRowCount(rowGroup.getNum_rows());
blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
List<ColumnChunk> columns = rowGroup.getColumns();
validateParquet(!columns.isEmpty(), "No columns in row group: %s", rowGroup);
validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup);
String filePath = columns.get(0).getFile_path();
for (ColumnChunk columnChunk : columns) {
validateParquet(
(filePath == null && columnChunk.getFile_path() == null)
|| (filePath != null && filePath.equals(columnChunk.getFile_path())),
dataSourceId,
"all column chunks of the same row group must be in the same file");
ColumnMetaData metaData = columnChunk.meta_data;
String[] path = metaData.path_in_schema.stream()
Changed file: PageReader.java
@@ -21,6 +21,7 @@
import io.trino.parquet.DataPageV2;
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.Page;
import io.trino.parquet.ParquetDataSourceId;
import jakarta.annotation.Nullable;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.Statistics;
@@ -36,9 +37,11 @@
import static com.google.common.base.Preconditions.checkState;
import static io.trino.parquet.ParquetCompressionUtils.decompress;
import static io.trino.parquet.ParquetReaderUtils.isOnlyDictionaryEncodingPages;
import static java.util.Objects.requireNonNull;

public final class PageReader
{
private final ParquetDataSourceId dataSourceId;
private final CompressionCodec codec;
private final boolean hasOnlyDictionaryEncodedPages;
private final boolean hasNoNulls;
@@ -48,6 +51,7 @@ public final class PageReader
private int dataPageReadCount;

public static PageReader createPageReader(
ParquetDataSourceId dataSourceId,
ChunkedInputStream columnChunk,
ColumnChunkMetaData metadata,
ColumnDescriptor columnDescriptor,
@@ -61,21 +65,30 @@ public static PageReader createPageReader(
boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0;
boolean hasOnlyDictionaryEncodedPages = isOnlyDictionaryEncodingPages(metadata);
ParquetColumnChunkIterator compressedPages = new ParquetColumnChunkIterator(
dataSourceId,
fileCreatedBy,
columnDescriptor,
metadata,
columnChunk,
offsetIndex);
return new PageReader(metadata.getCodec().getParquetCompressionCodec(), compressedPages, hasOnlyDictionaryEncodedPages, hasNoNulls);

return new PageReader(
dataSourceId,
metadata.getCodec().getParquetCompressionCodec(),
compressedPages,
hasOnlyDictionaryEncodedPages,
hasNoNulls);
}

@VisibleForTesting
public PageReader(
ParquetDataSourceId dataSourceId,
CompressionCodec codec,
Iterator<? extends Page> compressedPages,
boolean hasOnlyDictionaryEncodedPages,
boolean hasNoNulls)
{
this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
this.codec = codec;
this.compressedPages = Iterators.peekingIterator(compressedPages);
this.hasOnlyDictionaryEncodedPages = hasOnlyDictionaryEncodedPages;
@@ -106,7 +119,7 @@ public DataPage readPage()
return dataPageV1;
}
return new DataPageV1(
decompress(codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize()),
decompress(dataSourceId, codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize()),
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
dataPageV1.getFirstRowIndex(),
@@ -128,7 +141,7 @@ public DataPage readPage()
dataPageV2.getRepetitionLevels(),
dataPageV2.getDefinitionLevels(),
dataPageV2.getDataEncoding(),
decompress(codec, dataPageV2.getSlice(), uncompressedSize),
decompress(dataSourceId, codec, dataPageV2.getSlice(), uncompressedSize),
dataPageV2.getUncompressedSize(),
dataPageV2.getFirstRowIndex(),
dataPageV2.getStatistics(),
@@ -150,7 +163,7 @@ public DictionaryPage readDictionaryPage()
try {
DictionaryPage compressedDictionaryPage = (DictionaryPage) compressedPages.next();
return new DictionaryPage(
decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
decompress(dataSourceId, codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
compressedDictionaryPage.getDictionarySize(),
compressedDictionaryPage.getEncoding());
}
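Because createPageReader and the @VisibleForTesting constructor now take the data source id as their first argument, any test that builds a PageReader directly has to supply one. A hypothetical construction — the package name, the placeholder id, and the empty page iterator are all assumptions:

import java.util.Collections;

import org.apache.parquet.format.CompressionCodec;

import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.reader.PageReader;

class PageReaderExample
{
    static PageReader emptyPageReader()
    {
        return new PageReader(
                new ParquetDataSourceId("memory:test"), // placeholder id for an in-memory source
                CompressionCodec.UNCOMPRESSED,
                Collections.emptyIterator(),            // no compressed pages to read
                false,                                   // hasOnlyDictionaryEncodedPages
                false);                                  // hasNoNulls
    }
}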
Changed file: ParquetColumnChunkIterator.java
@@ -18,6 +18,7 @@
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.Page;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;
import jakarta.annotation.Nullable;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
@@ -41,6 +42,7 @@
public final class ParquetColumnChunkIterator
implements Iterator<Page>
{
private final ParquetDataSourceId dataSourceId;
private final Optional<String> fileCreatedBy;
private final ColumnDescriptor descriptor;
private final ColumnChunkMetaData metadata;
@@ -51,12 +53,14 @@ public final class ParquetColumnChunkIterator
private int dataPageCount;

public ParquetColumnChunkIterator(
ParquetDataSourceId dataSourceId,
Optional<String> fileCreatedBy,
ColumnDescriptor descriptor,
ColumnChunkMetaData metadata,
ChunkedInputStream input,
@Nullable OffsetIndex offsetIndex)
{
this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
this.descriptor = requireNonNull(descriptor, "descriptor is null");
this.metadata = requireNonNull(metadata, "metadata is null");
@@ -83,7 +87,7 @@ public Page next()
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dataPageCount != 0) {
throw new ParquetCorruptionException("Column (%s) has a dictionary page after the first position in column chunk", descriptor);
throw new ParquetCorruptionException(dataSourceId, "Column (%s) has a dictionary page after the first position in column chunk", descriptor);
}
result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size());
break;
Changed file (name not shown)
@@ -394,7 +394,7 @@ private ColumnChunk readStruct(GroupField field)
}

if (columnChunk == null) {
throw new ParquetCorruptionException("Struct field does not have any children: " + field);
throw new ParquetCorruptionException(dataSource.getId(), "Struct field does not have any children: %s", field);
}

StructColumnReader.RowBlockPositions structIsNull = StructColumnReader.calculateStructOffsets(field, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
@@ -467,7 +467,7 @@ private ColumnChunk readPrimitive(PrimitiveField field)
int fieldId = field.getId();
ColumnReader columnReader = columnReaders.get(fieldId);
if (!columnReader.hasPageReader()) {
validateParquet(currentBlockMetadata.getRowCount() > 0, "Row group has 0 rows");
validateParquet(currentBlockMetadata.getRowCount() > 0, dataSource.getId(), "Row group has 0 rows");
ColumnChunkMetaData metadata = getColumnChunkMetaData(currentBlockMetadata, columnDescriptor);
FilteredRowRanges rowRanges = blockRowRanges[currentRowGroup];
OffsetIndex offsetIndex = null;
@@ -476,7 +476,7 @@ private ColumnChunk readPrimitive(PrimitiveField field)
}
ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
columnReader.setPageReader(
createPageReader(columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy),
createPageReader(dataSource.getId(), columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy),
Optional.ofNullable(rowRanges));
}
ColumnChunk columnChunk = columnReader.readPrimitive();
(Diff truncated: the remaining changed files are not shown in this view.)
