From 707148345a9de49e4639bf4da134c64d2e973478 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Thu, 17 Oct 2024 15:03:07 -0700
Subject: [PATCH 1/4] Bump up max_header_size that's being used to read
 parquet page-header

---
 src/daft-parquet/src/file.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs
index 84dfa5a83e..2f9ed19ace 100644
--- a/src/daft-parquet/src/file.rs
+++ b/src/daft-parquet/src/file.rs
@@ -316,7 +316,7 @@ impl ParquetFileReader {
     // Set to a very high number 256MB to guard against unbounded large
     // downloads from remote storage, which likely indicates corrupted Parquet data
     // See: https://github.com/Eventual-Inc/Daft/issues/1551
-    const MAX_HEADER_SIZE: usize = 256 * 1024 * 1024;
+    const MAX_HEADER_SIZE: usize = 2 * 1024 * 1024 * 1024;
 
     fn new(
         uri: String,

From 0ca23bad44f3acaf40555776cc6ffc74af1e5a23 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Thu, 17 Oct 2024 17:50:47 -0700
Subject: [PATCH 2/4] Add comment; update names of parameters to functions

---
 src/daft-parquet/src/file.rs         | 13 ++++++-------
 src/parquet2/src/read/page/stream.rs |  8 ++++----
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs
index 2f9ed19ace..730d0b3315 100644
--- a/src/daft-parquet/src/file.rs
+++ b/src/daft-parquet/src/file.rs
@@ -313,10 +313,9 @@ pub struct ParquetFileReader {
 impl ParquetFileReader {
     const DEFAULT_CHUNK_SIZE: usize = 2048;
 
-    // Set to a very high number 256MB to guard against unbounded large
-    // downloads from remote storage, which likely indicates corrupted Parquet data
-    // See: https://github.com/Eventual-Inc/Daft/issues/1551
-    const MAX_HEADER_SIZE: usize = 2 * 1024 * 1024 * 1024;
+    // Set to 2GB because that's the maximum size of strings allowable by Parquet (using i32 offsets).
+    // See issue: https://github.com/Eventual-Inc/Daft/issues/3007
+    const MAX_PAGE_SIZE: usize = 2 * 1024 * 1024 * 1024;
 
     fn new(
         uri: String,
@@ -473,7 +472,7 @@ impl ParquetFileReader {
                 range_reader,
                 vec![],
                 Arc::new(|_, _| true),
-                Self::MAX_HEADER_SIZE,
+                Self::MAX_PAGE_SIZE,
             )
             .with_context(
                 |_| UnableToCreateParquetPageStreamSnafu:: {
@@ -638,7 +637,7 @@ impl ParquetFileReader {
                 range_reader,
                 vec![],
                 Arc::new(|_, _| true),
-                Self::MAX_HEADER_SIZE,
+                Self::MAX_PAGE_SIZE,
             )
             .with_context(|_| {
                 UnableToCreateParquetPageStreamSnafu:: {
@@ -821,7 +820,7 @@ impl ParquetFileReader {
                 range_reader,
                 vec![],
                 Arc::new(|_, _| true),
-                Self::MAX_HEADER_SIZE,
+                Self::MAX_PAGE_SIZE,
            )
            .with_context(|_| {
                 UnableToCreateParquetPageStreamSnafu:: {
diff --git a/src/parquet2/src/read/page/stream.rs b/src/parquet2/src/read/page/stream.rs
index 25fb0fe6fc..523fc335b6 100644
--- a/src/parquet2/src/read/page/stream.rs
+++ b/src/parquet2/src/read/page/stream.rs
@@ -37,7 +37,7 @@ pub async fn get_page_stream_from_column_start<'a, R: AsyncRead + Unpin + Send>(
     reader: &'a mut R,
     scratch: Vec<u8>,
     pages_filter: PageFilter,
-    max_header_size: usize,
+    max_page_size: usize,
 ) -> Result<impl Stream<Item = Result<CompressedPage>> + 'a> {
     let page_metadata: PageMetaData = column_metadata.into();
     Ok(_get_page_stream(
@@ -47,7 +47,7 @@ pub async fn get_page_stream_from_column_start<'a, R: AsyncRead + Unpin + Send>(
         page_metadata.descriptor,
         scratch,
         pages_filter,
-        max_header_size,
+        max_page_size,
     ))
 }
 
@@ -56,7 +56,7 @@ pub fn get_owned_page_stream_from_column_start(
     reader: R,
     scratch: Vec<u8>,
     pages_filter: PageFilter,
-    max_header_size: usize,
+    max_page_size: usize,
 ) -> Result<impl Stream<Item = Result<CompressedPage>>> {
     let page_metadata: PageMetaData = column_metadata.into();
     Ok(_get_owned_page_stream(
@@ -66,7 +66,7 @@ pub fn get_owned_page_stream_from_column_start(
         page_metadata.descriptor,
         scratch,
         pages_filter,
-        max_header_size,
+        max_page_size,
     ))
 }
 

From dd3774157c4fefd42914adb29534a5bd271a5d0a Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Tue, 22 Oct 2024 21:40:58 -0700
Subject: [PATCH 3/4] Add logic to force convert all utf8's and binary's to
 their large variants

---
 src/arrow2/src/io/parquet/read/deserialize/mod.rs | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/arrow2/src/io/parquet/read/deserialize/mod.rs b/src/arrow2/src/io/parquet/read/deserialize/mod.rs
index c05978d748..4086b5f7f5 100644
--- a/src/arrow2/src/io/parquet/read/deserialize/mod.rs
+++ b/src/arrow2/src/io/parquet/read/deserialize/mod.rs
@@ -204,7 +204,7 @@ pub fn n_columns(data_type: &DataType) -> usize {
 pub fn column_iter_to_arrays<'a, I>(
     columns: Vec<I>,
     types: Vec<&PrimitiveType>,
-    field: Field,
+    mut field: Field,
     chunk_size: Option<usize>,
     num_rows: usize,
     num_values: Vec<usize>,
@@ -212,6 +212,15 @@ pub fn column_iter_to_arrays<'a, I>(
 where
     I: Pages + 'a,
 {
+    let utf_and_binary_to_large = true;
+    if utf_and_binary_to_large {
+        match field.data_type {
+            DataType::Utf8 => field.data_type = DataType::LargeUtf8,
+            DataType::Binary => field.data_type = DataType::LargeBinary,
+            _ => (),
+        }
+    };
+
     Ok(Box::new(
         columns_to_iter_recursive(
             columns,

From d39da8e8cec527184fda590d5f8291fcde402435 Mon Sep 17 00:00:00 2001
From: Raunak Bhagat
Date: Wed, 23 Oct 2024 00:18:23 -0700
Subject: [PATCH 4/4] Revert force conversion of utf8 and binaries to their
 large counterparts

---
 src/arrow2/src/io/parquet/read/deserialize/mod.rs | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/src/arrow2/src/io/parquet/read/deserialize/mod.rs b/src/arrow2/src/io/parquet/read/deserialize/mod.rs
index 4086b5f7f5..c05978d748 100644
--- a/src/arrow2/src/io/parquet/read/deserialize/mod.rs
+++ b/src/arrow2/src/io/parquet/read/deserialize/mod.rs
@@ -204,7 +204,7 @@ pub fn n_columns(data_type: &DataType) -> usize {
 pub fn column_iter_to_arrays<'a, I>(
     columns: Vec<I>,
     types: Vec<&PrimitiveType>,
-    mut field: Field,
+    field: Field,
     chunk_size: Option<usize>,
     num_rows: usize,
     num_values: Vec<usize>,
@@ -212,15 +212,6 @@ pub fn column_iter_to_arrays<'a, I>(
 where
     I: Pages + 'a,
 {
-    let utf_and_binary_to_large = true;
-    if utf_and_binary_to_large {
-        match field.data_type {
-            DataType::Utf8 => field.data_type = DataType::LargeUtf8,
-            DataType::Binary => field.data_type = DataType::LargeBinary,
-            _ => (),
-        }
-    };
-
     Ok(Box::new(
         columns_to_iter_recursive(
             columns,
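
A minimal standalone Rust sketch (not part of the patch series; only MAX_PAGE_SIZE mirrors the constant added in PATCH 2, everything else is illustrative) of the arithmetic behind the 2 GiB cap: per the new comment, Parquet string sizes are bounded by i32 offsets, and 2 GiB is exactly one byte above i32::MAX.

// Illustrative sketch only; nothing here is taken from the Daft codebase
// beyond the value of the MAX_PAGE_SIZE constant introduced in PATCH 2.
fn main() {
    const MAX_PAGE_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2 GiB
    // i32::MAX = 2^31 - 1 = 2_147_483_647, i.e. one byte short of 2 GiB.
    assert_eq!(MAX_PAGE_SIZE, (i32::MAX as usize) + 1);
    println!("page cap = {} bytes, i32::MAX = {}", MAX_PAGE_SIZE, i32::MAX);
}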