diff --git a/Cargo.lock b/Cargo.lock index cccda23a76..ed902a8f79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1341,6 +1341,7 @@ dependencies = [ "bytes", "common-error", "daft-core", + "daft-dsl", "daft-io", "daft-stats", "daft-table", diff --git a/daft/daft.pyi b/daft/daft.pyi index 0128dedafe..9a48ccef87 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -570,6 +570,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit | None = None, @@ -580,6 +581,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[list[int] | None] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -963,6 +965,7 @@ class PyMicroPartition: start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit = PyTimeUnit.nanoseconds(), @@ -975,6 +978,7 @@ class PyMicroPartition: start_offset: int | None = None, num_rows: int | None = None, row_groups: list[list[int] | None] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = None, multithreaded_io: bool | None = None, diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index 1ebccb51d0..8b59283b6d 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -313,6 +313,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), @@ -324,6 +325,7 @@ def read_parquet( start_offset, num_rows, row_groups, + predicate._expr if predicate is not None else None, io_config, multithreaded_io, coerce_int96_timestamp_unit._timeunit, @@ -338,6 +340,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups_per_path: list[list[int] | None] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -350,6 +353,7 @@ def read_parquet_bulk( start_offset, num_rows, row_groups_per_path, + predicate._expr if predicate is not None else None, io_config, num_parallel_tasks, multithreaded_io, diff --git a/daft/table/table.py b/daft/table/table.py index 6b87c2ee03..314662a2e4 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -392,6 +392,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), @@ -403,6 +404,7 @@ def read_parquet( start_offset=start_offset, num_rows=num_rows, row_groups=row_groups, + predicate=predicate._expr if predicate is not None else None, io_config=io_config, multithreaded_io=multithreaded_io, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, @@ -417,6 +419,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups_per_path: list[list[int] | None] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -428,6 +431,7 @@ def read_parquet_bulk( start_offset=start_offset, num_rows=num_rows, row_groups=row_groups_per_path, + predicate=predicate._expr if predicate is not None else None, io_config=io_config, num_parallel_tasks=num_parallel_tasks, multithreaded_io=multithreaded_io, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index c662bcd6f7..94d81ed3de 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -8,6 +8,7 @@ use common_error::DaftResult; use daft_core::schema::{Schema, SchemaRef}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; +use daft_dsl::ExprRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, @@ -120,6 +121,7 @@ fn materialize_scan_task( None, scan_task.pushdowns.limit, row_groups, + scan_task.pushdowns.filters.clone(), io_client.clone(), io_stats, 8, @@ -410,6 +412,7 @@ impl MicroPartition { None, scan_task.pushdowns.limit, row_groups, + scan_task.pushdowns.filters.clone(), cfg.io_config .clone() .map(|c| Arc::new(c.clone())) @@ -707,6 +710,7 @@ pub(crate) fn read_parquet_into_micropartition( start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicate: Option, io_config: Arc, io_stats: Option, num_parallel_tasks: usize, @@ -720,6 +724,46 @@ pub(crate) fn read_parquet_into_micropartition( // Run the required I/O to retrieve all the Parquet FileMetaData let runtime_handle = daft_io::get_runtime(multithreaded_io)?; let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?; + + if let Some(predicate) = predicate { + // We have a predicate, so we will perform eager read only reading what row groups we need. + let all_tables = read_parquet_bulk( + uris, + columns, + None, + num_rows, + row_groups, + Some(predicate.clone()), + io_client, + io_stats, + num_parallel_tasks, + runtime_handle, + schema_infer_options, + )?; + + let unioned_schema = all_tables + .iter() + .map(|t| t.schema.clone()) + .try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?; + let full_daft_schema = unioned_schema.expect("we need at least 1 schema"); + // Hack to avoid to owned schema + let full_daft_schema = Schema { + fields: full_daft_schema.fields.clone(), + }; + let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?; + + let all_tables = all_tables + .into_iter() + .map(|t| t.cast_to_schema(&pruned_daft_schema)) + .collect::>>()?; + // TODO: we can pass in stats here to optimize downstream workloads such as join + return Ok(MicroPartition::new_loaded( + Arc::new(pruned_daft_schema), + all_tables.into(), + None, + )); + } + let meta_io_client = io_client.clone(); let meta_io_stats = io_stats.clone(); let metadata = runtime_handle.block_on(async move { @@ -847,6 +891,7 @@ pub(crate) fn read_parquet_into_micropartition( start_offset, num_rows, row_groups, + None, io_client, io_stats, num_parallel_tasks, diff --git a/src/daft-micropartition/src/ops/filter.rs b/src/daft-micropartition/src/ops/filter.rs index b49df69c95..88ab516de9 100644 --- a/src/daft-micropartition/src/ops/filter.rs +++ b/src/daft-micropartition/src/ops/filter.rs @@ -8,7 +8,7 @@ use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu}; use daft_stats::TruthValue; impl MicroPartition { - pub fn filter(&self, predicate: &[Expr]) -> DaftResult { + pub fn filter>(&self, predicate: &[E]) -> DaftResult { let io_stats = IOStatsContext::new("MicroPartition::filter"); if predicate.is_empty() { return Ok(Self::empty(Some(self.schema.clone()))); @@ -16,7 +16,7 @@ impl MicroPartition { if let Some(statistics) = &self.statistics { let folded_expr = predicate .iter() - .cloned() + .map(|e| e.as_ref().clone()) .reduce(|a, b| a.and(&b)) .expect("should have at least 1 expr"); let eval_result = statistics.eval_expression(&folded_expr)?; diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 7e398107c2..ab6c8cdbf5 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -433,6 +433,7 @@ impl PyMicroPartition { start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, @@ -451,6 +452,7 @@ impl PyMicroPartition { start_offset, num_rows, row_groups.map(|rg| vec![Some(rg)]), + predicate.map(|e| e.expr.into()), io_config, Some(io_stats), 1, @@ -470,6 +472,7 @@ impl PyMicroPartition { start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicate: Option, io_config: Option, num_parallel_tasks: Option, multithreaded_io: Option, @@ -489,6 +492,7 @@ impl PyMicroPartition { start_offset, num_rows, row_groups, + predicate.map(|e| e.expr.into()), io_config, Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, diff --git a/src/daft-parquet/Cargo.toml b/src/daft-parquet/Cargo.toml index b806818acf..f1e8b85aad 100644 --- a/src/daft-parquet/Cargo.toml +++ b/src/daft-parquet/Cargo.toml @@ -5,6 +5,7 @@ async-stream = {workspace = true} bytes = {workspace = true} common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} +daft-dsl = {path = "../daft-dsl", default-features = false} daft-io = {path = "../daft-io", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} @@ -24,7 +25,7 @@ tokio-util = {workspace = true} [features] default = ["python"] -python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python"] +python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python"] [package] edition = {workspace = true} diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 7ab626dd82..6f93ca7758 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -2,8 +2,10 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; -use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_dsl::ExprRef; use daft_io::{IOClient, IOStatsRef}; +use daft_stats::TruthValue; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; use parquet2::{ @@ -17,8 +19,9 @@ use crate::{ metadata::read_parquet_metadata, read::ParquetSchemaInferenceOptions, read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass}, - JoinSnafu, OneShotRecvSnafu, UnableToCreateParquetPageStreamSnafu, - UnableToParseSchemaFromMetadataSnafu, + statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu, + UnableToConvertSchemaToDaftSnafu, UnableToCreateParquetPageStreamSnafu, + UnableToParseSchemaFromMetadataSnafu, UnableToRunExpressionOnStatsSnafu, }; use arrow2::io::parquet::read::column_iter_to_arrays; @@ -27,9 +30,10 @@ pub(crate) struct ParquetReaderBuilder { pub metadata: parquet2::metadata::FileMetaData, selected_columns: Option>, row_start_offset: usize, - num_rows: usize, + limit: Option, row_groups: Option>, schema_inference_options: ParquetSchemaInferenceOptions, + predicate: Option, } use parquet2::read::decompress; @@ -92,16 +96,20 @@ where } pub(crate) fn build_row_ranges( - num_rows: usize, + limit: Option, row_start_offset: usize, row_groups: Option<&[i64]>, + predicate: Option, + schema: &Schema, metadata: &parquet2::metadata::FileMetaData, uri: &str, ) -> super::Result> { + let limit = limit.map(|v| v as i64); let mut row_ranges = vec![]; let mut curr_row_index = 0; - let mut rows_to_add = num_rows; + if let Some(row_groups) = row_groups { + let mut rows_to_add: i64 = limit.unwrap_or(i64::MAX); for i in row_groups { let i = *i as usize; if !(0..metadata.row_groups.len()).contains(&i) { @@ -111,26 +119,63 @@ pub(crate) fn build_row_ranges( total_row_groups: metadata.row_groups.len() as i64, }); } + if rows_to_add <= 0 { + break; + } let rg = metadata.row_groups.get(i).unwrap(); + if let Some(ref pred) = predicate { + let stats = statistics::row_group_metadata_to_table_stats(rg, schema) + .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { + path: uri.to_string(), + })?; + + let evaled = stats.eval_expression(pred).with_context(|_| { + UnableToRunExpressionOnStatsSnafu { + path: uri.to_string(), + } + })?; + if evaled.to_truth_value() == TruthValue::False { + continue; + } + } + let range_to_add = RowGroupRange { row_group_index: i, start: 0, - num_rows: rg.num_rows(), + num_rows: rg.num_rows().min(rows_to_add as usize), }; + rows_to_add = rows_to_add.saturating_sub((rg.num_rows() as i64).min(rows_to_add)); row_ranges.push(range_to_add); } } else { + let mut rows_to_add = limit.unwrap_or(metadata.num_rows as i64); + for (i, rg) in metadata.row_groups.iter().enumerate() { if (curr_row_index + rg.num_rows()) < row_start_offset { curr_row_index += rg.num_rows(); continue; } else if rows_to_add > 0 { + if let Some(ref pred) = predicate { + let stats = statistics::row_group_metadata_to_table_stats(rg, schema) + .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { + path: uri.to_string(), + })?; + let evaled = stats.eval_expression(pred).with_context(|_| { + UnableToRunExpressionOnStatsSnafu { + path: uri.to_string(), + } + })?; + if evaled.to_truth_value() == TruthValue::False { + curr_row_index += rg.num_rows(); + continue; + } + } let range_to_add = RowGroupRange { row_group_index: i, start: row_start_offset.saturating_sub(curr_row_index), - num_rows: rg.num_rows().min(rows_to_add), + num_rows: rg.num_rows().min(rows_to_add as usize), }; - rows_to_add = rows_to_add.saturating_sub(rg.num_rows().min(rows_to_add)); + rows_to_add = rows_to_add.saturating_sub((rg.num_rows() as i64).min(rows_to_add)); row_ranges.push(range_to_add); } else { break; @@ -152,15 +197,15 @@ impl ParquetReaderBuilder { .single_url_get_size(uri.into(), io_stats.clone()) .await?; let metadata = read_parquet_metadata(uri, size, io_client, io_stats).await?; - let num_rows = metadata.num_rows; Ok(ParquetReaderBuilder { uri: uri.into(), metadata, selected_columns: None, row_start_offset: 0, - num_rows, + limit: None, row_groups: None, schema_inference_options: Default::default(), + predicate: None, }) } @@ -172,7 +217,7 @@ impl ParquetReaderBuilder { self.metadata().schema() } - pub fn prune_columns(mut self, columns: &[&str]) -> super::Result { + pub fn prune_columns>(mut self, columns: &[S]) -> super::Result { let avail_names = self .parquet_schema() .fields() @@ -181,7 +226,7 @@ impl ParquetReaderBuilder { .collect::>(); let mut names_to_keep = HashSet::new(); for col_name in columns { - if avail_names.contains(col_name) { + if avail_names.contains(col_name.as_ref()) { names_to_keep.insert(col_name.to_string()); } else { return Err(super::Error::FieldNotFound { @@ -201,9 +246,8 @@ impl ParquetReaderBuilder { num_rows: Option, ) -> super::Result { let start_offset = start_offset.unwrap_or(0); - let num_rows = num_rows.unwrap_or(self.metadata.num_rows.saturating_sub(start_offset)); self.row_start_offset = start_offset; - self.num_rows = num_rows; + self.limit = num_rows; Ok(self) } @@ -217,15 +261,13 @@ impl ParquetReaderBuilder { self } - pub fn build(self) -> super::Result { - let row_ranges = build_row_ranges( - self.num_rows, - self.row_start_offset, - self.row_groups.as_deref(), - &self.metadata, - &self.uri, - )?; + pub fn set_filter(mut self, predicate: ExprRef) -> Self { + assert_eq!(self.limit, None); + self.predicate = Some(predicate); + self + } + pub fn build(self) -> super::Result { let mut arrow_schema = infer_schema_with_options(&self.metadata, &Some(self.schema_inference_options.into())) .context(UnableToParseSchemaFromMetadataSnafu:: { @@ -237,6 +279,19 @@ impl ParquetReaderBuilder { .fields .retain(|f| names_to_keep.contains(f.name.as_str())); } + let daft_schema = + Schema::try_from(&arrow_schema).with_context(|_| UnableToConvertSchemaToDaftSnafu { + path: self.uri.to_string(), + })?; + let row_ranges = build_row_ranges( + self.limit, + self.row_start_offset, + self.row_groups.as_deref(), + self.predicate.clone(), + &daft_schema, + &self.metadata, + &self.uri, + )?; ParquetFileReader::new(self.uri, self.metadata, arrow_schema, row_ranges) } diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index ff0dc57691..637ab2cf91 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -70,6 +70,14 @@ pub enum Error { path: String, source: arrow2::error::Error, }, + + #[snafu(display( + "Unable to convert arrow schema to daft schema for file {}: {}", + path, + source + ))] + UnableToConvertSchemaToDaft { path: String, source: DaftError }, + #[snafu(display( "Field: {} not found in Parquet File: {} Available Fields: {:?}", field, @@ -147,6 +155,21 @@ pub enum Error { read_columns: usize, }, + #[snafu(display( + "Parquet file: {} unable to convert row group metadata to stats\nDetails:\n{source}", + path, + ))] + UnableToConvertRowGroupMetadataToStats { path: String, source: DaftError }, + + #[snafu(display( + "Parquet file: {} unable to evaluate predicate on stats\nDetails:\n{source}", + path, + ))] + UnableToRunExpressionOnStats { + path: String, + source: daft_stats::Error, + }, + #[snafu(display("Error joining spawned task: {} for path: {}", source, path))] JoinError { path: String, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 73f202feb0..10f62ec09a 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -5,6 +5,7 @@ pub mod pylib { ffi::field_to_py, python::{datatype::PyTimeUnit, schema::PySchema, PySeries}, }; + use daft_dsl::python::PyExpr; use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; use daft_table::python::PyTable; use pyo3::{pyfunction, types::PyModule, PyResult, Python}; @@ -21,6 +22,7 @@ pub mod pylib { start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, @@ -43,6 +45,7 @@ pub mod pylib { start_offset, num_rows, row_groups, + predicate.map(|e| e.expr.into()), io_client, Some(io_stats.clone()), runtime_handle, @@ -127,6 +130,7 @@ pub mod pylib { start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicate: Option, io_config: Option, num_parallel_tasks: Option, multithreaded_io: Option, @@ -150,6 +154,7 @@ pub mod pylib { start_offset, num_rows, row_groups, + predicate.map(|e| e.expr.into()), io_client, Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 7311b9b1fa..8e482cffb9 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -7,6 +7,7 @@ use daft_core::{ schema::Schema, DataType, IntoSeries, Series, }; +use daft_dsl::{optimization::get_required_columns, ExprRef}; use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{ @@ -61,18 +62,39 @@ async fn read_parquet_single( start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, io_client: Arc, io_stats: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult { + let original_columns = columns; + let original_num_rows = num_rows; + let mut num_rows = num_rows; + let mut columns = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()); + let requested_columns = columns.as_ref().map(|v| v.len()); + if let Some(ref pred) = predicate { + if num_rows.is_some() { + num_rows = None; + } + if let Some(req_columns) = columns.as_mut() { + let needed_columns = get_required_columns(pred); + for c in needed_columns { + if !req_columns.contains(&c) { + req_columns.push(c); + } + } + } + } let (source_type, fixed_uri) = parse_url(uri)?; - let (metadata, table) = if matches!(source_type, SourceType::File) { + + let (metadata, mut table) = if matches!(source_type, SourceType::File) { crate::stream_reader::local_parquet_read_async( fixed_uri.as_ref(), - columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()), + columns, start_offset, num_rows, row_groups.clone(), + predicate.clone(), schema_infer_options, ) .await @@ -81,8 +103,8 @@ async fn read_parquet_single( ParquetReaderBuilder::from_uri(uri, io_client.clone(), io_stats.clone()).await?; let builder = builder.set_infer_schema_options(schema_infer_options); - let builder = if let Some(columns) = columns { - builder.prune_columns(columns)? + let builder = if let Some(columns) = columns.as_ref() { + builder.prune_columns(columns.as_slice())? } else { builder }; @@ -99,6 +121,12 @@ async fn read_parquet_single( builder }; + let builder = if let Some(ref predicate) = predicate { + builder.set_filter(predicate.clone()) + } else { + builder + }; + let parquet_reader = builder.build()?; let ranges = parquet_reader.prebuffer_ranges(io_client, io_stats)?; Ok(( @@ -117,7 +145,16 @@ async fn read_parquet_single( let metadata_num_columns = metadata.schema().fields().len(); - if let Some(row_groups) = row_groups { + if let Some(predicate) = predicate { + // TODO ideally pipeline this with IO and before concating, rather than after + table = table.filter(&[predicate])?; + if let Some(oc) = original_columns { + table = table.get_columns(oc)?; + } + if let Some(nr) = original_num_rows { + table = table.head(nr)?; + } + } else if let Some(row_groups) = row_groups { let expected_rows: usize = row_groups .iter() .map(|i| rows_per_row_groups.get(*i as usize).unwrap()) @@ -153,10 +190,10 @@ async fn read_parquet_single( }), _ => Ok(()), }?; - }; + } - let expected_num_columns = if let Some(columns) = columns { - columns.len() + let expected_num_columns = if let Some(columns) = requested_columns { + columns } else { metadata_num_columns }; @@ -193,6 +230,7 @@ async fn read_parquet_single_into_arrow( start_offset, num_rows, row_groups.clone(), + None, schema_infer_options, ) .await?; @@ -316,6 +354,7 @@ pub fn read_parquet( start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, io_client: Arc, io_stats: Option, runtime_handle: Arc, @@ -329,6 +368,7 @@ pub fn read_parquet( start_offset, num_rows, row_groups, + predicate, io_client, io_stats, schema_infer_options, @@ -373,6 +413,7 @@ pub fn read_parquet_bulk( start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicate: Option, io_client: Arc, io_stats: Option, num_parallel_tasks: usize, @@ -396,6 +437,7 @@ pub fn read_parquet_bulk( let uri = uri.to_string(); let owned_columns = owned_columns.clone(); let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); + let owned_predicate = predicate.clone(); let io_client = io_client.clone(); let io_stats = io_stats.clone(); @@ -412,6 +454,7 @@ pub fn read_parquet_bulk( start_offset, num_rows, owned_row_group, + owned_predicate, io_client, io_stats, schema_infer_options, @@ -643,6 +686,7 @@ mod tests { None, None, None, + None, io_client, None, runtime_handle, diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index e0e7261d4c..982be9c2da 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -2,7 +2,8 @@ use std::{collections::HashSet, fs::File}; use arrow2::io::parquet::read; use common_error::DaftResult; -use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_dsl::ExprRef; use daft_table::Table; use itertools::Itertools; use rayon::prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge}; @@ -11,6 +12,7 @@ use snafu::ResultExt; use crate::{ file::build_row_ranges, read::{ArrowChunk, ParquetSchemaInferenceOptions}, + UnableToConvertSchemaToDaftSnafu, }; use crate::stream_reader::read::schema::infer_schema_with_options; @@ -51,6 +53,7 @@ pub(crate) fn local_parquet_read_into_arrow( start_offset: Option, num_rows: Option, row_groups: Option<&[i64]>, + predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> super::Result<( parquet2::metadata::FileMetaData, @@ -90,14 +93,20 @@ pub(crate) fn local_parquet_read_into_arrow( path: uri.to_string(), })?; let schema = prune_fields_from_schema(schema, columns, uri)?; + let daft_schema = + Schema::try_from(&schema).with_context(|_| UnableToConvertSchemaToDaftSnafu { + path: uri.to_string(), + })?; let chunk_size = 128 * 1024; - let expected_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows)); + let max_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows)); - let num_expected_arrays = f32::ceil(expected_rows as f32 / chunk_size as f32) as usize; + let num_expected_arrays = f32::ceil(max_rows as f32 / chunk_size as f32) as usize; let row_ranges = build_row_ranges( - expected_rows, + num_rows, start_offset.unwrap_or(0), row_groups, + predicate, + &daft_schema, &metadata, uri, )?; @@ -173,6 +182,7 @@ pub(crate) async fn local_parquet_read_async( start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult<(parquet2::metadata::FileMetaData, Table)> { let (send, recv) = tokio::sync::oneshot::channel(); @@ -185,6 +195,7 @@ pub(crate) async fn local_parquet_read_async( start_offset, num_rows, row_groups.as_deref(), + predicate, schema_infer_options, ); let (metadata, schema, arrays) = v?; @@ -221,6 +232,7 @@ pub(crate) async fn local_parquet_read_into_arrow_async( start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> super::Result<( parquet2::metadata::FileMetaData, @@ -236,6 +248,7 @@ pub(crate) async fn local_parquet_read_into_arrow_async( start_offset, num_rows, row_groups.as_deref(), + predicate, schema_infer_options, ); let _ = send.send(v); diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index bcb7fb2e2f..31e2794afe 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -418,7 +418,7 @@ impl ScanExternalInfo { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct Pushdowns { /// Optional filters to apply to the source data. - pub filters: Option>>, + pub filters: Option, /// Optional columns to select from the source data. pub columns: Option>>, /// Optional number of rows to read. @@ -433,7 +433,7 @@ impl Default for Pushdowns { impl Pushdowns { pub fn new( - filters: Option>>, + filters: Option, columns: Option>>, limit: Option, ) -> Self { @@ -452,7 +452,7 @@ impl Pushdowns { } } - pub fn with_filters(&self, filters: Option>>) -> Self { + pub fn with_filters(&self, filters: Option) -> Self { Self { filters, columns: self.columns.clone(), @@ -474,14 +474,7 @@ impl Pushdowns { res.push(format!("Projection pushdown = [{}]", columns.join(", "))); } if let Some(filters) = &self.filters { - res.push(format!( - "Filter pushdown = [{}]", - filters - .iter() - .map(|f| f.to_string()) - .collect::>() - .join(", ") - )); + res.push(format!("Filter pushdown = {}", filters)); } if let Some(limit) = self.limit { res.push(format!("Limit pushdown = {}", limit)); diff --git a/src/daft-stats/src/table_stats.rs b/src/daft-stats/src/table_stats.rs index 35154ddf72..b44f8c4be4 100644 --- a/src/daft-stats/src/table_stats.rs +++ b/src/daft-stats/src/table_stats.rs @@ -1,5 +1,6 @@ use std::{fmt::Display, ops::Not}; +use common_error::DaftError; use daft_dsl::Expr; use daft_table::Table; use indexmap::{IndexMap, IndexSet}; @@ -82,9 +83,15 @@ impl TableStatistics { pub fn eval_expression(&self, expr: &Expr) -> crate::Result { match expr { Expr::Alias(col, _) => self.eval_expression(col.as_ref()), - Expr::Column(col) => { - let col = self.columns.get(col.as_ref()).unwrap(); - Ok(col.clone()) + Expr::Column(col_name) => { + let col = self.columns.get(col_name.as_ref()); + if let Some(col) = col { + Ok(col.clone()) + } else { + Err(crate::Error::DaftCoreCompute { + source: DaftError::FieldNotFound(col_name.to_string()), + }) + } } Expr::Literal(lit_value) => lit_value.try_into(), Expr::Not(col) => self.eval_expression(col)?.not(), diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index c296d7cc57..feaeaaa0e8 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -174,16 +174,20 @@ impl Table { Ok(column_sizes?.iter().sum()) } - pub fn filter(&self, predicate: &[Expr]) -> DaftResult { + pub fn filter>(&self, predicate: &[E]) -> DaftResult { if predicate.is_empty() { Ok(self.clone()) } else if predicate.len() == 1 { - let mask = self.eval_expression(predicate.get(0).unwrap())?; + let mask = self.eval_expression(predicate.get(0).unwrap().as_ref())?; self.mask_filter(&mask) } else { - let mut expr = predicate.get(0).unwrap().and(predicate.get(1).unwrap()); + let mut expr = predicate + .get(0) + .unwrap() + .as_ref() + .and(predicate.get(1).unwrap().as_ref()); for i in 2..predicate.len() { - let next = predicate.get(i).unwrap(); + let next = predicate.get(i).unwrap().as_ref(); expr = expr.and(next); } let mask = self.eval_expression(&expr)?; diff --git a/tests/assets/parquet-data/sampled-tpch-with-stats.parquet b/tests/assets/parquet-data/sampled-tpch-with-stats.parquet new file mode 100644 index 0000000000..3a641a8b40 Binary files /dev/null and b/tests/assets/parquet-data/sampled-tpch-with-stats.parquet differ diff --git a/tests/integration/io/parquet/test_read_pushdowns.py b/tests/integration/io/parquet/test_read_pushdowns.py new file mode 100644 index 0000000000..40f227bb4b --- /dev/null +++ b/tests/integration/io/parquet/test_read_pushdowns.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from itertools import product + +import pytest + +import daft +from daft.table import MicroPartition + +PRED_PUSHDOWN_FILES = [ + "s3://daft-public-data/test_fixtures/parquet-dev/sampled-tpch-with-stats.parquet", + "tests/assets/parquet-data/sampled-tpch-with-stats.parquet", +] + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path, pred, limit", + product( + PRED_PUSHDOWN_FILES, + [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)], + [None, 1, 1000], + ), +) +def test_parquet_filter_pushdowns(path, pred, limit, aws_public_s3_config): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, num_rows=limit, io_config=aws_public_s3_config) + after = MicroPartition.read_parquet(path, io_config=aws_public_s3_config).filter([pred]) + if limit is not None: + after = after.head(limit) + assert with_pushdown.to_arrow() == after.to_arrow() + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path, pred", + product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]), +) +def test_parquet_filter_pushdowns_disjoint_predicate(path, pred, aws_public_s3_config): + with_pushdown = MicroPartition.read_parquet( + path, predicate=pred, columns=["L_QUANTITY"], io_config=aws_public_s3_config + ) + after = ( + MicroPartition.read_parquet(path, io_config=aws_public_s3_config) + .filter([pred]) + .eval_expression_list([daft.col("L_QUANTITY")]) + ) + assert with_pushdown.to_arrow() == after.to_arrow() + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path, pred", + product( + ["tests/assets/parquet-data/mvp.parquet", "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"], + [daft.col("a") == 1, daft.col("a") == 10000, daft.lit(True)], + ), +) +def test_parquet_filter_pushdowns_disjoint_predicate_no_stats(path, pred, aws_public_s3_config): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["b"], io_config=aws_public_s3_config) + after = ( + MicroPartition.read_parquet(path, io_config=aws_public_s3_config) + .filter([pred]) + .eval_expression_list([daft.col("b")]) + ) + assert with_pushdown.to_arrow() == after.to_arrow()