[FEAT] bulk parquet pyarrow reader #1396

Merged (4 commits) on Sep 21, 2023
6 changes: 6 additions & 0 deletions benchmarking/parquet/conftest.py
@@ -80,6 +80,10 @@ def daft_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
return [t.to_arrow() for t in tables]


def daft_into_pyarrow_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
return daft.table.read_parquet_into_pyarrow_bulk(paths, columns=columns)


def pyarrow_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
return [pyarrow_read(f, columns=columns) for f in paths]

@@ -91,11 +95,13 @@ def boto_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
@pytest.fixture(
params=[
daft_bulk_read,
daft_into_pyarrow_bulk_read,
pyarrow_bulk_read,
boto_bulk_read,
],
ids=[
"daft_bulk_read",
"daft_into_pyarrow_bulk_read",
"pyarrow_bulk_read",
"boto3_bulk_read",
],
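For reference, the parametrized fixture above feeds each bulk reader into the same benchmark body. Below is a minimal sketch of a consuming test, assuming pytest-benchmark is in use and that the fixture is named `bulk_read_fn`; both the fixture name and the file paths are illustrative, since the real names live in the collapsed parts of conftest.py.

```python
import pyarrow as pa


def test_bulk_read_benchmark(bulk_read_fn, benchmark):
    # `benchmark` is pytest-benchmark's fixture; it times the wrapped call.
    paths = ["data/part-0.parquet", "data/part-1.parquet"]  # assumed local files
    tables = benchmark(bulk_read_fn, paths, columns=None)
    assert all(isinstance(t, pa.Table) for t in tables)
```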
10 changes: 10 additions & 0 deletions daft/daft.pyi
@@ -391,6 +391,16 @@ def read_parquet_into_pyarrow(
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: PyTimeUnit | None = None,
): ...
def read_parquet_into_pyarrow_bulk(
uris: list[str],
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: PyTimeUnit | None = None,
): ...
def read_parquet_schema(
uri: str,
io_config: IOConfig | None = None,
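The new stub mirrors the single-file `read_parquet_into_pyarrow` signature but takes a list of URIs and, via `row_groups`, one row-group selection per URI. A hedged sketch of calling the raw binding directly (the URIs and column names are made up for illustration):

```python
from daft.daft import read_parquet_into_pyarrow_bulk

results = read_parquet_into_pyarrow_bulk(
    uris=["s3://bucket/a.parquet", "s3://bucket/b.parquet"],  # assumed URIs
    columns=["id", "value"],                                  # assumed column names
    row_groups=[[0], [0, 1]],                                 # one selection per URI
)
# Each element is a raw (fields, metadata, chunks) tuple; the daft.table wrapper
# below is what reassembles these into pyarrow.Table objects.
```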
4 changes: 2 additions & 2 deletions daft/table/__init__.py
@@ -1,5 +1,5 @@
from __future__ import annotations

from .table import Table, read_parquet_into_pyarrow
from .table import Table, read_parquet_into_pyarrow, read_parquet_into_pyarrow_bulk

__all__ = ["Table", "read_parquet_into_pyarrow"]
__all__ = ["Table", "read_parquet_into_pyarrow", "read_parquet_into_pyarrow_bulk"]
27 changes: 27 additions & 0 deletions daft/table/table.py
@@ -12,6 +12,7 @@
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
from daft.daft import read_parquet_into_pyarrow as _read_parquet_into_pyarrow
from daft.daft import read_parquet_into_pyarrow_bulk as _read_parquet_into_pyarrow_bulk
from daft.daft import read_parquet_statistics as _read_parquet_statistics
from daft.datatype import DataType, TimeUnit
from daft.expressions import Expression, ExpressionsProjection
@@ -476,3 +477,29 @@
schema = pa.schema(fields, metadata=metadata)
columns = [pa.chunked_array(c) for c in columns] # type: ignore
return pa.table(columns, schema=schema)


def read_parquet_into_pyarrow_bulk(
paths: list[str],
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
) -> list[pa.Table]:
bulk_result = _read_parquet_into_pyarrow_bulk(
uris=paths,
columns=columns,
start_offset=start_offset,
num_rows=num_rows,
row_groups=row_groups_per_path,
io_config=io_config,
multithreaded_io=multithreaded_io,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
)
return [
pa.table([pa.chunked_array(c) for c in columns], schema=pa.schema(fields, metadata=metadata)) # type: ignore
for fields, metadata, columns in bulk_result
]
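A minimal end-to-end sketch of the new wrapper, mirroring how the integration tests below call it; the bucket and column names here are placeholders, not taken from this PR.

```python
import daft

paths = ["s3://my-bucket/part-0.parquet", "s3://my-bucket/part-1.parquet"]  # assumed paths
tables = daft.table.read_parquet_into_pyarrow_bulk(paths, columns=["id", "value"])

# One pyarrow.Table comes back per input path, in the same order.
for path, table in zip(paths, tables):
    print(path, table.num_rows, table.schema.names)
```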
91 changes: 70 additions & 21 deletions src/daft-parquet/src/python.rs
@@ -7,10 +7,10 @@ pub mod pylib {
};
use daft_io::{get_io_client, python::IOConfig};
use daft_table::python::PyTable;
use pyo3::{pyfunction, PyResult, Python};
use pyo3::{pyfunction, types::PyModule, PyResult, Python};
use std::{collections::BTreeMap, sync::Arc};

use crate::read::ParquetSchemaInferenceOptions;
use crate::read::{ArrowChunk, ParquetSchemaInferenceOptions};
use daft_core::ffi::to_py_array;
#[allow(clippy::too_many_arguments)]
#[pyfunction]
@@ -48,6 +48,29 @@ pub mod pylib {
}
type PyArrowChunks = Vec<Vec<pyo3::PyObject>>;
type PyArrowFields = Vec<pyo3::PyObject>;
type PyArrowParquetType = (PyArrowFields, BTreeMap<String, String>, PyArrowChunks);
fn convert_pyarrow_parquet_read_result_into_py(
py: Python,
schema: arrow2::datatypes::SchemaRef,
all_arrays: Vec<ArrowChunk>,
pyarrow: &PyModule,
) -> PyResult<PyArrowParquetType> {
let converted_arrays = all_arrays
.into_iter()
.map(|v| {
v.into_iter()
.map(|a| to_py_array(a, py, pyarrow))
.collect::<PyResult<Vec<_>>>()
})
.collect::<PyResult<Vec<_>>>()?;
let fields = schema
.fields
.iter()
.map(|f| field_to_py(f, py, pyarrow))
.collect::<Result<Vec<_>, _>>()?;
let metadata = &schema.metadata;
Ok((fields, metadata.clone(), converted_arrays))
}

#[allow(clippy::too_many_arguments)]
#[pyfunction]
@@ -61,7 +84,7 @@
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
coerce_int96_timestamp_unit: Option<PyTimeUnit>,
) -> PyResult<(PyArrowFields, BTreeMap<String, String>, PyArrowChunks)> {
) -> PyResult<PyArrowParquetType> {
let read_parquet_result = py.allow_threads(|| {
let io_client = get_io_client(
multithreaded_io.unwrap_or(true),
@@ -83,24 +106,8 @@
})?;
let (schema, all_arrays) = read_parquet_result;
let pyarrow = py.import("pyarrow")?;
let converted_arrays = all_arrays
.into_iter()
.map(|v| {
v.into_iter()
.map(|a| to_py_array(a, py, pyarrow))
.collect::<PyResult<Vec<_>>>()
})
.collect::<PyResult<Vec<_>>>()?;
let fields = schema
.fields
.iter()
.map(|f| field_to_py(f, py, pyarrow))
.collect::<Result<Vec<_>, _>>()?;
let metadata = &schema.metadata;

Ok((fields, metadata.clone(), converted_arrays))
convert_pyarrow_parquet_read_result_into_py(py, schema, all_arrays, pyarrow)
}

#[allow(clippy::too_many_arguments)]
#[pyfunction]
pub fn read_parquet_bulk(
@@ -139,6 +146,48 @@
})
}

#[allow(clippy::too_many_arguments)]
#[pyfunction]
pub fn read_parquet_into_pyarrow_bulk(
py: Python,
uris: Vec<&str>,
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
coerce_int96_timestamp_unit: Option<PyTimeUnit>,
) -> PyResult<Vec<PyArrowParquetType>> {
let parquet_read_results = py.allow_threads(|| {
let io_client = get_io_client(
multithreaded_io.unwrap_or(true),
io_config.unwrap_or_default().config.into(),
)?;
let schema_infer_options = ParquetSchemaInferenceOptions::new(
coerce_int96_timestamp_unit.map(|tu| tu.timeunit),
);

crate::read::read_parquet_into_pyarrow_bulk(
uris.as_ref(),
columns.as_deref(),
start_offset,
num_rows,
row_groups,
io_client,
multithreaded_io.unwrap_or(true),
&schema_infer_options,
)
})?;
let pyarrow = py.import("pyarrow")?;
parquet_read_results
.into_iter()
.map(|(s, all_arrays)| {
convert_pyarrow_parquet_read_result_into_py(py, s, all_arrays, pyarrow)
})
.collect::<PyResult<Vec<_>>>()
}

#[pyfunction]
pub fn read_parquet_schema(
py: Python,
@@ -183,7 +232,7 @@
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_into_pyarrow))?;

parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_into_pyarrow_bulk))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?;
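The refactor above pulls the pyarrow conversion into a shared `convert_pyarrow_parquet_read_result_into_py` helper so that both the single-file and bulk paths return the same `(fields, metadata, chunks)` shape. From the Python side, turning one such tuple back into a table looks like the sketch below; this simply mirrors the list comprehension in daft/table/table.py and is an illustration, not a new API.

```python
import pyarrow as pa


def tuple_to_table(fields, metadata, chunks):
    # fields and metadata come back already converted to pyarrow objects by the
    # Rust helper; chunks is a list of per-column lists of arrays.
    schema = pa.schema(fields, metadata=metadata)
    columns = [pa.chunked_array(c) for c in chunks]
    return pa.table(columns, schema=schema)
```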
61 changes: 59 additions & 2 deletions src/daft-parquet/src/read.rs
@@ -285,7 +285,7 @@ pub fn read_parquet(
})
}
pub type ArrowChunk = Vec<Box<dyn arrow2::array::Array>>;

pub type ParquetPyarrowChunk = (arrow2::datatypes::SchemaRef, Vec<ArrowChunk>);
#[allow(clippy::too_many_arguments)]
pub fn read_parquet_into_pyarrow(
uri: &str,
@@ -296,7 +296,7 @@ pub fn read_parquet_into_pyarrow(
io_client: Arc<IOClient>,
multithreaded_io: bool,
schema_infer_options: &ParquetSchemaInferenceOptions,
) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec<ArrowChunk>)> {
) -> DaftResult<ParquetPyarrowChunk> {
let runtime_handle = get_runtime(multithreaded_io)?;
let _rt_guard = runtime_handle.enter();
runtime_handle.block_on(async {
@@ -370,6 +370,63 @@ pub fn read_parquet_bulk(
tables.into_iter().collect::<DaftResult<Vec<_>>>()
}

#[allow(clippy::too_many_arguments)]
pub fn read_parquet_into_pyarrow_bulk(
uris: &[&str],
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
io_client: Arc<IOClient>,
multithreaded_io: bool,
schema_infer_options: &ParquetSchemaInferenceOptions,
) -> DaftResult<Vec<ParquetPyarrowChunk>> {
let runtime_handle = get_runtime(multithreaded_io)?;
let _rt_guard = runtime_handle.enter();
let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::<Vec<_>>());
if let Some(ref row_groups) = row_groups {
if row_groups.len() != uris.len() {
return Err(common_error::DaftError::ValueError(format!(
"Mismatch of length of `uris` and `row_groups`. {} vs {}",
uris.len(),
row_groups.len()
)));
}
}
let tables = runtime_handle
.block_on(async move {
try_join_all(uris.iter().enumerate().map(|(i, uri)| {
let uri = uri.to_string();
let owned_columns = owned_columns.clone();
let owned_row_group = match &row_groups {
None => None,
Some(v) => v.get(i).cloned(),
};

let io_client = io_client.clone();
let schema_infer_options = schema_infer_options.clone();
tokio::task::spawn(async move {
let columns = owned_columns
.as_ref()
.map(|s| s.iter().map(AsRef::as_ref).collect::<Vec<_>>());
read_parquet_single_into_arrow(
&uri,
columns.as_deref(),
start_offset,
num_rows,
owned_row_group.as_deref(),
io_client,
&schema_infer_options,
)
.await
})
}))
.await
})
.context(JoinSnafu { path: "UNKNOWN" })?;
tables.into_iter().collect::<DaftResult<Vec<_>>>()
}

pub fn read_parquet_schema(
uri: &str,
io_client: Arc<IOClient>,
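The new Rust entry point also validates that `row_groups` lines up with `uris` before spawning any reads. On the Python side a mismatch surfaces as an error carrying the message built above; a hedged sketch follows (the file names are illustrative and the exact exception class is not shown in this PR).

```python
import daft

paths = ["a.parquet", "b.parquet"]  # assumed local files
row_groups = [[0]]                  # only one selection for two paths
try:
    daft.table.read_parquet_into_pyarrow_bulk(paths, row_groups_per_path=row_groups)
except Exception as err:
    print(err)  # e.g. "Mismatch of length of `uris` and `row_groups`. 2 vs 1"
```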
39 changes: 39 additions & 0 deletions tests/integration/io/parquet/test_reads_public_data.py
@@ -250,6 +250,26 @@ def test_parquet_read_table_bulk(parquet_file, public_storage_io_config, multithreaded_io):
pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas())


@pytest.mark.integration()
@pytest.mark.skipif(
daft.context.get_context().use_rust_planner, reason="Custom fsspec filesystems not supported in new query planner"
)
@pytest.mark.parametrize(
"multithreaded_io",
[False, True],
)
def test_parquet_into_pyarrow_bulk(parquet_file, public_storage_io_config, multithreaded_io):
_, url = parquet_file
daft_native_reads = daft.table.read_parquet_into_pyarrow_bulk(
[url] * 2, io_config=public_storage_io_config, multithreaded_io=multithreaded_io
)
pa_read = read_parquet_with_pyarrow(url)

for daft_native_read in daft_native_reads:
assert daft_native_read.schema == pa_read.schema
pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas())


@pytest.mark.integration()
def test_parquet_read_df(parquet_file, public_storage_io_config):
_, url = parquet_file
@@ -321,3 +341,22 @@ def test_row_groups_selection_bulk(public_storage_io_config, multithreaded_io):
for i, t in enumerate(rest):
assert len(t) == 10
assert first.to_arrow()[i * 10 : (i + 1) * 10] == t.to_arrow()


@pytest.mark.integration()
@pytest.mark.parametrize(
"multithreaded_io",
[False, True],
)
def test_row_groups_selection_into_pyarrow_bulk(public_storage_io_config, multithreaded_io):
url = ["s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"] * 11
row_groups = [list(range(10))] + [[i] for i in range(10)]
first, *rest = daft.table.read_parquet_into_pyarrow_bulk(
url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io, row_groups_per_path=row_groups
)
assert len(first) == 100
assert len(rest) == 10

for i, t in enumerate(rest):
assert len(t) == 10
assert first[i * 10 : (i + 1) * 10] == t