e2e micropartition
samster25 committed Dec 6, 2023
1 parent 6bf9417 commit bbd52e0
Showing 9 changed files with 165 additions and 50 deletions.
4 changes: 4 additions & 0 deletions daft/table/micropartition.py
@@ -310,6 +310,7 @@ def read_parquet(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[int] | None = None,
predicates: list[Expression] | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
@@ -321,6 +322,7 @@ def read_parquet(
start_offset,
num_rows,
row_groups,
[pred._expr for pred in predicates] if predicates is not None else None,
io_config,
multithreaded_io,
coerce_int96_timestamp_unit._timeunit,
@@ -335,6 +337,7 @@ def read_parquet_bulk(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
predicates: list[Expression] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -347,6 +350,7 @@ def read_parquet_bulk(
start_offset,
num_rows,
row_groups_per_path,
[pred._expr for pred in predicates] if predicates is not None else None,
io_config,
num_parallel_tasks,
multithreaded_io,
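A minimal usage sketch of the new predicates argument on MicroPartition.read_parquet. The import paths, file path, and column names below are assumptions for illustration (the positional path/columns parameters sit above the hunk shown here), and col() is assumed to build a Daft Expression:

    from daft.expressions import col
    from daft.table import MicroPartition

    # Hypothetical file and columns; only rows satisfying the predicates come back.
    mp = MicroPartition.read_parquet(
        "data/events.parquet",
        columns=["user_id", "score"],
        predicates=[col("score") > 0.5],  # each Expression is unwrapped to its internal _expr before crossing into Rust
    )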
4 changes: 4 additions & 0 deletions daft/table/table.py
@@ -383,6 +383,7 @@ def read_parquet(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[int] | None = None,
predicates: list[Expression] | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
@@ -394,6 +395,7 @@ def read_parquet(
start_offset=start_offset,
num_rows=num_rows,
row_groups=row_groups,
predicates=[pred._expr for pred in predicates] if predicates is not None else None,
io_config=io_config,
multithreaded_io=multithreaded_io,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
@@ -408,6 +410,7 @@ def read_parquet_bulk(
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
predicates: list[Expression] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -419,6 +422,7 @@ def read_parquet_bulk(
start_offset=start_offset,
num_rows=num_rows,
row_groups=row_groups_per_path,
predicates=[pred._expr for pred in predicates] if predicates is not None else None,
io_config=io_config,
num_parallel_tasks=num_parallel_tasks,
multithreaded_io=multithreaded_io,
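A comparable sketch for the bulk reader, again with hypothetical paths and columns and an assumed list-of-paths first argument. The same predicate list is applied to every file and is folded with AND further down the stack, while row_groups_per_path can still restrict which row groups are scanned per file:

    from daft.expressions import col
    from daft.table import Table

    tables = Table.read_parquet_bulk(
        ["data/part-0.parquet", "data/part-1.parquet"],
        predicates=[col("country") == "DE", col("score") > 0.5],  # ANDed together downstream
        row_groups_per_path=[None, [0, 2]],  # every row group of part-0, only 0 and 2 of part-1
    )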
56 changes: 56 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -8,6 +8,7 @@ use common_error::DaftResult;
use daft_core::schema::{Schema, SchemaRef};

use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
use daft_dsl::{Expr, ExprRef};
use daft_parquet::read::{
read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions,
};
@@ -119,6 +120,11 @@ fn materialize_scan_task(
None,
scan_task.pushdowns.limit,
row_groups,
scan_task
.pushdowns
.filters
.as_ref()
.map(|v| v.as_ref().clone()),
io_client.clone(),
io_stats,
8,
@@ -387,6 +393,11 @@ impl MicroPartition {
None,
scan_task.pushdowns.limit,
row_groups,
scan_task
.pushdowns
.filters
.as_ref()
.map(|v| v.as_ref().clone()),
cfg.io_config
.clone()
.map(|c| Arc::new(c.clone()))
@@ -635,6 +646,7 @@ pub(crate) fn read_parquet_into_micropartition(
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicates: Option<Vec<ExprRef>>,
io_config: Arc<IOConfig>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
@@ -648,6 +660,49 @@ pub(crate) fn read_parquet_into_micropartition(
// Run the required I/O to retrieve all the Parquet FileMetaData
let runtime_handle = daft_io::get_runtime(multithreaded_io)?;
let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?;

if let Some(ref predicates) = predicates {
// We have predicates, so we currently perform an eager read with the
// predicates pushed down into the scan.
let all_tables = read_parquet_bulk(
uris,
columns,
None,
None,
row_groups,
Some(predicates.clone()),
io_client,
io_stats,
num_parallel_tasks,
runtime_handle,
schema_infer_options,
)?;

let unioned_schema = all_tables
.iter()
.map(|t| t.schema.clone())
.try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?;
let full_daft_schema = unioned_schema.expect("we need at least 1 schema");
// Hack: clone the fields to get an owned Schema out of the Arc'd schema
let full_daft_schema = Schema {
fields: full_daft_schema.fields.clone(),
};
let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?;

let all_tables = all_tables
.into_iter()
.map(|t| t.cast_to_schema(&pruned_daft_schema))
.collect::<DaftResult<Vec<_>>>()?;
let loaded =
MicroPartition::new_loaded(Arc::new(pruned_daft_schema), all_tables.into(), None);

if let Some(num_rows) = num_rows {
return loaded.head(num_rows);
} else {
return Ok(loaded);
}
}

let meta_io_client = io_client.clone();
let meta_io_stats = io_stats.clone();
let metadata = runtime_handle.block_on(async move {
@@ -775,6 +830,7 @@ pub(crate) fn read_parquet_into_micropartition(
start_offset,
num_rows,
row_groups,
None,
io_client,
io_stats,
num_parallel_tasks,
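The predicates branch added to read_parquet_into_micropartition is purely eager. A Python-flavoured sketch of that control flow, where read_parquet_bulk, union_all, prune_fields, and make_loaded_micropartition are placeholder names standing in for the Rust helpers:

    def read_parquet_with_predicates(uris, columns, num_rows, predicates):
        # 1. Read every file up front, pushing the predicates into the scan;
        #    the inner read is issued with no row limit of its own.
        tables = read_parquet_bulk(uris, columns=columns, predicates=predicates)

        # 2. Union the per-file schemas, then prune down to the requested columns.
        schema = prune_fields(union_all(t.schema for t in tables), columns)

        # 3. Cast every table to the pruned schema and wrap the result as a loaded partition.
        loaded = make_loaded_micropartition(schema, [t.cast_to_schema(schema) for t in tables])

        # 4. Any requested row limit is applied only after filtering, via head().
        return loaded.head(num_rows) if num_rows is not None else loaded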
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/ops/filter.rs
@@ -8,15 +8,15 @@ use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu};
use daft_stats::TruthValue;

impl MicroPartition {
pub fn filter(&self, predicate: &[Expr]) -> DaftResult<Self> {
pub fn filter<E: AsRef<Expr>>(&self, predicate: &[E]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new("MicroPartition::filter");
if predicate.is_empty() {
return Ok(Self::empty(Some(self.schema.clone())));
}
if let Some(statistics) = &self.statistics {
let folded_expr = predicate
.iter()
.cloned()
.map(|e| e.as_ref().clone())
.reduce(|a, b| a.and(&b))
.expect("should have at least 1 expr");
let eval_result = statistics.eval_expression(&folded_expr)?;
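The predicate folding above is the same trick build_row_ranges uses below: every predicate in the slice is ANDed into a single expression before being evaluated against statistics. A short Python sketch of the equivalent logic, assuming & is the AND operator on Daft Expressions:

    from functools import reduce

    def fold_predicates(predicates):
        # Mirrors the Rust reduce(|a, b| a.and(&b)): AND all predicates into one expression
        # so it can be checked against the partition's column statistics in a single pass.
        assert len(predicates) > 0, "should have at least 1 expr"
        return reduce(lambda a, b: a & b, predicates)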
6 changes: 6 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -405,6 +405,7 @@ impl PyMicroPartition {
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<i64>>,
predicates: Option<Vec<PyExpr>>,
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
coerce_int96_timestamp_unit: Option<PyTimeUnit>,
@@ -423,6 +424,8 @@ impl PyMicroPartition {
start_offset,
num_rows,
row_groups.map(|rg| vec![Some(rg)]),
predicates
.map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::<Vec<_>>()),
io_config,
Some(io_stats),
1,
@@ -442,6 +445,7 @@ impl PyMicroPartition {
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicates: Option<Vec<PyExpr>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
@@ -461,6 +465,8 @@ impl PyMicroPartition {
start_offset,
num_rows,
row_groups,
predicates
.map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::<Vec<_>>()),
io_config,
Some(io_stats),
num_parallel_tasks.unwrap_or(128) as usize,
31 changes: 21 additions & 10 deletions src/daft-parquet/src/file.rs
@@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::Arc};
use arrow2::io::parquet::read::schema::infer_schema_with_options;
use common_error::DaftResult;
use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_dsl::Expr;
use daft_dsl::{Expr, ExprRef};
use daft_io::{IOClient, IOStatsRef};
use daft_stats::TruthValue;
use daft_table::Table;
@@ -33,7 +33,7 @@ pub(crate) struct ParquetReaderBuilder {
limit: Option<usize>,
row_groups: Option<Vec<i64>>,
schema_inference_options: ParquetSchemaInferenceOptions,
predicate: Option<Expr>,
predicates: Option<Vec<ExprRef>>,
}
use parquet2::read::decompress;

@@ -99,14 +99,23 @@ pub(crate) fn build_row_ranges(
limit: Option<usize>,
row_start_offset: usize,
row_groups: Option<&[i64]>,
predicate: Option<&Expr>,
predicates: Option<&[ExprRef]>,
schema: &Schema,
metadata: &parquet2::metadata::FileMetaData,
uri: &str,
) -> super::Result<Vec<RowGroupRange>> {
let limit = limit.map(|v| v as i64);
let mut row_ranges = vec![];
let mut curr_row_index = 0;

let folded_expr = predicates.map(|preds| {
preds
.iter()
.cloned()
.reduce(|a, b| a.and(&b).into())
.expect("should have at least 1 expr")
});

if let Some(row_groups) = row_groups {
let mut rows_to_add: i64 = limit.unwrap_or(i64::MAX);
for i in row_groups {
@@ -122,11 +131,12 @@
break;
}
let rg = metadata.row_groups.get(i).unwrap();
if let Some(pred) = predicate {
if let Some(ref pred) = folded_expr {
let stats = statistics::row_group_metadata_to_table_stats(rg, schema)
.with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu {
path: uri.to_string(),
})?;

let evaled = stats.eval_expression(pred).with_context(|_| {
UnableToRunExpressionOnStatsSnafu {
path: uri.to_string(),
Expand All @@ -153,7 +163,7 @@ pub(crate) fn build_row_ranges(
curr_row_index += rg.num_rows();
continue;
} else if rows_to_add > 0 {
if let Some(pred) = predicate {
if let Some(ref pred) = folded_expr {
let stats = statistics::row_group_metadata_to_table_stats(rg, schema)
.with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu {
path: uri.to_string(),
@@ -203,7 +213,7 @@ impl ParquetReaderBuilder {
limit: None,
row_groups: None,
schema_inference_options: Default::default(),
predicate: None,
predicates: None,
})
}

@@ -259,8 +269,9 @@ impl ParquetReaderBuilder {
self
}

pub fn set_filter(mut self, predicate: Expr) -> Self {
self.predicate = Some(predicate);
pub fn set_filter(mut self, predicates: Vec<ExprRef>) -> Self {
assert_eq!(self.limit, None);
self.predicates = Some(predicates);
self
}

Expand All @@ -276,13 +287,13 @@ impl ParquetReaderBuilder {
.fields
.retain(|f| names_to_keep.contains(f.name.as_str()));
}
// DONT UNWRAP
// TODO: DONT UNWRAP
let daft_schema = Schema::try_from(&arrow_schema).unwrap();
let row_ranges = build_row_ranges(
self.limit,
self.row_start_offset,
self.row_groups.as_deref(),
self.predicate.as_ref(),
self.predicates.as_deref(),
&daft_schema,
&self.metadata,
&self.uri,
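build_row_ranges folds the predicate list the same way and consults per-row-group statistics so groups that provably contain no matching rows are skipped. A rough Python sketch of the per-row-group decision, where stats_eval is a placeholder for the Rust statistics evaluation and returns a truth value of "true", "false", or "maybe":

    def plan_row_groups(row_group_metadata, folded_pred, limit=None):
        ranges, curr_row = [], 0
        rows_to_add = limit if limit is not None else float("inf")
        for rg in row_group_metadata:
            if rows_to_add <= 0:
                break
            if folded_pred is not None and stats_eval(rg, folded_pred) == "false":
                # The min/max statistics prove no row in this group can match; skip it entirely.
                curr_row += rg.num_rows
                continue
            take = min(rows_to_add, rg.num_rows)
            ranges.append((curr_row, take))  # (start row index, number of rows to read)
            rows_to_add -= take
            curr_row += rg.num_rows
        return ranges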
7 changes: 7 additions & 0 deletions src/daft-parquet/src/python.rs
@@ -5,6 +5,7 @@ pub mod pylib {
ffi::field_to_py,
python::{datatype::PyTimeUnit, schema::PySchema, PySeries},
};
use daft_dsl::python::PyExpr;
use daft_io::{get_io_client, python::IOConfig, IOStatsContext};
use daft_table::python::PyTable;
use pyo3::{pyfunction, types::PyModule, PyResult, Python};
@@ -21,6 +22,7 @@ pub mod pylib {
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<i64>>,
predicates: Option<Vec<PyExpr>>,
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
coerce_int96_timestamp_unit: Option<PyTimeUnit>,
@@ -43,6 +45,8 @@ pub mod pylib {
start_offset,
num_rows,
row_groups,
predicates
.map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::<Vec<_>>()),
io_client,
Some(io_stats.clone()),
runtime_handle,
@@ -127,6 +131,7 @@ pub mod pylib {
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicates: Option<Vec<PyExpr>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
@@ -150,6 +155,8 @@ pub mod pylib {
start_offset,
num_rows,
row_groups,
predicates
.map(|e_vec| e_vec.into_iter().map(|e| e.expr.into()).collect::<Vec<_>>()),
io_client,
Some(io_stats),
num_parallel_tasks.unwrap_or(128) as usize,