pass arrow schema to pyarrow
samster25 committed Sep 12, 2023
1 parent ef46ed3 commit 3a6fb39
Showing 5 changed files with 44 additions and 21 deletions.
6 changes: 4 additions & 2 deletions daft/table/table.py
@@ -465,7 +465,7 @@ def read_parquet_into_pyarrow(
     coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
 ) -> pa.Table:

-    names, columns = _read_parquet_into_pyarrow(
+    fields, columns = _read_parquet_into_pyarrow(
         uri=path,
         columns=columns,
         start_offset=start_offset,
@@ -475,5 +475,7 @@ def read_parquet_into_pyarrow(
         multithreaded_io=multithreaded_io,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
     )
+
+    schema = pa.schema(fields)
     columns = [pa.chunked_array(c) for c in columns]
-    return pa.table(columns, names=names)
+    return pa.table(columns, schema=schema)

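The effect of the table.py change, as a minimal sketch with made-up columns (not Daft's code path, just the pa.table behavior it relies on): passing schema= makes pyarrow take each field's type and metadata from the schema, whereas names= alone lets pyarrow re-infer types from the raw arrays.

```python
import pyarrow as pa

# Hypothetical columns; with schema=, the nanosecond timestamp type comes
# from the schema instead of being re-inferred from the array values.
fields = [pa.field("id", pa.int64()), pa.field("ts", pa.timestamp("ns"))]
schema = pa.schema(fields)
columns = [
    pa.chunked_array([pa.array([1, 2], type=pa.int64())]),
    pa.chunked_array([pa.array([0, 1], type=pa.timestamp("ns"))]),
]
table = pa.table(columns, schema=schema)
assert table.schema == schema
```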
16 changes: 16 additions & 0 deletions src/daft-core/src/ffi.rs
@@ -51,6 +51,22 @@ pub fn to_py_array(array: ArrayRef, py: Python, pyarrow: &PyModule) -> PyResult<
     Ok(array.to_object(py))
 }

+pub fn field_to_py(
+    field: &arrow2::datatypes::Field,
+    py: Python,
+    pyarrow: &PyModule,
+) -> PyResult<PyObject> {
+    let schema = Box::new(ffi::export_field_to_c(field));
+    let schema_ptr: *const ffi::ArrowSchema = &*schema;
+
+    let field = pyarrow.getattr(pyo3::intern!(py, "Field"))?.call_method1(
+        pyo3::intern!(py, "_import_from_c"),
+        (schema_ptr as Py_uintptr_t,),
+    )?;
+
+    Ok(field.to_object(py))
+}
+
 pub fn to_py_schema(
     dtype: &arrow2::datatypes::DataType,
     py: Python,
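A hedged Python-side illustration of the hop field_to_py performs: the field is exported into an Arrow C data interface ArrowSchema struct, and pyarrow re-imports it from the struct's raw address. _export_to_c and _import_from_c are pyarrow-internal methods (the Rust code above reaches _import_from_c via getattr), so treat this as a sketch of the mechanism, not a supported public API.

```python
import pyarrow as pa
from pyarrow.cffi import ffi  # pyarrow ships cffi definitions of the C structs

# Allocate an ArrowSchema struct and take its address; on the Rust side this
# role is played by arrow2::ffi::export_field_to_c and the boxed pointer.
c_schema = ffi.new("struct ArrowSchema*")
ptr = int(ffi.cast("uintptr_t", c_schema))

field = pa.field("ts", pa.timestamp("ns"))
field._export_to_c(ptr)                      # producer fills the struct
roundtripped = pa.Field._import_from_c(ptr)  # the call field_to_py makes
assert roundtripped.equals(field)
```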
8 changes: 4 additions & 4 deletions src/daft-parquet/src/file.rs
@@ -236,7 +236,7 @@ struct RowGroupRange {
 pub(crate) struct ParquetFileReader {
     uri: String,
     metadata: Arc<parquet2::metadata::FileMetaData>,
-    arrow_schema: arrow2::datatypes::Schema,
+    arrow_schema: arrow2::datatypes::SchemaRef,
     row_ranges: Arc<Vec<RowGroupRange>>,
 }
@@ -250,12 +250,12 @@ impl ParquetFileReader {
         Ok(ParquetFileReader {
             uri,
             metadata: Arc::new(metadata),
-            arrow_schema,
+            arrow_schema: arrow_schema.into(),
             row_ranges: Arc::new(row_ranges),
         })
     }

-    pub fn arrow_schema(&self) -> &arrow2::datatypes::Schema {
+    pub fn arrow_schema(&self) -> &Arc<arrow2::datatypes::Schema> {
         &self.arrow_schema
     }
@@ -469,7 +469,7 @@ impl ParquetFileReader {
             })?
             .into_iter()
             .collect::<DaftResult<Vec<_>>>()?;
-        let daft_schema = daft_core::schema::Schema::try_from(&self.arrow_schema)?;
+        let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema.as_ref())?;

         Table::new(daft_schema, all_series)
     }
17 changes: 12 additions & 5 deletions src/daft-parquet/src/python.rs
@@ -1,7 +1,10 @@
 use pyo3::prelude::*;

 pub mod pylib {
-    use daft_core::python::{datatype::PyTimeUnit, schema::PySchema, PySeries};
+    use daft_core::{
+        ffi::field_to_py,
+        python::{datatype::PyTimeUnit, schema::PySchema, PySeries},
+    };
     use daft_io::{get_io_client, python::IOConfig};
     use daft_table::python::PyTable;
     use pyo3::{pyfunction, PyResult, Python};
@@ -56,7 +59,7 @@ pub mod pylib {
         io_config: Option<IOConfig>,
         multithreaded_io: Option<bool>,
         coerce_int96_timestamp_unit: Option<PyTimeUnit>,
-    ) -> PyResult<(Vec<String>, Vec<Vec<pyo3::PyObject>>)> {
+    ) -> PyResult<(Vec<pyo3::PyObject>, Vec<Vec<pyo3::PyObject>>)> {
         let read_parquet_result = py.allow_threads(|| {
             let io_client = get_io_client(
                 multithreaded_io.unwrap_or(true),
@@ -76,7 +79,7 @@ pub mod pylib {
                 &schema_infer_options,
             )
         })?;
-        let (names, all_arrays) = read_parquet_result;
+        let (schema, all_arrays) = read_parquet_result;
         let pyarrow = py.import("pyarrow")?;
         let converted_arrays = all_arrays
             .into_iter()
@@ -86,8 +89,12 @@ pub mod pylib {
                 .collect::<PyResult<Vec<_>>>()
             })
             .collect::<PyResult<Vec<_>>>()?;
-
-        Ok((names, converted_arrays))
+        let fields = schema
+            .fields
+            .iter()
+            .map(|f| field_to_py(f, py, pyarrow))
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok((fields, converted_arrays))
     }

     #[allow(clippy::too_many_arguments)]
18 changes: 8 additions & 10 deletions src/daft-parquet/src/read.rs
@@ -146,7 +146,10 @@ async fn read_parquet_single_into_arrow(
     row_groups: Option<&[i64]>,
     io_client: Arc<IOClient>,
     schema_infer_options: &ParquetSchemaInferenceOptions,
-) -> DaftResult<(Vec<String>, Vec<Vec<Box<dyn arrow2::array::Array>>>)> {
+) -> DaftResult<(
+    arrow2::datatypes::SchemaRef,
+    Vec<Vec<Box<dyn arrow2::array::Array>>>,
+)> {
     let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?;
     let builder = builder.set_infer_schema_options(schema_infer_options);
@@ -180,12 +183,7 @@ async fn read_parquet_single_into_arrow(

     let parquet_reader = builder.build()?;

-    let schema = parquet_reader.arrow_schema();
-    let names = schema
-        .fields
-        .iter()
-        .map(|f| f.name.to_string())
-        .collect::<Vec<_>>();
+    let schema = parquet_reader.arrow_schema().clone();
     let ranges = parquet_reader.prebuffer_ranges(io_client)?;
     let all_arrays = parquet_reader
         .read_from_ranges_into_arrow_arrays(ranges)
@@ -257,7 +255,7 @@ async fn read_parquet_single_into_arrow(
             .into());
     }

-    Ok((names, all_arrays))
+    Ok((schema, all_arrays))
 }

 #[allow(clippy::too_many_arguments)]
@@ -298,7 +296,7 @@ pub fn read_parquet_into_pyarrow(
     io_client: Arc<IOClient>,
     multithreaded_io: bool,
     schema_infer_options: &ParquetSchemaInferenceOptions,
-) -> DaftResult<(Vec<String>, Vec<ArrowChunk>)> {
+) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec<ArrowChunk>)> {
     let runtime_handle = get_runtime(multithreaded_io)?;
     let _rt_guard = runtime_handle.enter();
     runtime_handle.block_on(async {
@@ -383,7 +381,7 @@ pub fn read_parquet_schema(
         .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?;
     let builder = builder.set_infer_schema_options(schema_inference_options);

-    Schema::try_from(builder.build()?.arrow_schema())
+    Schema::try_from(builder.build()?.arrow_schema().as_ref())
 }

 pub fn read_parquet_statistics(uris: &Series, io_client: Arc<IOClient>) -> DaftResult<Table> {
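Taken together, a hedged end-to-end sketch of the commit's effect (assuming a local example.parquet and that the function is importable from daft/table/table.py as shown in the first diff): the schema of the returned pyarrow table now comes from Daft's Rust-side inference, including any coerce_int96_timestamp_unit coercion, rather than being re-derived by pyarrow from bare arrays and names.

```python
from daft.table.table import read_parquet_into_pyarrow  # module path per the diff above

# "example.parquet" is a placeholder; coerce_int96_timestamp_unit defaults to
# nanoseconds per the signature in the first diff.
tbl = read_parquet_into_pyarrow("example.parquet")
print(tbl.schema)  # field types reflect the Rust-side arrow schema
```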
