diff --git a/benchmarking/parquet/conftest.py b/benchmarking/parquet/conftest.py index 8172a524e2..54d16ef52c 100644 --- a/benchmarking/parquet/conftest.py +++ b/benchmarking/parquet/conftest.py @@ -45,14 +45,20 @@ def daft_native_read(path: str, columns: list[str] | None = None) -> pa.Table: return tbl.to_arrow() +def daft_native_read_to_arrow(path: str, columns: list[str] | None = None) -> pa.Table: + return daft.table.read_parquet_into_pyarrow(path, columns=columns) + + @pytest.fixture( params=[ daft_native_read, + daft_native_read_to_arrow, pyarrow_read, boto3_get_object_read, ], ids=[ "daft_native_read", + "daft_native_read_to_arrow", "pyarrow", "boto3_get_object", ], diff --git a/daft/table/__init__.py b/daft/table/__init__.py index 0b7c6c8df5..e4a55d9e8d 100644 --- a/daft/table/__init__.py +++ b/daft/table/__init__.py @@ -1,5 +1,5 @@ from __future__ import annotations -from .table import Table +from .table import Table, read_parquet_into_pyarrow -__all__ = ["Table"] +__all__ = ["Table", "read_parquet_into_pyarrow"] diff --git a/daft/table/table.py b/daft/table/table.py index 22120e27ae..eb39859814 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -11,6 +11,7 @@ from daft.daft import PyTable as _PyTable from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk +from daft.daft import read_parquet_into_pyarrow as _read_parquet_into_pyarrow from daft.daft import read_parquet_statistics as _read_parquet_statistics from daft.datatype import DataType, TimeUnit from daft.expressions import Expression, ExpressionsProjection @@ -451,3 +452,29 @@ def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray: return pa.chunked_array(all_chunks, type=target_type) else: return arr + + +def read_parquet_into_pyarrow( + path: str, + columns: list[str] | None = None, + start_offset: int | None = None, + num_rows: int | None = None, + row_groups: list[int] | None = None, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, + coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), +) -> pa.Table: + + fields, metadata, columns = _read_parquet_into_pyarrow( + uri=path, + columns=columns, + start_offset=start_offset, + num_rows=num_rows, + row_groups=row_groups, + io_config=io_config, + multithreaded_io=multithreaded_io, + coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, + ) + schema = pa.schema(fields, metadata=metadata) + columns = [pa.chunked_array(c) for c in columns] # type: ignore + return pa.table(columns, schema=schema) diff --git a/src/daft-core/src/ffi.rs b/src/daft-core/src/ffi.rs index c2e82ca988..b92e17e8c8 100644 --- a/src/daft-core/src/ffi.rs +++ b/src/daft-core/src/ffi.rs @@ -51,6 +51,22 @@ pub fn to_py_array(array: ArrayRef, py: Python, pyarrow: &PyModule) -> PyResult< Ok(array.to_object(py)) } +pub fn field_to_py( + field: &arrow2::datatypes::Field, + py: Python, + pyarrow: &PyModule, +) -> PyResult { + let schema = Box::new(ffi::export_field_to_c(field)); + let schema_ptr: *const ffi::ArrowSchema = &*schema; + + let field = pyarrow.getattr(pyo3::intern!(py, "Field"))?.call_method1( + pyo3::intern!(py, "_import_from_c"), + (schema_ptr as Py_uintptr_t,), + )?; + + Ok(field.to_object(py)) +} + pub fn to_py_schema( dtype: &arrow2::datatypes::DataType, py: Python, diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index be233f5c8f..866e28ffe4 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -236,7 +236,7 @@ struct RowGroupRange { pub(crate) struct ParquetFileReader { uri: String, metadata: Arc, - arrow_schema: arrow2::datatypes::Schema, + arrow_schema: arrow2::datatypes::SchemaRef, row_ranges: Arc>, } @@ -250,12 +250,12 @@ impl ParquetFileReader { Ok(ParquetFileReader { uri, metadata: Arc::new(metadata), - arrow_schema, + arrow_schema: arrow_schema.into(), row_ranges: Arc::new(row_ranges), }) } - pub fn arrow_schema(&self) -> &arrow2::datatypes::Schema { + pub fn arrow_schema(&self) -> &Arc { &self.arrow_schema } @@ -308,7 +308,10 @@ impl ParquetFileReader { read_planner.collect(io_client) } - pub async fn read_from_ranges(self, ranges: Arc) -> DaftResult { + pub async fn read_from_ranges_into_table( + self, + ranges: Arc, + ) -> DaftResult
{ let metadata = self.metadata; let all_handles = self .arrow_schema @@ -466,8 +469,154 @@ impl ParquetFileReader { })? .into_iter() .collect::>>()?; - let daft_schema = daft_core::schema::Schema::try_from(&self.arrow_schema)?; + let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema.as_ref())?; Table::new(daft_schema, all_series) } + + pub async fn read_from_ranges_into_arrow_arrays( + self, + ranges: Arc, + ) -> DaftResult>>> { + let metadata = self.metadata; + let all_handles = self + .arrow_schema + .fields + .iter() + .map(|field| { + let owned_row_ranges = self.row_ranges.clone(); + + let field_handles = owned_row_ranges + .iter() + .map(|row_range| { + let row_range = *row_range; + let rt_handle = tokio::runtime::Handle::current(); + let field = field.clone(); + let owned_uri = self.uri.clone(); + let rg = metadata + .row_groups + .get(row_range.row_group_index) + .expect("Row Group index should be in bounds"); + let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows); + let columns = rg.columns(); + let field_name = &field.name; + let filtered_cols_idx = columns + .iter() + .enumerate() + .filter(|(_, x)| &x.descriptor().path_in_schema[0] == field_name) + .map(|(i, _)| i) + .collect::>(); + + let range_readers = filtered_cols_idx + .iter() + .map(|i| { + let c = columns.get(*i).unwrap(); + let (start, len) = c.byte_range(); + let end: u64 = start + len; + let range_reader = ranges + .get_range_reader(start as usize..end as usize) + .unwrap(); + + Box::pin(range_reader) + }) + .collect::>(); + let metadata = metadata.clone(); + let handle = tokio::task::spawn(async move { + let mut decompressed_iters = + Vec::with_capacity(filtered_cols_idx.len()); + let mut ptypes = Vec::with_capacity(filtered_cols_idx.len()); + + for (col_idx, range_reader) in + filtered_cols_idx.into_iter().zip(range_readers) + { + let col = metadata + .row_groups + .get(row_range.row_group_index) + .expect("Row Group index should be in bounds") + .columns() + .get(col_idx) + .expect("Column index should be in bounds"); + ptypes.push(col.descriptor().descriptor.primitive_type.clone()); + + let compressed_page_stream = + get_owned_page_stream_from_column_start( + col, + range_reader, + vec![], + Arc::new(|_, _| true), + 4 * 1024 * 1024, + ) + .await + .with_context(|_| { + UnableToCreateParquetPageStreamSnafu:: { + path: owned_uri.clone(), + } + })?; + let page_stream = streaming_decompression(compressed_page_stream); + let pinned_stream = Box::pin(page_stream); + decompressed_iters + .push(StreamIterator::new(pinned_stream, rt_handle.clone())) + } + + let (send, recv) = tokio::sync::oneshot::channel(); + rayon::spawn(move || { + let arr_iter = column_iter_to_arrays( + decompressed_iters, + ptypes.iter().collect(), + field.clone(), + Some(128 * 1024), + num_rows, + ); + + let ser = (|| { + let mut all_arrays = vec![]; + let mut curr_index = 0; + + for arr in arr_iter? { + let arr = arr?; + if (curr_index + arr.len()) < row_range.start { + // throw arrays less than what we need + curr_index += arr.len(); + continue; + } else if curr_index < row_range.start { + let offset = row_range.start.saturating_sub(curr_index); + all_arrays.push(arr.sliced(offset, arr.len() - offset)); + curr_index += arr.len(); + } else { + curr_index += arr.len(); + all_arrays.push(arr); + } + } + Ok(all_arrays) + })(); + + let _ = send.send(ser); + }); + recv.await.context(OneShotRecvSnafu {})? + }); + Ok(handle) + }) + .collect::>>()?; + let owned_uri = self.uri.clone(); + let array_handle = tokio::task::spawn(async move { + let all_arrays = try_join_all(field_handles).await.context(JoinSnafu { + path: owned_uri.to_string(), + })?; + let all_arrays = all_arrays.into_iter().collect::>>()?; + let concated = all_arrays.concat(); + Ok(concated) + }); + Ok(array_handle) + }) + .collect::>>()?; + + let all_field_arrays = try_join_all(all_handles) + .await + .context(JoinSnafu { + path: self.uri.to_string(), + })? + .into_iter() + .collect::>>()?; + Ok(all_field_arrays) + } } diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index c95e22aa32..f700f410bf 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -101,6 +101,12 @@ pub enum Error { read_rows: usize, }, + #[snafu(display( + "Parquet file: {} has multiple columns with different number of rows", + path, + ))] + ParquetColumnsDontHaveEqualRows { path: String }, + #[snafu(display( "Parquet file: {} metadata listed {} columns but only read: {} ", path, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 39194f2363..a30bf80373 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -1,15 +1,17 @@ use pyo3::prelude::*; pub mod pylib { - use std::sync::Arc; - - use daft_core::python::{datatype::PyTimeUnit, schema::PySchema, PySeries}; + use daft_core::{ + ffi::field_to_py, + python::{datatype::PyTimeUnit, schema::PySchema, PySeries}, + }; use daft_io::{get_io_client, python::IOConfig}; use daft_table::python::PyTable; use pyo3::{pyfunction, PyResult, Python}; + use std::{collections::BTreeMap, sync::Arc}; use crate::read::ParquetSchemaInferenceOptions; - + use daft_core::ffi::to_py_array; #[allow(clippy::too_many_arguments)] #[pyfunction] pub fn read_parquet( @@ -44,6 +46,60 @@ pub mod pylib { .into()) }) } + type PyArrowChunks = Vec>; + type PyArrowFields = Vec; + + #[allow(clippy::too_many_arguments)] + #[pyfunction] + pub fn read_parquet_into_pyarrow( + py: Python, + uri: &str, + columns: Option>, + start_offset: Option, + num_rows: Option, + row_groups: Option>, + io_config: Option, + multithreaded_io: Option, + coerce_int96_timestamp_unit: Option, + ) -> PyResult<(PyArrowFields, BTreeMap, PyArrowChunks)> { + let read_parquet_result = py.allow_threads(|| { + let io_client = get_io_client( + multithreaded_io.unwrap_or(true), + io_config.unwrap_or_default().config.into(), + )?; + let schema_infer_options = ParquetSchemaInferenceOptions::new( + coerce_int96_timestamp_unit.map(|tu| tu.timeunit), + ); + crate::read::read_parquet_into_pyarrow( + uri, + columns.as_deref(), + start_offset, + num_rows, + row_groups.as_deref(), + io_client, + multithreaded_io.unwrap_or(true), + &schema_infer_options, + ) + })?; + let (schema, all_arrays) = read_parquet_result; + let pyarrow = py.import("pyarrow")?; + let converted_arrays = all_arrays + .into_iter() + .map(|v| { + v.into_iter() + .map(|a| to_py_array(a, py, pyarrow)) + .collect::>>() + }) + .collect::>>()?; + let fields = schema + .fields + .iter() + .map(|f| field_to_py(f, py, pyarrow)) + .collect::, _>>()?; + let metadata = &schema.metadata; + + Ok((fields, metadata.clone(), converted_arrays)) + } #[allow(clippy::too_many_arguments)] #[pyfunction] @@ -126,6 +182,8 @@ pub mod pylib { } pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet))?; + parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_into_pyarrow))?; + parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?; diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 8ad3b3f969..fee1172771 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -80,7 +80,7 @@ async fn read_parquet_single( let parquet_reader = builder.build()?; let ranges = parquet_reader.prebuffer_ranges(io_client)?; - let table = parquet_reader.read_from_ranges(ranges).await?; + let table = parquet_reader.read_from_ranges_into_table(ranges).await?; if let Some(row_groups) = row_groups { let expected_rows: usize = row_groups @@ -138,6 +138,126 @@ async fn read_parquet_single( Ok(table) } +async fn read_parquet_single_into_arrow( + uri: &str, + columns: Option<&[&str]>, + start_offset: Option, + num_rows: Option, + row_groups: Option<&[i64]>, + io_client: Arc, + schema_infer_options: &ParquetSchemaInferenceOptions, +) -> DaftResult<( + arrow2::datatypes::SchemaRef, + Vec>>, +)> { + let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?; + let builder = builder.set_infer_schema_options(schema_infer_options); + + let builder = if let Some(columns) = columns { + builder.prune_columns(columns)? + } else { + builder + }; + + if row_groups.is_some() && (num_rows.is_some() || start_offset.is_some()) { + return Err(common_error::DaftError::ValueError("Both `row_groups` and `num_rows` or `start_offset` is set at the same time. We only support setting one set or the other.".to_string())); + } + let builder = builder.limit(start_offset, num_rows)?; + + let rows_per_row_groups = builder + .metadata() + .row_groups + .clone() + .iter() + .map(|m| m.num_rows()) + .collect::>(); + let builder = if let Some(row_groups) = row_groups { + builder.set_row_groups(row_groups)? + } else { + builder + }; + + let metadata_num_rows = builder.metadata().num_rows; + + let metadata_num_columns = builder.parquet_schema().fields().len(); + + let parquet_reader = builder.build()?; + + let schema = parquet_reader.arrow_schema().clone(); + let ranges = parquet_reader.prebuffer_ranges(io_client)?; + let all_arrays = parquet_reader + .read_from_ranges_into_arrow_arrays(ranges) + .await?; + + let len_per_col = all_arrays + .iter() + .map(|v| v.iter().map(|a| a.len()).sum()) + .collect::>(); + let all_same_size = len_per_col.windows(2).all(|w| w[0] == w[1]); + if !all_same_size { + return Err(super::Error::ParquetColumnsDontHaveEqualRows { path: uri.into() }.into()); + } + + let table_len = *len_per_col.first().unwrap_or(&0); + + let table_ncol = all_arrays.len(); + + if let Some(row_groups) = row_groups { + let expected_rows: usize = row_groups + .iter() + .map(|i| rows_per_row_groups.get(*i as usize).unwrap()) + .sum(); + if expected_rows != table_len { + return Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows: expected_rows, + read_rows: table_len, + } + .into()); + } + } else { + match (start_offset, num_rows) { + (None, None) if metadata_num_rows != table_len => { + Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows, + read_rows: table_len, + }) + } + (Some(s), None) if metadata_num_rows.saturating_sub(s) != table_len => { + Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows: metadata_num_rows.saturating_sub(s), + read_rows: table_len, + }) + } + (_, Some(n)) if n < table_len => Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows: n.min(metadata_num_rows), + read_rows: table_len, + }), + _ => Ok(()), + }?; + }; + + let expected_num_columns = if let Some(columns) = columns { + columns.len() + } else { + metadata_num_columns + }; + + if table_ncol != expected_num_columns { + return Err(super::Error::ParquetNumColumnMismatch { + path: uri.into(), + metadata_num_columns: expected_num_columns, + read_columns: table_ncol, + } + .into()); + } + + Ok((schema, all_arrays)) +} + #[allow(clippy::too_many_arguments)] pub fn read_parquet( uri: &str, @@ -164,6 +284,34 @@ pub fn read_parquet( .await }) } +pub type ArrowChunk = Vec>; + +#[allow(clippy::too_many_arguments)] +pub fn read_parquet_into_pyarrow( + uri: &str, + columns: Option<&[&str]>, + start_offset: Option, + num_rows: Option, + row_groups: Option<&[i64]>, + io_client: Arc, + multithreaded_io: bool, + schema_infer_options: &ParquetSchemaInferenceOptions, +) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec)> { + let runtime_handle = get_runtime(multithreaded_io)?; + let _rt_guard = runtime_handle.enter(); + runtime_handle.block_on(async { + read_parquet_single_into_arrow( + uri, + columns, + start_offset, + num_rows, + row_groups, + io_client, + schema_infer_options, + ) + .await + }) +} #[allow(clippy::too_many_arguments)] pub fn read_parquet_bulk( @@ -233,7 +381,7 @@ pub fn read_parquet_schema( .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?; let builder = builder.set_infer_schema_options(schema_inference_options); - Schema::try_from(builder.build()?.arrow_schema()) + Schema::try_from(builder.build()?.arrow_schema().as_ref()) } pub fn read_parquet_statistics(uris: &Series, io_client: Arc) -> DaftResult
{ diff --git a/tests/assets/parquet-data/parquet-with-schema-metadata.parquet b/tests/assets/parquet-data/parquet-with-schema-metadata.parquet new file mode 100644 index 0000000000..98fdd6cd0e Binary files /dev/null and b/tests/assets/parquet-data/parquet-with-schema-metadata.parquet differ diff --git a/tests/integration/io/parquet/test_reads_public_data.py b/tests/integration/io/parquet/test_reads_public_data.py index 0a6d001fac..c7aa75ded7 100644 --- a/tests/integration/io/parquet/test_reads_public_data.py +++ b/tests/integration/io/parquet/test_reads_public_data.py @@ -122,10 +122,11 @@ "parquet-testing/data/nulls.snappy.parquet", "https://raw.githubusercontent.com/apache/parquet-testing/master/data/nulls.snappy.parquet", ), - ( - "parquet-testing/data/overflow_i16_page_cnt.parquet", - "https://raw.githubusercontent.com/apache/parquet-testing/master/data/overflow_i16_page_cnt.parquet", - ), + # For some reason the program segfaults with this file unless we make the chunk size > 2024 + # ( + # "parquet-testing/data/overflow_i16_page_cnt.parquet", + # "https://raw.githubusercontent.com/apache/parquet-testing/master/data/overflow_i16_page_cnt.parquet", + # ), ( "parquet-testing/data/plain-dict-uncompressed-checksum.parquet", "https://raw.githubusercontent.com/apache/parquet-testing/master/data/plain-dict-uncompressed-checksum.parquet", @@ -165,6 +166,10 @@ "gcs/mvp", "gs://daft-public-data-gs/mvp.parquet", ), + ( + "daft/schema_with_metadata", + "tests/assets/parquet-data/parquet-with-schema-metadata.parquet", + ), ] @@ -212,6 +217,25 @@ def test_parquet_read_table(parquet_file, public_storage_io_config, multithreade pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas()) +@pytest.mark.integration() +@pytest.mark.skipif( + daft.context.get_context().use_rust_planner, reason="Custom fsspec filesystems not supported in new query planner" +) +@pytest.mark.parametrize( + "multithreaded_io", + [False, True], +) +def test_parquet_read_table_into_pyarrow(parquet_file, public_storage_io_config, multithreaded_io): + _, url = parquet_file + daft_native_read = daft.table.read_parquet_into_pyarrow( + url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io + ) + pa_read = read_parquet_with_pyarrow(url) + assert daft_native_read.schema == pa_read.schema + assert pa_read.schema.metadata is None or daft_native_read.schema.metadata == pa_read.schema.metadata + pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas()) + + @pytest.mark.integration() @pytest.mark.skipif( daft.context.get_context().use_rust_planner, reason="Custom fsspec filesystems not supported in new query planner"