From 6bf9417ee769c1b906b23abef52387ad376f53ba Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 4 Dec 2023 17:26:09 -0800 Subject: [PATCH 01/11] WIP row group prunning --- Cargo.lock | 1 + src/daft-parquet/Cargo.toml | 3 +- src/daft-parquet/src/file.rs | 92 +++++++++++++++++++++------ src/daft-parquet/src/lib.rs | 15 +++++ src/daft-parquet/src/stream_reader.rs | 13 ++-- 5 files changed, 98 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 72d6487017..274d1ad248 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1277,6 +1277,7 @@ dependencies = [ "bytes", "common-error", "daft-core", + "daft-dsl", "daft-io", "daft-stats", "daft-table", diff --git a/src/daft-parquet/Cargo.toml b/src/daft-parquet/Cargo.toml index b806818acf..f1e8b85aad 100644 --- a/src/daft-parquet/Cargo.toml +++ b/src/daft-parquet/Cargo.toml @@ -5,6 +5,7 @@ async-stream = {workspace = true} bytes = {workspace = true} common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} +daft-dsl = {path = "../daft-dsl", default-features = false} daft-io = {path = "../daft-io", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} @@ -24,7 +25,7 @@ tokio-util = {workspace = true} [features] default = ["python"] -python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python"] +python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python"] [package] edition = {workspace = true} diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 7ab626dd82..246069077d 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -2,8 +2,10 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; -use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_dsl::Expr; use daft_io::{IOClient, IOStatsRef}; +use daft_stats::TruthValue; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; use parquet2::{ @@ -17,8 +19,9 @@ use crate::{ metadata::read_parquet_metadata, read::ParquetSchemaInferenceOptions, read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass}, - JoinSnafu, OneShotRecvSnafu, UnableToCreateParquetPageStreamSnafu, - UnableToParseSchemaFromMetadataSnafu, + statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu, + UnableToCreateParquetPageStreamSnafu, UnableToParseSchemaFromMetadataSnafu, + UnableToRunExpressionOnStatsSnafu, }; use arrow2::io::parquet::read::column_iter_to_arrays; @@ -27,9 +30,10 @@ pub(crate) struct ParquetReaderBuilder { pub metadata: parquet2::metadata::FileMetaData, selected_columns: Option>, row_start_offset: usize, - num_rows: usize, + limit: Option, row_groups: Option>, schema_inference_options: ParquetSchemaInferenceOptions, + predicate: Option, } use parquet2::read::decompress; @@ -92,16 +96,19 @@ where } pub(crate) fn build_row_ranges( - num_rows: usize, + limit: Option, row_start_offset: usize, row_groups: Option<&[i64]>, + predicate: Option<&Expr>, + schema: &Schema, metadata: &parquet2::metadata::FileMetaData, uri: &str, ) -> super::Result> { + let limit 
= limit.map(|v| v as i64); let mut row_ranges = vec![]; let mut curr_row_index = 0; - let mut rows_to_add = num_rows; if let Some(row_groups) = row_groups { + let mut rows_to_add: i64 = limit.unwrap_or(i64::MAX); for i in row_groups { let i = *i as usize; if !(0..metadata.row_groups.len()).contains(&i) { @@ -111,26 +118,62 @@ pub(crate) fn build_row_ranges( total_row_groups: metadata.row_groups.len() as i64, }); } + if rows_to_add <= 0 { + break; + } let rg = metadata.row_groups.get(i).unwrap(); + if let Some(pred) = predicate { + let stats = statistics::row_group_metadata_to_table_stats(rg, schema) + .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { + path: uri.to_string(), + })?; + let evaled = stats.eval_expression(pred).with_context(|_| { + UnableToRunExpressionOnStatsSnafu { + path: uri.to_string(), + } + })?; + if evaled.to_truth_value() == TruthValue::False { + continue; + } + } + let range_to_add = RowGroupRange { row_group_index: i, start: 0, - num_rows: rg.num_rows(), + num_rows: rg.num_rows().min(rows_to_add as usize), }; + rows_to_add = rows_to_add.saturating_sub((rg.num_rows() as i64).min(rows_to_add)); row_ranges.push(range_to_add); } } else { + let mut rows_to_add = limit.unwrap_or(metadata.num_rows as i64); + for (i, rg) in metadata.row_groups.iter().enumerate() { if (curr_row_index + rg.num_rows()) < row_start_offset { curr_row_index += rg.num_rows(); continue; } else if rows_to_add > 0 { + if let Some(pred) = predicate { + let stats = statistics::row_group_metadata_to_table_stats(rg, schema) + .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { + path: uri.to_string(), + })?; + let evaled = stats.eval_expression(pred).with_context(|_| { + UnableToRunExpressionOnStatsSnafu { + path: uri.to_string(), + } + })?; + if evaled.to_truth_value() == TruthValue::False { + curr_row_index += rg.num_rows(); + continue; + } + } let range_to_add = RowGroupRange { row_group_index: i, start: row_start_offset.saturating_sub(curr_row_index), - num_rows: rg.num_rows().min(rows_to_add), + num_rows: rg.num_rows().min(rows_to_add as usize), }; - rows_to_add = rows_to_add.saturating_sub(rg.num_rows().min(rows_to_add)); + rows_to_add = rows_to_add.saturating_sub((rg.num_rows() as i64).min(rows_to_add)); row_ranges.push(range_to_add); } else { break; @@ -152,15 +195,15 @@ impl ParquetReaderBuilder { .single_url_get_size(uri.into(), io_stats.clone()) .await?; let metadata = read_parquet_metadata(uri, size, io_client, io_stats).await?; - let num_rows = metadata.num_rows; Ok(ParquetReaderBuilder { uri: uri.into(), metadata, selected_columns: None, row_start_offset: 0, - num_rows, + limit: None, row_groups: None, schema_inference_options: Default::default(), + predicate: None, }) } @@ -201,9 +244,8 @@ impl ParquetReaderBuilder { num_rows: Option, ) -> super::Result { let start_offset = start_offset.unwrap_or(0); - let num_rows = num_rows.unwrap_or(self.metadata.num_rows.saturating_sub(start_offset)); self.row_start_offset = start_offset; - self.num_rows = num_rows; + self.limit = num_rows; Ok(self) } @@ -217,15 +259,12 @@ impl ParquetReaderBuilder { self } - pub fn build(self) -> super::Result { - let row_ranges = build_row_ranges( - self.num_rows, - self.row_start_offset, - self.row_groups.as_deref(), - &self.metadata, - &self.uri, - )?; + pub fn set_filter(mut self, predicate: Expr) -> Self { + self.predicate = Some(predicate); + self + } + pub fn build(self) -> super::Result { let mut arrow_schema = infer_schema_with_options(&self.metadata, 
&Some(self.schema_inference_options.into())) .context(UnableToParseSchemaFromMetadataSnafu:: { @@ -237,6 +276,17 @@ impl ParquetReaderBuilder { .fields .retain(|f| names_to_keep.contains(f.name.as_str())); } + // DONT UNWRAP + let daft_schema = Schema::try_from(&arrow_schema).unwrap(); + let row_ranges = build_row_ranges( + self.limit, + self.row_start_offset, + self.row_groups.as_deref(), + self.predicate.as_ref(), + &daft_schema, + &self.metadata, + &self.uri, + )?; ParquetFileReader::new(self.uri, self.metadata, arrow_schema, row_ranges) } diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index ff0dc57691..87df58a1a8 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -147,6 +147,21 @@ pub enum Error { read_columns: usize, }, + #[snafu(display( + "Parquet file: {} unable to convert row group metadata to stats\nDetails:\n{source}", + path, + ))] + UnableToConvertRowGroupMetadataToStats { path: String, source: DaftError }, + + #[snafu(display( + "Parquet file: {} unable to evaluate predicate on stats\nDetails:\n{source}", + path, + ))] + UnableToRunExpressionOnStats { + path: String, + source: daft_stats::Error, + }, + #[snafu(display("Error joining spawned task: {} for path: {}", source, path))] JoinError { path: String, diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index e0e7261d4c..5cf4bea97a 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -1,8 +1,9 @@ +use core::num; use std::{collections::HashSet, fs::File}; use arrow2::io::parquet::read; use common_error::DaftResult; -use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_table::Table; use itertools::Itertools; use rayon::prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge}; @@ -90,14 +91,18 @@ pub(crate) fn local_parquet_read_into_arrow( path: uri.to_string(), })?; let schema = prune_fields_from_schema(schema, columns, uri)?; + let daft_schema = Schema::try_from(&schema).unwrap(); let chunk_size = 128 * 1024; - let expected_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows)); + let max_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows)); - let num_expected_arrays = f32::ceil(expected_rows as f32 / chunk_size as f32) as usize; + let num_expected_arrays = f32::ceil(max_rows as f32 / chunk_size as f32) as usize; let row_ranges = build_row_ranges( - expected_rows, + num_rows, start_offset.unwrap_or(0), row_groups, + // TODO THREAD IN PREDICATES + None, + &daft_schema, &metadata, uri, )?; From bbd52e065a2d92291d62f26a5535a8c01313ba7d Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 5 Dec 2023 17:34:48 -0800 Subject: [PATCH 02/11] e2e micropartition --- daft/table/micropartition.py | 4 + daft/table/table.py | 4 + src/daft-micropartition/src/micropartition.rs | 56 ++++++++++++ src/daft-micropartition/src/ops/filter.rs | 4 +- src/daft-micropartition/src/python.rs | 6 ++ src/daft-parquet/src/file.rs | 31 +++++-- src/daft-parquet/src/python.rs | 7 ++ src/daft-parquet/src/read.rs | 91 ++++++++++++------- src/daft-table/src/lib.rs | 12 ++- 9 files changed, 165 insertions(+), 50 deletions(-) diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index f931fda2bd..de14ab9921 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -310,6 +310,7 @@ def read_parquet( start_offset: int | None = None, 
num_rows: int | None = None, row_groups: list[int] | None = None, + predicates: list[Expression] | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), @@ -321,6 +322,7 @@ def read_parquet( start_offset, num_rows, row_groups, + [pred._expr for pred in predicates] if predicates is not None else None, io_config, multithreaded_io, coerce_int96_timestamp_unit._timeunit, @@ -335,6 +337,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups_per_path: list[list[int] | None] | None = None, + predicates: list[Expression] | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -347,6 +350,7 @@ def read_parquet_bulk( start_offset, num_rows, row_groups_per_path, + [pred._expr for pred in predicates] if predicates is not None else None, io_config, num_parallel_tasks, multithreaded_io, diff --git a/daft/table/table.py b/daft/table/table.py index 1669170e23..0ef1333a9e 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -383,6 +383,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicates: list[Expression] | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), @@ -394,6 +395,7 @@ def read_parquet( start_offset=start_offset, num_rows=num_rows, row_groups=row_groups, + predicates=[pred._expr for pred in predicates] if predicates is not None else None, io_config=io_config, multithreaded_io=multithreaded_io, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, @@ -408,6 +410,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups_per_path: list[list[int] | None] | None = None, + predicates: list[Expression] | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -419,6 +422,7 @@ def read_parquet_bulk( start_offset=start_offset, num_rows=num_rows, row_groups=row_groups_per_path, + predicates=[pred._expr for pred in predicates] if predicates is not None else None, io_config=io_config, num_parallel_tasks=num_parallel_tasks, multithreaded_io=multithreaded_io, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index b272023d46..bf7d216cbc 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -8,6 +8,7 @@ use common_error::DaftResult; use daft_core::schema::{Schema, SchemaRef}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; +use daft_dsl::{Expr, ExprRef}; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; @@ -119,6 +120,11 @@ fn materialize_scan_task( None, scan_task.pushdowns.limit, row_groups, + scan_task + .pushdowns + .filters + .as_ref() + .map(|v| v.as_ref().clone()), io_client.clone(), io_stats, 8, @@ -387,6 +393,11 @@ impl MicroPartition { None, scan_task.pushdowns.limit, row_groups, + scan_task + .pushdowns + .filters + .as_ref() + .map(|v| v.as_ref().clone()), cfg.io_config .clone() .map(|c| Arc::new(c.clone())) @@ -635,6 +646,7 @@ pub(crate) fn read_parquet_into_micropartition( start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicates: Option>, io_config: Arc, io_stats: Option, 
num_parallel_tasks: usize, @@ -648,6 +660,49 @@ pub(crate) fn read_parquet_into_micropartition( // Run the required I/O to retrieve all the Parquet FileMetaData let runtime_handle = daft_io::get_runtime(multithreaded_io)?; let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?; + + if let Some(ref predicates) = predicates { + // We have a predicate, so we will perform eager read with the predicate + // Since we currently + let all_tables = read_parquet_bulk( + uris, + columns, + None, + None, + row_groups, + Some(predicates.clone()), + io_client, + io_stats, + num_parallel_tasks, + runtime_handle, + schema_infer_options, + )?; + + let unioned_schema = all_tables + .iter() + .map(|t| t.schema.clone()) + .try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?; + let full_daft_schema = unioned_schema.expect("we need at least 1 schema"); + // Hack to avoid to owned schema + let full_daft_schema = Schema { + fields: full_daft_schema.fields.clone(), + }; + let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?; + + let all_tables = all_tables + .into_iter() + .map(|t| t.cast_to_schema(&pruned_daft_schema)) + .collect::>>()?; + let loaded = + MicroPartition::new_loaded(Arc::new(pruned_daft_schema), all_tables.into(), None); + + if let Some(num_rows) = num_rows { + return loaded.head(num_rows); + } else { + return Ok(loaded); + } + } + let meta_io_client = io_client.clone(); let meta_io_stats = io_stats.clone(); let metadata = runtime_handle.block_on(async move { @@ -775,6 +830,7 @@ pub(crate) fn read_parquet_into_micropartition( start_offset, num_rows, row_groups, + None, io_client, io_stats, num_parallel_tasks, diff --git a/src/daft-micropartition/src/ops/filter.rs b/src/daft-micropartition/src/ops/filter.rs index b49df69c95..88ab516de9 100644 --- a/src/daft-micropartition/src/ops/filter.rs +++ b/src/daft-micropartition/src/ops/filter.rs @@ -8,7 +8,7 @@ use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu}; use daft_stats::TruthValue; impl MicroPartition { - pub fn filter(&self, predicate: &[Expr]) -> DaftResult { + pub fn filter>(&self, predicate: &[E]) -> DaftResult { let io_stats = IOStatsContext::new("MicroPartition::filter"); if predicate.is_empty() { return Ok(Self::empty(Some(self.schema.clone()))); @@ -16,7 +16,7 @@ impl MicroPartition { if let Some(statistics) = &self.statistics { let folded_expr = predicate .iter() - .cloned() + .map(|e| e.as_ref().clone()) .reduce(|a, b| a.and(&b)) .expect("should have at least 1 expr"); let eval_result = statistics.eval_expression(&folded_expr)?; diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index f2a6796ffc..56b8691673 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -405,6 +405,7 @@ impl PyMicroPartition { start_offset: Option, num_rows: Option, row_groups: Option>, + predicates: Option>, io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, @@ -423,6 +424,8 @@ impl PyMicroPartition { start_offset, num_rows, row_groups.map(|rg| vec![Some(rg)]), + predicates + .map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::>()), io_config, Some(io_stats), 1, @@ -442,6 +445,7 @@ impl PyMicroPartition { start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicates: Option>, io_config: Option, num_parallel_tasks: Option, multithreaded_io: Option, @@ -461,6 +465,8 @@ impl PyMicroPartition { start_offset, num_rows, row_groups, + predicates + .map(|e_vec| 
e_vec.into_iter().map(|e| e.expr.into()).collect::>()), io_config, Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 246069077d..e81332c25b 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; -use daft_dsl::Expr; +use daft_dsl::{Expr, ExprRef}; use daft_io::{IOClient, IOStatsRef}; use daft_stats::TruthValue; use daft_table::Table; @@ -33,7 +33,7 @@ pub(crate) struct ParquetReaderBuilder { limit: Option, row_groups: Option>, schema_inference_options: ParquetSchemaInferenceOptions, - predicate: Option, + predicates: Option>, } use parquet2::read::decompress; @@ -99,7 +99,7 @@ pub(crate) fn build_row_ranges( limit: Option, row_start_offset: usize, row_groups: Option<&[i64]>, - predicate: Option<&Expr>, + predicates: Option<&[ExprRef]>, schema: &Schema, metadata: &parquet2::metadata::FileMetaData, uri: &str, @@ -107,6 +107,15 @@ pub(crate) fn build_row_ranges( let limit = limit.map(|v| v as i64); let mut row_ranges = vec![]; let mut curr_row_index = 0; + + let folded_expr = predicates.map(|preds| { + preds + .iter() + .cloned() + .reduce(|a, b| a.and(&b).into()) + .expect("should have at least 1 expr") + }); + if let Some(row_groups) = row_groups { let mut rows_to_add: i64 = limit.unwrap_or(i64::MAX); for i in row_groups { @@ -122,11 +131,12 @@ pub(crate) fn build_row_ranges( break; } let rg = metadata.row_groups.get(i).unwrap(); - if let Some(pred) = predicate { + if let Some(ref pred) = folded_expr { let stats = statistics::row_group_metadata_to_table_stats(rg, schema) .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { path: uri.to_string(), })?; + let evaled = stats.eval_expression(pred).with_context(|_| { UnableToRunExpressionOnStatsSnafu { path: uri.to_string(), @@ -153,7 +163,7 @@ pub(crate) fn build_row_ranges( curr_row_index += rg.num_rows(); continue; } else if rows_to_add > 0 { - if let Some(pred) = predicate { + if let Some(ref pred) = folded_expr { let stats = statistics::row_group_metadata_to_table_stats(rg, schema) .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { path: uri.to_string(), @@ -203,7 +213,7 @@ impl ParquetReaderBuilder { limit: None, row_groups: None, schema_inference_options: Default::default(), - predicate: None, + predicates: None, }) } @@ -259,8 +269,9 @@ impl ParquetReaderBuilder { self } - pub fn set_filter(mut self, predicate: Expr) -> Self { - self.predicate = Some(predicate); + pub fn set_filter(mut self, predicates: Vec) -> Self { + assert_eq!(self.limit, None); + self.predicates = Some(predicates); self } @@ -276,13 +287,13 @@ impl ParquetReaderBuilder { .fields .retain(|f| names_to_keep.contains(f.name.as_str())); } - // DONT UNWRAP + // TODO: DONT UNWRAP let daft_schema = Schema::try_from(&arrow_schema).unwrap(); let row_ranges = build_row_ranges( self.limit, self.row_start_offset, self.row_groups.as_deref(), - self.predicate.as_ref(), + self.predicates.as_deref(), &daft_schema, &self.metadata, &self.uri, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 73f202feb0..05c63f5cb7 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -5,6 +5,7 @@ pub mod pylib { ffi::field_to_py, python::{datatype::PyTimeUnit, 
schema::PySchema, PySeries}, }; + use daft_dsl::python::PyExpr; use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; use daft_table::python::PyTable; use pyo3::{pyfunction, types::PyModule, PyResult, Python}; @@ -21,6 +22,7 @@ pub mod pylib { start_offset: Option, num_rows: Option, row_groups: Option>, + predicates: Option>, io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, @@ -43,6 +45,8 @@ pub mod pylib { start_offset, num_rows, row_groups, + predicates + .map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::>()), io_client, Some(io_stats.clone()), runtime_handle, @@ -127,6 +131,7 @@ pub mod pylib { start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicates: Option>, io_config: Option, num_parallel_tasks: Option, multithreaded_io: Option, @@ -150,6 +155,8 @@ pub mod pylib { start_offset, num_rows, row_groups, + predicates + .map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::>()), io_client, Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 7311b9b1fa..fa91f49ba1 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{fmt::format, sync::Arc}; use common_error::DaftResult; @@ -7,6 +7,7 @@ use daft_core::{ schema::Schema, DataType, IntoSeries, Series, }; +use daft_dsl::{Expr, ExprRef}; use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{ @@ -61,12 +62,18 @@ async fn read_parquet_single( start_offset: Option, num_rows: Option, row_groups: Option>, + predicates: Option>, io_client: Arc, io_stats: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult { + let pred_set = predicates.is_some(); + if pred_set && num_rows.is_some() { + return Err(common_error::DaftError::ValueError("Parquet Reader Currently doesn't support having both `num_rows` and `predicate` set at the same time".to_string())); + } let (source_type, fixed_uri) = parse_url(uri)?; - let (metadata, table) = if matches!(source_type, SourceType::File) { + let (metadata, mut table) = if matches!(source_type, SourceType::File) { + // TODO thread predicate to local parquet read crate::stream_reader::local_parquet_read_async( fixed_uri.as_ref(), columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()), @@ -99,6 +106,12 @@ async fn read_parquet_single( builder }; + let builder = if let Some(ref predicates) = predicates { + builder.set_filter(predicates.clone()) + } else { + builder + }; + let parquet_reader = builder.build()?; let ranges = parquet_reader.prebuffer_ranges(io_client, io_stats)?; Ok(( @@ -117,44 +130,48 @@ async fn read_parquet_single( let metadata_num_columns = metadata.schema().fields().len(); - if let Some(row_groups) = row_groups { - let expected_rows: usize = row_groups - .iter() - .map(|i| rows_per_row_groups.get(*i as usize).unwrap()) - .sum(); - if expected_rows != table.len() { - return Err(super::Error::ParquetNumRowMismatch { - path: uri.into(), - metadata_num_rows: expected_rows, - read_rows: table.len(), - } - .into()); - } + if let Some(predicates) = predicates { + // TODO ideally pipeline this with IO and before concating, rather than after + table = table.filter(predicates.as_slice())?; } else { - match (start_offset, num_rows) { - (None, None) if metadata_num_rows != table.len() => { - Err(super::Error::ParquetNumRowMismatch { + if let Some(row_groups) = row_groups { + let expected_rows: 
usize = row_groups + .iter() + .map(|i| rows_per_row_groups.get(*i as usize).unwrap()) + .sum(); + if expected_rows != table.len() { + return Err(super::Error::ParquetNumRowMismatch { path: uri.into(), - metadata_num_rows, + metadata_num_rows: expected_rows, read_rows: table.len(), - }) + } + .into()); } - (Some(s), None) if metadata_num_rows.saturating_sub(s) != table.len() => { - Err(super::Error::ParquetNumRowMismatch { + } else { + match (start_offset, num_rows) { + (None, None) if metadata_num_rows != table.len() => { + Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows, + read_rows: table.len(), + }) + } + (Some(s), None) if metadata_num_rows.saturating_sub(s) != table.len() => { + Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows: metadata_num_rows.saturating_sub(s), + read_rows: table.len(), + }) + } + (_, Some(n)) if n < table.len() => Err(super::Error::ParquetNumRowMismatch { path: uri.into(), - metadata_num_rows: metadata_num_rows.saturating_sub(s), + metadata_num_rows: n.min(metadata_num_rows), read_rows: table.len(), - }) - } - (_, Some(n)) if n < table.len() => Err(super::Error::ParquetNumRowMismatch { - path: uri.into(), - metadata_num_rows: n.min(metadata_num_rows), - read_rows: table.len(), - }), - _ => Ok(()), - }?; - }; - + }), + _ => Ok(()), + }?; + }; + } let expected_num_columns = if let Some(columns) = columns { columns.len() } else { @@ -316,6 +333,7 @@ pub fn read_parquet( start_offset: Option, num_rows: Option, row_groups: Option>, + predicates: Option>, io_client: Arc, io_stats: Option, runtime_handle: Arc, @@ -329,6 +347,7 @@ pub fn read_parquet( start_offset, num_rows, row_groups, + predicates, io_client, io_stats, schema_infer_options, @@ -373,6 +392,7 @@ pub fn read_parquet_bulk( start_offset: Option, num_rows: Option, row_groups: Option>>>, + predicates: Option>, io_client: Arc, io_stats: Option, num_parallel_tasks: usize, @@ -396,6 +416,7 @@ pub fn read_parquet_bulk( let uri = uri.to_string(); let owned_columns = owned_columns.clone(); let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); + let owned_predicates = predicates.clone(); let io_client = io_client.clone(); let io_stats = io_stats.clone(); @@ -412,6 +433,7 @@ pub fn read_parquet_bulk( start_offset, num_rows, owned_row_group, + owned_predicates, io_client, io_stats, schema_infer_options, @@ -643,6 +665,7 @@ mod tests { None, None, None, + None, io_client, None, runtime_handle, diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index c296d7cc57..feaeaaa0e8 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -174,16 +174,20 @@ impl Table { Ok(column_sizes?.iter().sum()) } - pub fn filter(&self, predicate: &[Expr]) -> DaftResult { + pub fn filter>(&self, predicate: &[E]) -> DaftResult { if predicate.is_empty() { Ok(self.clone()) } else if predicate.len() == 1 { - let mask = self.eval_expression(predicate.get(0).unwrap())?; + let mask = self.eval_expression(predicate.get(0).unwrap().as_ref())?; self.mask_filter(&mask) } else { - let mut expr = predicate.get(0).unwrap().and(predicate.get(1).unwrap()); + let mut expr = predicate + .get(0) + .unwrap() + .as_ref() + .and(predicate.get(1).unwrap().as_ref()); for i in 2..predicate.len() { - let next = predicate.get(i).unwrap(); + let next = predicate.get(i).unwrap().as_ref(); expr = expr.and(next); } let mask = self.eval_expression(&expr)?; From 1d19fff30a321e50307b41bbdb5e23ec62dd7b45 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 
5 Dec 2023 19:21:57 -0800 Subject: [PATCH 03/11] refactor pushdowns to use single Expr --- daft/daft.pyi | 4 ++++ daft/table/micropartition.py | 8 +++---- daft/table/table.py | 8 +++---- src/daft-micropartition/src/micropartition.rs | 18 ++++---------- src/daft-micropartition/src/python.rs | 10 ++++---- src/daft-parquet/src/file.rs | 24 +++++++------------ src/daft-parquet/src/python.rs | 10 ++++---- src/daft-parquet/src/read.rs | 22 ++++++++--------- src/daft-scan/src/lib.rs | 15 ++++-------- 9 files changed, 48 insertions(+), 71 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 758ebad4d5..2dcca0bded 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -526,6 +526,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit | None = None, @@ -536,6 +537,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[list[int] | None] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -904,6 +906,7 @@ class PyMicroPartition: start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: PyTimeUnit = PyTimeUnit.nanoseconds(), @@ -916,6 +919,7 @@ class PyMicroPartition: start_offset: int | None = None, num_rows: int | None = None, row_groups: list[list[int] | None] | None = None, + predicate: PyExpr | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = None, multithreaded_io: bool | None = None, diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index de14ab9921..602ad5fca5 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -310,7 +310,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, row_groups: list[int] | None = None, - predicates: list[Expression] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), @@ -322,7 +322,7 @@ def read_parquet( start_offset, num_rows, row_groups, - [pred._expr for pred in predicates] if predicates is not None else None, + predicate._expr if predicate is not None else None, io_config, multithreaded_io, coerce_int96_timestamp_unit._timeunit, @@ -337,7 +337,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups_per_path: list[list[int] | None] | None = None, - predicates: list[Expression] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -350,7 +350,7 @@ def read_parquet_bulk( start_offset, num_rows, row_groups_per_path, - [pred._expr for pred in predicates] if predicates is not None else None, + predicate._expr if predicate is not None else None, io_config, num_parallel_tasks, multithreaded_io, diff --git a/daft/table/table.py b/daft/table/table.py index 0ef1333a9e..1c6ed7c9a5 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -383,7 +383,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = 
None, row_groups: list[int] | None = None, - predicates: list[Expression] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), @@ -395,7 +395,7 @@ def read_parquet( start_offset=start_offset, num_rows=num_rows, row_groups=row_groups, - predicates=[pred._expr for pred in predicates] if predicates is not None else None, + predicate=predicate._expr if predicate is not None else None, io_config=io_config, multithreaded_io=multithreaded_io, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, @@ -410,7 +410,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, row_groups_per_path: list[list[int] | None] | None = None, - predicates: list[Expression] | None = None, + predicate: Expression | None = None, io_config: IOConfig | None = None, num_parallel_tasks: int | None = 128, multithreaded_io: bool | None = None, @@ -422,7 +422,7 @@ def read_parquet_bulk( start_offset=start_offset, num_rows=num_rows, row_groups=row_groups_per_path, - predicates=[pred._expr for pred in predicates] if predicates is not None else None, + predicate=predicate._expr if predicate is not None else None, io_config=io_config, num_parallel_tasks=num_parallel_tasks, multithreaded_io=multithreaded_io, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index bf7d216cbc..64b4ee0acc 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -120,11 +120,7 @@ fn materialize_scan_task( None, scan_task.pushdowns.limit, row_groups, - scan_task - .pushdowns - .filters - .as_ref() - .map(|v| v.as_ref().clone()), + scan_task.pushdowns.filters.clone(), io_client.clone(), io_stats, 8, @@ -393,11 +389,7 @@ impl MicroPartition { None, scan_task.pushdowns.limit, row_groups, - scan_task - .pushdowns - .filters - .as_ref() - .map(|v| v.as_ref().clone()), + scan_task.pushdowns.filters.clone(), cfg.io_config .clone() .map(|c| Arc::new(c.clone())) @@ -646,7 +638,7 @@ pub(crate) fn read_parquet_into_micropartition( start_offset: Option, num_rows: Option, row_groups: Option>>>, - predicates: Option>, + predicate: Option, io_config: Arc, io_stats: Option, num_parallel_tasks: usize, @@ -661,7 +653,7 @@ pub(crate) fn read_parquet_into_micropartition( let runtime_handle = daft_io::get_runtime(multithreaded_io)?; let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?; - if let Some(ref predicates) = predicates { + if let Some(predicate) = predicate { // We have a predicate, so we will perform eager read with the predicate // Since we currently let all_tables = read_parquet_bulk( @@ -670,7 +662,7 @@ pub(crate) fn read_parquet_into_micropartition( None, None, row_groups, - Some(predicates.clone()), + Some(predicate.clone()), io_client, io_stats, num_parallel_tasks, diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 56b8691673..1515fffe9f 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -405,7 +405,7 @@ impl PyMicroPartition { start_offset: Option, num_rows: Option, row_groups: Option>, - predicates: Option>, + predicate: Option, io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, @@ -424,8 +424,7 @@ impl PyMicroPartition { start_offset, num_rows, row_groups.map(|rg| vec![Some(rg)]), - predicates - .map(|e_vec| 
e_vec.into_iter().map(|e| e.expr.into()).collect::>()), + predicate.map(|e| e.expr.into()), io_config, Some(io_stats), 1, @@ -445,7 +444,7 @@ impl PyMicroPartition { start_offset: Option, num_rows: Option, row_groups: Option>>>, - predicates: Option>, + predicate: Option, io_config: Option, num_parallel_tasks: Option, multithreaded_io: Option, @@ -465,8 +464,7 @@ impl PyMicroPartition { start_offset, num_rows, row_groups, - predicates - .map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::>()), + predicate.map(|e| e.expr.into()), io_config, Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index e81332c25b..3daa5298bb 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -33,7 +33,7 @@ pub(crate) struct ParquetReaderBuilder { limit: Option, row_groups: Option>, schema_inference_options: ParquetSchemaInferenceOptions, - predicates: Option>, + predicate: Option, } use parquet2::read::decompress; @@ -99,7 +99,7 @@ pub(crate) fn build_row_ranges( limit: Option, row_start_offset: usize, row_groups: Option<&[i64]>, - predicates: Option<&[ExprRef]>, + predicate: Option, schema: &Schema, metadata: &parquet2::metadata::FileMetaData, uri: &str, @@ -108,14 +108,6 @@ pub(crate) fn build_row_ranges( let mut row_ranges = vec![]; let mut curr_row_index = 0; - let folded_expr = predicates.map(|preds| { - preds - .iter() - .cloned() - .reduce(|a, b| a.and(&b).into()) - .expect("should have at least 1 expr") - }); - if let Some(row_groups) = row_groups { let mut rows_to_add: i64 = limit.unwrap_or(i64::MAX); for i in row_groups { @@ -131,7 +123,7 @@ pub(crate) fn build_row_ranges( break; } let rg = metadata.row_groups.get(i).unwrap(); - if let Some(ref pred) = folded_expr { + if let Some(ref pred) = predicate { let stats = statistics::row_group_metadata_to_table_stats(rg, schema) .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { path: uri.to_string(), @@ -163,7 +155,7 @@ pub(crate) fn build_row_ranges( curr_row_index += rg.num_rows(); continue; } else if rows_to_add > 0 { - if let Some(ref pred) = folded_expr { + if let Some(ref pred) = predicate { let stats = statistics::row_group_metadata_to_table_stats(rg, schema) .with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu { path: uri.to_string(), @@ -213,7 +205,7 @@ impl ParquetReaderBuilder { limit: None, row_groups: None, schema_inference_options: Default::default(), - predicates: None, + predicate: None, }) } @@ -269,9 +261,9 @@ impl ParquetReaderBuilder { self } - pub fn set_filter(mut self, predicates: Vec) -> Self { + pub fn set_filter(mut self, predicate: ExprRef) -> Self { assert_eq!(self.limit, None); - self.predicates = Some(predicates); + self.predicate = Some(predicate); self } @@ -293,7 +285,7 @@ impl ParquetReaderBuilder { self.limit, self.row_start_offset, self.row_groups.as_deref(), - self.predicates.as_deref(), + self.predicate.clone(), &daft_schema, &self.metadata, &self.uri, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 05c63f5cb7..10f62ec09a 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -22,7 +22,7 @@ pub mod pylib { start_offset: Option, num_rows: Option, row_groups: Option>, - predicates: Option>, + predicate: Option, io_config: Option, multithreaded_io: Option, coerce_int96_timestamp_unit: Option, @@ -45,8 +45,7 @@ pub mod pylib { start_offset, num_rows, row_groups, - predicates - .map(|e_vec| e_vec.into_iter().map(|e| 
e.expr.into()).collect::>()), + predicate.map(|e| e.expr.into()), io_client, Some(io_stats.clone()), runtime_handle, @@ -131,7 +130,7 @@ pub mod pylib { start_offset: Option, num_rows: Option, row_groups: Option>>>, - predicates: Option>, + predicate: Option, io_config: Option, num_parallel_tasks: Option, multithreaded_io: Option, @@ -155,8 +154,7 @@ pub mod pylib { start_offset, num_rows, row_groups, - predicates - .map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::>()), + predicate.map(|e| e.expr.into()), io_client, Some(io_stats), num_parallel_tasks.unwrap_or(128) as usize, diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index fa91f49ba1..8d488bc1a8 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -62,12 +62,12 @@ async fn read_parquet_single( start_offset: Option, num_rows: Option, row_groups: Option>, - predicates: Option>, + predicate: Option, io_client: Arc, io_stats: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult
{ - let pred_set = predicates.is_some(); + let pred_set = predicate.is_some(); if pred_set && num_rows.is_some() { return Err(common_error::DaftError::ValueError("Parquet Reader Currently doesn't support having both `num_rows` and `predicate` set at the same time".to_string())); } @@ -106,8 +106,8 @@ async fn read_parquet_single( builder }; - let builder = if let Some(ref predicates) = predicates { - builder.set_filter(predicates.clone()) + let builder = if let Some(ref predicate) = predicate { + builder.set_filter(predicate.clone()) } else { builder }; @@ -130,9 +130,9 @@ async fn read_parquet_single( let metadata_num_columns = metadata.schema().fields().len(); - if let Some(predicates) = predicates { + if let Some(predicate) = predicate { // TODO ideally pipeline this with IO and before concating, rather than after - table = table.filter(predicates.as_slice())?; + table = table.filter(&[predicate])?; } else { if let Some(row_groups) = row_groups { let expected_rows: usize = row_groups @@ -333,7 +333,7 @@ pub fn read_parquet( start_offset: Option, num_rows: Option, row_groups: Option>, - predicates: Option>, + predicate: Option, io_client: Arc, io_stats: Option, runtime_handle: Arc, @@ -347,7 +347,7 @@ pub fn read_parquet( start_offset, num_rows, row_groups, - predicates, + predicate, io_client, io_stats, schema_infer_options, @@ -392,7 +392,7 @@ pub fn read_parquet_bulk( start_offset: Option, num_rows: Option, row_groups: Option>>>, - predicates: Option>, + predicate: Option, io_client: Arc, io_stats: Option, num_parallel_tasks: usize, @@ -416,7 +416,7 @@ pub fn read_parquet_bulk( let uri = uri.to_string(); let owned_columns = owned_columns.clone(); let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); - let owned_predicates = predicates.clone(); + let owned_predicate = predicate.clone(); let io_client = io_client.clone(); let io_stats = io_stats.clone(); @@ -433,7 +433,7 @@ pub fn read_parquet_bulk( start_offset, num_rows, owned_row_group, - owned_predicates, + owned_predicate, io_client, io_stats, schema_infer_options, diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index e3ca5056be..3af8ae5000 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -313,7 +313,7 @@ impl ScanExternalInfo { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct Pushdowns { /// Optional filters to apply to the source data. - pub filters: Option>>, + pub filters: Option, /// Optional columns to select from the source data. pub columns: Option>>, /// Optional number of rows to read. 
@@ -328,7 +328,7 @@ impl Default for Pushdowns { impl Pushdowns { pub fn new( - filters: Option>>, + filters: Option, columns: Option>>, limit: Option, ) -> Self { @@ -347,7 +347,7 @@ impl Pushdowns { } } - pub fn with_filters(&self, filters: Option>>) -> Self { + pub fn with_filters(&self, filters: Option) -> Self { Self { filters, columns: self.columns.clone(), @@ -369,14 +369,7 @@ impl Pushdowns { res.push(format!("Projection pushdown = [{}]", columns.join(", "))); } if let Some(filters) = &self.filters { - res.push(format!( - "Filter pushdown = [{}]", - filters - .iter() - .map(|f| f.to_string()) - .collect::>() - .join(", ") - )); + res.push(format!("Filter pushdown = {}", filters)); } if let Some(limit) = self.limit { res.push(format!("Limit pushdown = {}", limit)); From bc2b11019bd81e27bf3fd4e9e289c08789cc642b Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 5 Dec 2023 19:23:07 -0800 Subject: [PATCH 04/11] clippy fixes --- src/daft-micropartition/src/micropartition.rs | 2 +- src/daft-parquet/src/file.rs | 2 +- src/daft-parquet/src/read.rs | 66 +++++++++---------- src/daft-parquet/src/stream_reader.rs | 1 - 4 files changed, 34 insertions(+), 37 deletions(-) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 64b4ee0acc..b717191b2c 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -8,7 +8,7 @@ use common_error::DaftResult; use daft_core::schema::{Schema, SchemaRef}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; -use daft_dsl::{Expr, ExprRef}; +use daft_dsl::ExprRef; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 3daa5298bb..1b44d9a89a 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; -use daft_dsl::{Expr, ExprRef}; +use daft_dsl::ExprRef; use daft_io::{IOClient, IOStatsRef}; use daft_stats::TruthValue; use daft_table::Table; diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 8d488bc1a8..1f79cbcec7 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -1,4 +1,4 @@ -use std::{fmt::format, sync::Arc}; +use std::sync::Arc; use common_error::DaftResult; @@ -7,7 +7,7 @@ use daft_core::{ schema::Schema, DataType, IntoSeries, Series, }; -use daft_dsl::{Expr, ExprRef}; +use daft_dsl::ExprRef; use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{ @@ -133,44 +133,42 @@ async fn read_parquet_single( if let Some(predicate) = predicate { // TODO ideally pipeline this with IO and before concating, rather than after table = table.filter(&[predicate])?; + } else if let Some(row_groups) = row_groups { + let expected_rows: usize = row_groups + .iter() + .map(|i| rows_per_row_groups.get(*i as usize).unwrap()) + .sum(); + if expected_rows != table.len() { + return Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows: expected_rows, + read_rows: table.len(), + } + .into()); + } } else { - if let Some(row_groups) = row_groups { - let expected_rows: usize = row_groups - .iter() - .map(|i| rows_per_row_groups.get(*i as 
usize).unwrap()) - .sum(); - if expected_rows != table.len() { - return Err(super::Error::ParquetNumRowMismatch { + match (start_offset, num_rows) { + (None, None) if metadata_num_rows != table.len() => { + Err(super::Error::ParquetNumRowMismatch { path: uri.into(), - metadata_num_rows: expected_rows, + metadata_num_rows, read_rows: table.len(), - } - .into()); + }) } - } else { - match (start_offset, num_rows) { - (None, None) if metadata_num_rows != table.len() => { - Err(super::Error::ParquetNumRowMismatch { - path: uri.into(), - metadata_num_rows, - read_rows: table.len(), - }) - } - (Some(s), None) if metadata_num_rows.saturating_sub(s) != table.len() => { - Err(super::Error::ParquetNumRowMismatch { - path: uri.into(), - metadata_num_rows: metadata_num_rows.saturating_sub(s), - read_rows: table.len(), - }) - } - (_, Some(n)) if n < table.len() => Err(super::Error::ParquetNumRowMismatch { + (Some(s), None) if metadata_num_rows.saturating_sub(s) != table.len() => { + Err(super::Error::ParquetNumRowMismatch { path: uri.into(), - metadata_num_rows: n.min(metadata_num_rows), + metadata_num_rows: metadata_num_rows.saturating_sub(s), read_rows: table.len(), - }), - _ => Ok(()), - }?; - }; + }) + } + (_, Some(n)) if n < table.len() => Err(super::Error::ParquetNumRowMismatch { + path: uri.into(), + metadata_num_rows: n.min(metadata_num_rows), + read_rows: table.len(), + }), + _ => Ok(()), + }?; } let expected_num_columns = if let Some(columns) = columns { columns.len() diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index 5cf4bea97a..0d88c52fc2 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -1,4 +1,3 @@ -use core::num; use std::{collections::HashSet, fs::File}; use arrow2::io::parquet::read; From 5ea4736c99137e977d33e1fbb4506fa5bbfc75f1 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 5 Dec 2023 20:12:55 -0800 Subject: [PATCH 05/11] thread in local predicate --- src/daft-parquet/src/read.rs | 3 ++- src/daft-parquet/src/stream_reader.rs | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 1f79cbcec7..bd10a6253e 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -73,13 +73,13 @@ async fn read_parquet_single( } let (source_type, fixed_uri) = parse_url(uri)?; let (metadata, mut table) = if matches!(source_type, SourceType::File) { - // TODO thread predicate to local parquet read crate::stream_reader::local_parquet_read_async( fixed_uri.as_ref(), columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()), start_offset, num_rows, row_groups.clone(), + predicate.clone(), schema_infer_options, ) .await @@ -208,6 +208,7 @@ async fn read_parquet_single_into_arrow( start_offset, num_rows, row_groups.clone(), + None, schema_infer_options, ) .await?; diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index 0d88c52fc2..fed91e4798 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -3,6 +3,7 @@ use std::{collections::HashSet, fs::File}; use arrow2::io::parquet::read; use common_error::DaftResult; use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_dsl::ExprRef; use daft_table::Table; use itertools::Itertools; use rayon::prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge}; @@ -51,6 +52,7 @@ pub(crate) fn local_parquet_read_into_arrow( 
start_offset: Option, num_rows: Option, row_groups: Option<&[i64]>, + predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> super::Result<( parquet2::metadata::FileMetaData, @@ -100,7 +102,7 @@ pub(crate) fn local_parquet_read_into_arrow( start_offset.unwrap_or(0), row_groups, // TODO THREAD IN PREDICATES - None, + predicate, &daft_schema, &metadata, uri, @@ -177,6 +179,7 @@ pub(crate) async fn local_parquet_read_async( start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult<(parquet2::metadata::FileMetaData, Table)> { let (send, recv) = tokio::sync::oneshot::channel(); @@ -189,6 +192,7 @@ pub(crate) async fn local_parquet_read_async( start_offset, num_rows, row_groups.as_deref(), + predicate, schema_infer_options, ); let (metadata, schema, arrays) = v?; @@ -225,6 +229,7 @@ pub(crate) async fn local_parquet_read_into_arrow_async( start_offset: Option, num_rows: Option, row_groups: Option>, + predicate: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> super::Result<( parquet2::metadata::FileMetaData, @@ -240,6 +245,7 @@ pub(crate) async fn local_parquet_read_into_arrow_async( start_offset, num_rows, row_groups.as_deref(), + predicate, schema_infer_options, ); let _ = send.send(v); From f04add893620782437a8f9fe8fdef7e217b925ca Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 6 Dec 2023 12:01:09 -0800 Subject: [PATCH 06/11] add disjoint predicate handling --- src/daft-parquet/src/file.rs | 4 +- src/daft-parquet/src/read.rs | 35 +++++++++++++----- src/daft-stats/src/table_stats.rs | 13 +++++-- .../sampled-tpch-with-stats.parquet | Bin 0 -> 55644 bytes tests/table/table_io/test_parquet.py | 27 ++++++++++++++ 5 files changed, 65 insertions(+), 14 deletions(-) create mode 100644 tests/assets/parquet-data/sampled-tpch-with-stats.parquet diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 1b44d9a89a..5c3f5798d0 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -217,7 +217,7 @@ impl ParquetReaderBuilder { self.metadata().schema() } - pub fn prune_columns(mut self, columns: &[&str]) -> super::Result { + pub fn prune_columns>(mut self, columns: &[S]) -> super::Result { let avail_names = self .parquet_schema() .fields() @@ -226,7 +226,7 @@ impl ParquetReaderBuilder { .collect::>(); let mut names_to_keep = HashSet::new(); for col_name in columns { - if avail_names.contains(col_name) { + if avail_names.contains(col_name.as_ref()) { names_to_keep.insert(col_name.to_string()); } else { return Err(super::Error::FieldNotFound { diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index bd10a6253e..71d65fbfa9 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -7,7 +7,7 @@ use daft_core::{ schema::Schema, DataType, IntoSeries, Series, }; -use daft_dsl::ExprRef; +use daft_dsl::{optimization::get_required_columns, ExprRef}; use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{ @@ -67,15 +67,28 @@ async fn read_parquet_single( io_stats: Option, schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult
{ - let pred_set = predicate.is_some(); - if pred_set && num_rows.is_some() { - return Err(common_error::DaftError::ValueError("Parquet Reader Currently doesn't support having both `num_rows` and `predicate` set at the same time".to_string())); + let original_columns = columns; + let mut columns = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()); + let requested_columns = columns.as_ref().map(|v| v.len()); + if let Some(ref pred) = predicate { + if num_rows.is_some() { + return Err(common_error::DaftError::ValueError("Parquet Reader Currently doesn't support having both `num_rows` and `predicate` set at the same time".to_string())); + } + if let Some(req_columns) = columns.as_mut() { + let needed_columns = get_required_columns(pred); + for c in needed_columns { + if !req_columns.contains(&c) { + req_columns.push(c); + } + } + } } let (source_type, fixed_uri) = parse_url(uri)?; + let (metadata, mut table) = if matches!(source_type, SourceType::File) { crate::stream_reader::local_parquet_read_async( fixed_uri.as_ref(), - columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()), + columns, start_offset, num_rows, row_groups.clone(), @@ -88,8 +101,8 @@ async fn read_parquet_single( ParquetReaderBuilder::from_uri(uri, io_client.clone(), io_stats.clone()).await?; let builder = builder.set_infer_schema_options(schema_infer_options); - let builder = if let Some(columns) = columns { - builder.prune_columns(columns)? + let builder = if let Some(columns) = columns.as_ref() { + builder.prune_columns(columns.as_slice())? } else { builder }; @@ -133,6 +146,9 @@ async fn read_parquet_single( if let Some(predicate) = predicate { // TODO ideally pipeline this with IO and before concating, rather than after table = table.filter(&[predicate])?; + if let Some(oc) = original_columns { + table = table.get_columns(oc)?; + } } else if let Some(row_groups) = row_groups { let expected_rows: usize = row_groups .iter() @@ -170,8 +186,9 @@ async fn read_parquet_single( _ => Ok(()), }?; } - let expected_num_columns = if let Some(columns) = columns { - columns.len() + + let expected_num_columns = if let Some(columns) = requested_columns { + columns } else { metadata_num_columns }; diff --git a/src/daft-stats/src/table_stats.rs b/src/daft-stats/src/table_stats.rs index 35154ddf72..b44f8c4be4 100644 --- a/src/daft-stats/src/table_stats.rs +++ b/src/daft-stats/src/table_stats.rs @@ -1,5 +1,6 @@ use std::{fmt::Display, ops::Not}; +use common_error::DaftError; use daft_dsl::Expr; use daft_table::Table; use indexmap::{IndexMap, IndexSet}; @@ -82,9 +83,15 @@ impl TableStatistics { pub fn eval_expression(&self, expr: &Expr) -> crate::Result { match expr { Expr::Alias(col, _) => self.eval_expression(col.as_ref()), - Expr::Column(col) => { - let col = self.columns.get(col.as_ref()).unwrap(); - Ok(col.clone()) + Expr::Column(col_name) => { + let col = self.columns.get(col_name.as_ref()); + if let Some(col) = col { + Ok(col.clone()) + } else { + Err(crate::Error::DaftCoreCompute { + source: DaftError::FieldNotFound(col_name.to_string()), + }) + } } Expr::Literal(lit_value) => lit_value.try_into(), Expr::Not(col) => self.eval_expression(col)?.not(), diff --git a/tests/assets/parquet-data/sampled-tpch-with-stats.parquet b/tests/assets/parquet-data/sampled-tpch-with-stats.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3a641a8b402f885ee8b92da97b7a0f0ea3325e3d GIT binary patch literal 55644 zcmd5_3w#vSxu4nWh7BeXFuP7H=<@IY8bUxsP%K%J@X!z-BoXV25Eh8W5R-s_)*>RN zlp@CocT!Q+%4QeVQ-Dmy8SJ 
[... base85-encoded binary payload for sampled-tpch-with-stats.parquet elided; the remainder of the patch series is truncated here ...]
z@MSlfw~C;PLZc)l&0y4e3pL(IODhj}|4TlE`ET{d_#EBtAAgofT91AFEAIb&y6Zl^ zrP0Ex8$I5#_5BZXfA3w}j%0GOa=e!Nf!F?hZ>FT&V=E-w2Wo6w%S)pWE4Ry`S7DcC zP~&ZD+-%mbdFdAi!YTk$GyBqe)i^OcwaktwTdA_JO%-a(Vi z{e;M#_{=12#8{j$X)?Cr9e@h4|2G!wPMWN5T(o7Kn{9rLV2dK7BqW7B)`#(*Vb^az z{f$86WdLe|zE_dq2In~NC89N{Yxo+DR*bcw^#0ce9sY zC!nG<0xHJJ-wO%!_(GRoUN8Y8Bj;^`u0~fHO;vtQL1BL0;N0L4pUa=(*WJFG-f*+; zH}W2e&a$PJ7sgg!z&cdm-^(`JHIdQdJ#V6`^ro&zbb4$fVQoOAdLwwyFnbuGHZ zHy{)t{0edN5LO`w)!j{ZX>Ph6*@;Zz1O`!W^QdhPB}t z4Xuciehn;p5YnMvcZ6hwiNNs;!edfAI+LaAC{goBj%Uj@3m%as>*Ig7zT;-Qw-O># zoI`lih<!tq7Mu>*GMw8yfaQC`Q5X_&EU`|kG76@-zG_{wg0VrmWclM?=3YdI!)gCzMIwk zj#sLGhb9-9yBJ+7TY@>6C~zw^)w1?7D73T11w5;p*7^(?+Kx&BJG9e+#z~fU^IeS2 zeSm(&2ONo2n6wBk@%VN{*zzt{J+J$hL9 zHC&_QJ2uBSb4P7>+U$fWl~^|lC0r?Rca$mxNEJG#(KIwAI~Nu>RAeZS7&e(W&kPg7SWPAMy{?C|$8eS0Q@&09xM z2Z|G3P`aywk{Wy!?TuBfQfH)a1OaCoiA-y4z(@~h-3>%KP$*ARWd1bf3Yv1_}o{!G?Xx7t6e?9 z2T)O5|IqUJVyrBYS*>k}i1hV{mu+o@L^iys6KT!6U*FP+p1YD*St2`< z%tUMji-Xa9t7{Y4&P$Nae?TAEmbPX($Wo)U@>KLOy33MTT^pp*iyMsc#T)*Q(1@`z zG{?Xtn!$W_q_DY{A*l~{?%b)&JO!Th01dD`_lHbMN{p2ywJ+67%2KZkvn|F#w;J1W zIa2u#Xa;NOV5Va4;L`%L?b-4lAOVA9*jF_YV`VVr1e)%ZH(bTeT*;>yMsZoO+Dy}i8B!=?u>Ecge zUjNK47=uO)|L&V@O^oqS<%;NU!qyL92W~=z(cd>x6JtCCsNRdQl2qA|pqYx;^kekV z_Dr^9AQA}_U#?OErg=;E(quLL<(3#LOXPHpEfGi4kLSz2Y_=u~iELPJ%#hioKLhlp zUx<|@vU!-9NVKMZ`B1iH2-5jan*No;&2*eizrIZT+*~&AR;1FKn~6;33jas>BF4(l z>=|XYl+NA{F9LWlW3ofTNOn38NzrJKlgVG9>BqaTHA8NCl{fujtSqT*g=SLDre7bK zu(E)y9F0`WreA;AqR(OwV`Zr%d*_ehKBkEQ1eAAog6Zf$1??y}d z45mA1?)`Av0jT9j25|OxQ);xPe@+QoG!Amda=FLDa?K-|+`5UTTuIZfJNB1MU=1M% zUdIK08WwCG$pjy~*R*Byrhiu{+c}BMDU#^sHT?_9&19TSKa|G;RBP^MtIH^fCYt^^ zQ%p&YrXTAe9rmzIRjht83G#uWMclhaBR^haJtC8IjZw%E$0sMo=^%1t(e^pO<9izAZVyrBY(=#n32B|n602tZ1?Y~3F859eZ7u~pop=u zRJPByq#`~6(3H`v!N*j#brx;mVc5z}n8a9FQX6Y5Ny#4qbx*)mHHQ*P*MT@517fT! zp_B8FP*W7(h>Pw1nTk~(C^brK!EstKhL&KRvF17cBoJd|sU2NlNzM5v(A^C?pH$h7 z`IL~e+1D3n5o2WueNt;l$nrEm8ui#o{Tu5n%Ek6|pF;>#?NB4`aRnh%LV0Qwv2l$)=T5x5`*ewWoQCDYVFqL)e^(?Qdq23Ct)#N?jvRk zY{X)1i;K1<#l>1ug0{n`J`%yNGN4Qs{nX8uK8Qrao0 z?X|>Ue*TEtZq#R*YiARB(&vU?b}$$mg+QNMgTV)bgC+(C2ZIxXSrd8Oph3alaQZJR z7#v2wI5vLy0sne17`&FEhtp+QIk-Joz%P3c$Jw`23J(Nxg2AD5d$1oJXVGyUNzA&P zzJEkt*<2Nz2nKUTUe~iHXU~59 zdQRxs6GwvvQ)Z7X5jCK>zH5j9z)8i%c28yh_4PLV`iUL-IyV zl-rM_!hVGrlWr-@$sbgdQ;3L4fY9$BZrmTXKhJcF{i)#4{RKH?Bl3W1}jP}hGH z6|LJH_uoR-OYSecuc)x7=#~-r_e~p=&+jLDkJ9t&mVz*zoNR7S6i$3B`9V5Ue&i3j zjn3R+=szs@pppKl;pfGlpHo(VGE`JhhRcSV_xE$$&kK!Jf6^8B^NS`G<`-U{lV6xW zs_?$DY${8^;O$fqhS0D685Yc?@FVb_a$Vi{6H$L|p$IW0S2Bwg9a6r<>%%X(Q#Hk2%-8J3{v*-%pMiCR|#*BDDxwk9M;cS zet7#t<*(N*Ir&Amgk?{p`vJHlY#I`;`cpp0Qk0Rw{9gYHv-)tF literal 0 HcmV?d00001 diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index cff4c41791..0d9c189bf2 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -5,6 +5,7 @@ import os import pathlib import tempfile +from itertools import product import pyarrow as pa import pyarrow.parquet as papq @@ -365,3 +366,29 @@ def test_read_empty_parquet_file_with_pyarrow_bulk(tmpdir): read_back = read_parquet_into_pyarrow_bulk([file_path.as_posix()]) assert len(read_back) == 1 assert tab == read_back[0] + + +PRED_PUSHDOWN_FILES = [ + "s3://daft-public-data/test_fixtures/parquet-dev/sampled-tpch-with-stats.parquet", + "tests/assets/parquet-data/sampled-tpch-with-stats.parquet", +] + + +@pytest.mark.parametrize( + "path, pred", + product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]), +) +def test_parquet_filter_pushdowns(path, pred): + with_pushdown = 
MicroPartition.read_parquet(path, predicate=pred) + after = MicroPartition.read_parquet(path).filter([pred]) + assert with_pushdown.to_arrow() == after.to_arrow() + + +@pytest.mark.parametrize( + "path, pred", + product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]), +) +def test_parquet_filter_pushdowns_disjoint_predicate(path, pred): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["L_QUANTITY"]) + after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("L_QUANTITY")]) + assert with_pushdown.to_arrow() == after.to_arrow() From b6b28f88b8d6e0a696efde5d5f1065d616a88db7 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 6 Dec 2023 12:04:05 -0800 Subject: [PATCH 07/11] add test with no stats --- tests/table/table_io/test_parquet.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 0d9c189bf2..a17fda18e5 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -392,3 +392,16 @@ def test_parquet_filter_pushdowns_disjoint_predicate(path, pred): with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["L_QUANTITY"]) after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("L_QUANTITY")]) assert with_pushdown.to_arrow() == after.to_arrow() + + +@pytest.mark.parametrize( + "path, pred", + product( + ["tests/assets/parquet-data/mvp.parquet", "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"], + [daft.col("a") == 1, daft.col("a") == 10000, daft.lit(True)], + ), +) +def test_parquet_filter_pushdowns_disjoint_predicate_no_stats(path, pred): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["b"]) + after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("b")]) + assert with_pushdown.to_arrow() == after.to_arrow() From 1bdecacb73e894ff942560cbda2d7faac9566acf Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 6 Dec 2023 12:08:29 -0800 Subject: [PATCH 08/11] add limit to filter pushdown --- tests/table/table_io/test_parquet.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index a17fda18e5..fdc9ddbabe 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -375,12 +375,18 @@ def test_read_empty_parquet_file_with_pyarrow_bulk(tmpdir): @pytest.mark.parametrize( - "path, pred", - product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]), + "path, pred, limit", + product( + PRED_PUSHDOWN_FILES, + [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)], + [None, 1, 1000], + ), ) -def test_parquet_filter_pushdowns(path, pred): - with_pushdown = MicroPartition.read_parquet(path, predicate=pred) +def test_parquet_filter_pushdowns(path, pred, limit): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, num_rows=limit) after = MicroPartition.read_parquet(path).filter([pred]) + if limit is not None: + after = after.head(limit) assert with_pushdown.to_arrow() == after.to_arrow() From 0b0a94d2527b2f9367126b4dbef68508d43cce61 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 6 Dec 2023 12:15:10 -0800 Subject: [PATCH 09/11] better error handling --- src/daft-parquet/src/file.rs | 10 ++++++---- 
src/daft-parquet/src/lib.rs | 8 ++++++++ src/daft-parquet/src/stream_reader.rs | 7 +++++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 5c3f5798d0..6f93ca7758 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -20,8 +20,8 @@ use crate::{ read::ParquetSchemaInferenceOptions, read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass}, statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu, - UnableToCreateParquetPageStreamSnafu, UnableToParseSchemaFromMetadataSnafu, - UnableToRunExpressionOnStatsSnafu, + UnableToConvertSchemaToDaftSnafu, UnableToCreateParquetPageStreamSnafu, + UnableToParseSchemaFromMetadataSnafu, UnableToRunExpressionOnStatsSnafu, }; use arrow2::io::parquet::read::column_iter_to_arrays; @@ -279,8 +279,10 @@ impl ParquetReaderBuilder { .fields .retain(|f| names_to_keep.contains(f.name.as_str())); } - // TODO: DONT UNWRAP - let daft_schema = Schema::try_from(&arrow_schema).unwrap(); + let daft_schema = + Schema::try_from(&arrow_schema).with_context(|_| UnableToConvertSchemaToDaftSnafu { + path: self.uri.to_string(), + })?; let row_ranges = build_row_ranges( self.limit, self.row_start_offset, diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index 87df58a1a8..637ab2cf91 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -70,6 +70,14 @@ pub enum Error { path: String, source: arrow2::error::Error, }, + + #[snafu(display( + "Unable to convert arrow schema to daft schema for file {}: {}", + path, + source + ))] + UnableToConvertSchemaToDaft { path: String, source: DaftError }, + #[snafu(display( "Field: {} not found in Parquet File: {} Available Fields: {:?}", field, diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index fed91e4798..982be9c2da 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -12,6 +12,7 @@ use snafu::ResultExt; use crate::{ file::build_row_ranges, read::{ArrowChunk, ParquetSchemaInferenceOptions}, + UnableToConvertSchemaToDaftSnafu, }; use crate::stream_reader::read::schema::infer_schema_with_options; @@ -92,7 +93,10 @@ pub(crate) fn local_parquet_read_into_arrow( path: uri.to_string(), })?; let schema = prune_fields_from_schema(schema, columns, uri)?; - let daft_schema = Schema::try_from(&schema).unwrap(); + let daft_schema = + Schema::try_from(&schema).with_context(|_| UnableToConvertSchemaToDaftSnafu { + path: uri.to_string(), + })?; let chunk_size = 128 * 1024; let max_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows)); @@ -101,7 +105,6 @@ pub(crate) fn local_parquet_read_into_arrow( num_rows, start_offset.unwrap_or(0), row_groups, - // TODO THREAD IN PREDICATES predicate, &daft_schema, &metadata, From d949829ad3ee58dc35e4d6baa1f902128734a16a Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 6 Dec 2023 12:31:15 -0800 Subject: [PATCH 10/11] push head into read_parquet --- src/daft-micropartition/src/micropartition.rs | 19 ++++++++----------- src/daft-parquet/src/read.rs | 7 ++++++- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index b717191b2c..0c8ac8ebff 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -654,13 +654,12 @@ pub(crate) fn 
read_parquet_into_micropartition(
     let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?;
 
     if let Some(predicate) = predicate {
-        // We have a predicate, so we will perform eager read with the predicate
-        // Since we currently
+        // We have a predicate, so we will perform an eager read, reading only the row groups we need.
         let all_tables = read_parquet_bulk(
             uris,
             columns,
             None,
-            None,
+            num_rows,
             row_groups,
             Some(predicate.clone()),
             io_client,
@@ -685,14 +684,12 @@ pub(crate) fn read_parquet_into_micropartition(
             .into_iter()
             .map(|t| t.cast_to_schema(&pruned_daft_schema))
             .collect::<DaftResult<Vec<_>>>()?;
-        let loaded =
-            MicroPartition::new_loaded(Arc::new(pruned_daft_schema), all_tables.into(), None);
-
-        if let Some(num_rows) = num_rows {
-            return loaded.head(num_rows);
-        } else {
-            return Ok(loaded);
-        }
+        // TODO: we can pass in stats here to optimize downstream workloads such as join
+        return Ok(MicroPartition::new_loaded(
+            Arc::new(pruned_daft_schema),
+            all_tables.into(),
+            None,
+        ));
     }
 
     let meta_io_client = io_client.clone();
diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs
index 71d65fbfa9..8e482cffb9 100644
--- a/src/daft-parquet/src/read.rs
+++ b/src/daft-parquet/src/read.rs
@@ -68,11 +68,13 @@ async fn read_parquet_single(
     schema_infer_options: ParquetSchemaInferenceOptions,
 ) -> DaftResult<Table>
{ let original_columns = columns; + let original_num_rows = num_rows; + let mut num_rows = num_rows; let mut columns = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()); let requested_columns = columns.as_ref().map(|v| v.len()); if let Some(ref pred) = predicate { if num_rows.is_some() { - return Err(common_error::DaftError::ValueError("Parquet Reader Currently doesn't support having both `num_rows` and `predicate` set at the same time".to_string())); + num_rows = None; } if let Some(req_columns) = columns.as_mut() { let needed_columns = get_required_columns(pred); @@ -149,6 +151,9 @@ async fn read_parquet_single( if let Some(oc) = original_columns { table = table.get_columns(oc)?; } + if let Some(nr) = original_num_rows { + table = table.head(nr)?; + } } else if let Some(row_groups) = row_groups { let expected_rows: usize = row_groups .iter() From ddbb050cf8c15f64cf893f17655309feca2d0c43 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 6 Dec 2023 13:05:19 -0800 Subject: [PATCH 11/11] add anon mode --- .../io/parquet/test_read_pushdowns.py | 65 +++++++++++++++++++ tests/table/table_io/test_parquet.py | 46 ------------- 2 files changed, 65 insertions(+), 46 deletions(-) create mode 100644 tests/integration/io/parquet/test_read_pushdowns.py diff --git a/tests/integration/io/parquet/test_read_pushdowns.py b/tests/integration/io/parquet/test_read_pushdowns.py new file mode 100644 index 0000000000..40f227bb4b --- /dev/null +++ b/tests/integration/io/parquet/test_read_pushdowns.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from itertools import product + +import pytest + +import daft +from daft.table import MicroPartition + +PRED_PUSHDOWN_FILES = [ + "s3://daft-public-data/test_fixtures/parquet-dev/sampled-tpch-with-stats.parquet", + "tests/assets/parquet-data/sampled-tpch-with-stats.parquet", +] + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path, pred, limit", + product( + PRED_PUSHDOWN_FILES, + [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)], + [None, 1, 1000], + ), +) +def test_parquet_filter_pushdowns(path, pred, limit, aws_public_s3_config): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, num_rows=limit, io_config=aws_public_s3_config) + after = MicroPartition.read_parquet(path, io_config=aws_public_s3_config).filter([pred]) + if limit is not None: + after = after.head(limit) + assert with_pushdown.to_arrow() == after.to_arrow() + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path, pred", + product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]), +) +def test_parquet_filter_pushdowns_disjoint_predicate(path, pred, aws_public_s3_config): + with_pushdown = MicroPartition.read_parquet( + path, predicate=pred, columns=["L_QUANTITY"], io_config=aws_public_s3_config + ) + after = ( + MicroPartition.read_parquet(path, io_config=aws_public_s3_config) + .filter([pred]) + .eval_expression_list([daft.col("L_QUANTITY")]) + ) + assert with_pushdown.to_arrow() == after.to_arrow() + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path, pred", + product( + ["tests/assets/parquet-data/mvp.parquet", "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"], + [daft.col("a") == 1, daft.col("a") == 10000, daft.lit(True)], + ), +) +def test_parquet_filter_pushdowns_disjoint_predicate_no_stats(path, pred, aws_public_s3_config): + with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["b"], 
io_config=aws_public_s3_config) + after = ( + MicroPartition.read_parquet(path, io_config=aws_public_s3_config) + .filter([pred]) + .eval_expression_list([daft.col("b")]) + ) + assert with_pushdown.to_arrow() == after.to_arrow() diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index fdc9ddbabe..cff4c41791 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -5,7 +5,6 @@ import os import pathlib import tempfile -from itertools import product import pyarrow as pa import pyarrow.parquet as papq @@ -366,48 +365,3 @@ def test_read_empty_parquet_file_with_pyarrow_bulk(tmpdir): read_back = read_parquet_into_pyarrow_bulk([file_path.as_posix()]) assert len(read_back) == 1 assert tab == read_back[0] - - -PRED_PUSHDOWN_FILES = [ - "s3://daft-public-data/test_fixtures/parquet-dev/sampled-tpch-with-stats.parquet", - "tests/assets/parquet-data/sampled-tpch-with-stats.parquet", -] - - -@pytest.mark.parametrize( - "path, pred, limit", - product( - PRED_PUSHDOWN_FILES, - [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)], - [None, 1, 1000], - ), -) -def test_parquet_filter_pushdowns(path, pred, limit): - with_pushdown = MicroPartition.read_parquet(path, predicate=pred, num_rows=limit) - after = MicroPartition.read_parquet(path).filter([pred]) - if limit is not None: - after = after.head(limit) - assert with_pushdown.to_arrow() == after.to_arrow() - - -@pytest.mark.parametrize( - "path, pred", - product(PRED_PUSHDOWN_FILES, [daft.col("L_ORDERKEY") == 1, daft.col("L_ORDERKEY") == 10000, daft.lit(True)]), -) -def test_parquet_filter_pushdowns_disjoint_predicate(path, pred): - with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["L_QUANTITY"]) - after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("L_QUANTITY")]) - assert with_pushdown.to_arrow() == after.to_arrow() - - -@pytest.mark.parametrize( - "path, pred", - product( - ["tests/assets/parquet-data/mvp.parquet", "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"], - [daft.col("a") == 1, daft.col("a") == 10000, daft.lit(True)], - ), -) -def test_parquet_filter_pushdowns_disjoint_predicate_no_stats(path, pred): - with_pushdown = MicroPartition.read_parquet(path, predicate=pred, columns=["b"]) - after = MicroPartition.read_parquet(path).filter([pred]).eval_expression_list([daft.col("b")]) - assert with_pushdown.to_arrow() == after.to_arrow()
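
Taken together, this series makes the Parquet reader use row-group statistics to skip row groups that cannot satisfy a supplied predicate, widens the projection to include any columns the predicate needs, and then re-applies the caller's original projection and num_rows limit after filtering. The sketch below is a minimal illustration of the behavior the integration tests above pin down; it assumes the local sampled-tpch-with-stats.parquet fixture added earlier in the series is present, and is illustrative only, not part of the patch.

import daft
from daft.table import MicroPartition

# Test fixture added earlier in this series; any Parquet file with row-group
# statistics on L_ORDERKEY would behave the same way.
path = "tests/assets/parquet-data/sampled-tpch-with-stats.parquet"
pred = daft.col("L_ORDERKEY") == 1

# Pushdown path: row groups whose statistics rule out `pred` are skipped,
# L_ORDERKEY is read only because the predicate needs it, and the requested
# projection and limit are re-applied after the filter.
pushed = MicroPartition.read_parquet(path, predicate=pred, columns=["L_QUANTITY"], num_rows=10)

# Reference path: read everything, then filter, project, and limit in memory.
expected = (
    MicroPartition.read_parquet(path)
    .filter([pred])
    .eval_expression_list([daft.col("L_QUANTITY")])
    .head(10)
)

assert pushed.to_arrow() == expected.to_arrow()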