[PERF] Enable Predicates in Parquet Reader #1702

Merged 12 commits on Dec 6, 2023
Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions daft/daft.pyi
@@ -570,6 +570,7 @@ def read_parquet(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[int] | None = None,
predicate: PyExpr | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: PyTimeUnit | None = None,
@@ -580,6 +581,7 @@ def read_parquet_bulk(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int] | None] | None = None,
predicate: PyExpr | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -963,6 +965,7 @@ class PyMicroPartition:
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[int] | None = None,
predicate: PyExpr | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: PyTimeUnit = PyTimeUnit.nanoseconds(),
@@ -975,6 +978,7 @@ class PyMicroPartition:
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int] | None] | None = None,
predicate: PyExpr | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = None,
multithreaded_io: bool | None = None,
4 changes: 4 additions & 0 deletions daft/table/micropartition.py
@@ -313,6 +313,7 @@ def read_parquet(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[int] | None = None,
predicate: Expression | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
@@ -324,6 +325,7 @@ def read_parquet(
start_offset,
num_rows,
row_groups,
predicate._expr if predicate is not None else None,
io_config,
multithreaded_io,
coerce_int96_timestamp_unit._timeunit,
@@ -338,6 +340,7 @@ def read_parquet_bulk(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
predicate: Expression | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -350,6 +353,7 @@ def read_parquet_bulk(
start_offset,
num_rows,
row_groups_per_path,
predicate._expr if predicate is not None else None,
io_config,
num_parallel_tasks,
multithreaded_io,
4 changes: 4 additions & 0 deletions daft/table/table.py
@@ -392,6 +392,7 @@ def read_parquet(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[int] | None = None,
predicate: Expression | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
@@ -403,6 +404,7 @@ def read_parquet(
start_offset=start_offset,
num_rows=num_rows,
row_groups=row_groups,
predicate=predicate._expr if predicate is not None else None,
io_config=io_config,
multithreaded_io=multithreaded_io,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
@@ -417,6 +419,7 @@ def read_parquet_bulk(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
predicate: Expression | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -428,6 +431,7 @@ def read_parquet_bulk(
start_offset=start_offset,
num_rows=num_rows,
row_groups=row_groups_per_path,
predicate=predicate._expr if predicate is not None else None,
io_config=io_config,
num_parallel_tasks=num_parallel_tasks,
multithreaded_io=multithreaded_io,
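
The three Python-side files above do the same thing: accept an optional `predicate` expression and forward its underlying `_expr` to the native reader. A minimal usage sketch against the internal `Table` API follows; the file path, the `columns` argument, the `col` import, and the `to_pydict()` call are assumptions for illustration, and only the `predicate` keyword is what this PR adds.

```python
from daft.expressions import col
from daft.table import Table

# Hypothetical: push the comparison into the Parquet read itself instead of
# materializing every row group and filtering afterwards.
tbl = Table.read_parquet(
    "events.parquet",                # assumed local file
    columns=["user_id", "price"],    # existing column pruning still applies
    predicate=(col("price") > 100),  # new in this PR: rows filtered at read time
)
print(tbl.to_pydict())
```
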
45 changes: 45 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -8,6 +8,7 @@ use common_error::DaftResult;
use daft_core::schema::{Schema, SchemaRef};

use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
use daft_dsl::ExprRef;
use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
use daft_parquet::read::{
read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions,
@@ -120,6 +121,7 @@ fn materialize_scan_task(
None,
scan_task.pushdowns.limit,
row_groups,
scan_task.pushdowns.filters.clone(),
io_client.clone(),
io_stats,
8,
@@ -410,6 +412,7 @@ impl MicroPartition {
None,
scan_task.pushdowns.limit,
row_groups,
scan_task.pushdowns.filters.clone(),
cfg.io_config
.clone()
.map(|c| Arc::new(c.clone()))
@@ -707,6 +710,7 @@ pub(crate) fn read_parquet_into_micropartition(
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicate: Option<ExprRef>,
io_config: Arc<IOConfig>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
@@ -720,6 +724,46 @@ pub(crate) fn read_parquet_into_micropartition(
// Run the required I/O to retrieve all the Parquet FileMetaData
let runtime_handle = daft_io::get_runtime(multithreaded_io)?;
let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?;

if let Some(predicate) = predicate {
// We have a predicate, so we perform an eager read, reading only the row groups we need.
let all_tables = read_parquet_bulk(
uris,
columns,
None,
num_rows,
row_groups,
Some(predicate.clone()),
io_client,
io_stats,
num_parallel_tasks,
runtime_handle,
schema_infer_options,
)?;

let unioned_schema = all_tables
.iter()
.map(|t| t.schema.clone())
.try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?;
let full_daft_schema = unioned_schema.expect("we need at least 1 schema");
// Hack to get an owned copy of the schema
let full_daft_schema = Schema {
fields: full_daft_schema.fields.clone(),
};
let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?;

let all_tables = all_tables
.into_iter()
.map(|t| t.cast_to_schema(&pruned_daft_schema))
.collect::<DaftResult<Vec<_>>>()?;
// TODO: we can pass in stats here to optimize downstream workloads such as join
return Ok(MicroPartition::new_loaded(
Arc::new(pruned_daft_schema),
all_tables.into(),
None,
));
}

let meta_io_client = io_client.clone();
let meta_io_stats = io_stats.clone();
let metadata = runtime_handle.block_on(async move {
@@ -847,6 +891,7 @@ pub(crate) fn read_parquet_into_micropartition(
start_offset,
num_rows,
row_groups,
None,
io_client,
io_stats,
num_parallel_tasks,
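
The new branch in `read_parquet_into_micropartition` is the core of the change: when a predicate is pushed down, the function skips the metadata-only planning path, eagerly reads the matching row groups via `read_parquet_bulk`, unions the per-file schemas, prunes them to the requested columns, and returns an already-loaded `MicroPartition`. A hedged sketch of the intended behavior from Python; the `MicroPartition` import path, the file name, the list argument to `filter`, and the `to_pydict()` comparison are assumptions used only for illustration.

```python
from daft.expressions import col
from daft.table import MicroPartition

pred = col("score") > 0.5

# Hypothetical equivalence check: pushing the predicate into the read should
# return the same rows as reading everything and filtering afterwards, but it
# can skip row groups whose statistics rule out any matches.
pushed = MicroPartition.read_parquet("scores.parquet", predicate=pred)
baseline = MicroPartition.read_parquet("scores.parquet").filter([pred])
assert pushed.to_pydict() == baseline.to_pydict()
```
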
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/ops/filter.rs
@@ -8,15 +8,15 @@ use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu};
use daft_stats::TruthValue;

impl MicroPartition {
pub fn filter(&self, predicate: &[Expr]) -> DaftResult<Self> {
pub fn filter<E: AsRef<Expr>>(&self, predicate: &[E]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new("MicroPartition::filter");
if predicate.is_empty() {
return Ok(Self::empty(Some(self.schema.clone())));
}
if let Some(statistics) = &self.statistics {
let folded_expr = predicate
.iter()
.cloned()
.map(|e| e.as_ref().clone())
.reduce(|a, b| a.and(&b))
.expect("should have at least 1 expr");
let eval_result = statistics.eval_expression(&folded_expr)?;
4 changes: 4 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -433,6 +433,7 @@ impl PyMicroPartition {
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<i64>>,
predicate: Option<PyExpr>,
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
coerce_int96_timestamp_unit: Option<PyTimeUnit>,
@@ -451,6 +452,7 @@ impl PyMicroPartition {
start_offset,
num_rows,
row_groups.map(|rg| vec![Some(rg)]),
predicate.map(|e| e.expr.into()),
io_config,
Some(io_stats),
1,
@@ -470,6 +472,7 @@ impl PyMicroPartition {
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicate: Option<PyExpr>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
@@ -489,6 +492,7 @@ impl PyMicroPartition {
start_offset,
num_rows,
row_groups,
predicate.map(|e| e.expr.into()),
io_config,
Some(io_stats),
num_parallel_tasks.unwrap_or(128) as usize,
3 changes: 2 additions & 1 deletion src/daft-parquet/Cargo.toml
@@ -5,6 +5,7 @@ async-stream = {workspace = true}
bytes = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
@@ -24,7 +25,7 @@ tokio-util = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python"]

[package]
edition = {workspace = true}