WIP row group pruning
samster25 committed Dec 5, 2023
1 parent 08702a3 commit 6bf9417
Showing 5 changed files with 98 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/daft-parquet/Cargo.toml
@@ -5,6 +5,7 @@ async-stream = {workspace = true}
bytes = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
@@ -24,7 +25,7 @@ tokio-util = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python"]

[package]
edition = {workspace = true}
92 changes: 71 additions & 21 deletions src/daft-parquet/src/file.rs
@@ -2,8 +2,10 @@ use std::{collections::HashSet, sync::Arc};

use arrow2::io::parquet::read::schema::infer_schema_with_options;
use common_error::DaftResult;
use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_dsl::Expr;
use daft_io::{IOClient, IOStatsRef};
use daft_stats::TruthValue;
use daft_table::Table;
use futures::{future::try_join_all, StreamExt};
use parquet2::{
@@ -17,8 +19,9 @@ use crate::{
metadata::read_parquet_metadata,
read::ParquetSchemaInferenceOptions,
read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass},
JoinSnafu, OneShotRecvSnafu, UnableToCreateParquetPageStreamSnafu,
UnableToParseSchemaFromMetadataSnafu,
statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
UnableToCreateParquetPageStreamSnafu, UnableToParseSchemaFromMetadataSnafu,
UnableToRunExpressionOnStatsSnafu,
};
use arrow2::io::parquet::read::column_iter_to_arrays;

@@ -27,9 +30,10 @@ pub(crate) struct ParquetReaderBuilder {
pub metadata: parquet2::metadata::FileMetaData,
selected_columns: Option<HashSet<String>>,
row_start_offset: usize,
num_rows: usize,
limit: Option<usize>,
row_groups: Option<Vec<i64>>,
schema_inference_options: ParquetSchemaInferenceOptions,
predicate: Option<Expr>,
}
use parquet2::read::decompress;

@@ -92,16 +96,19 @@
}

pub(crate) fn build_row_ranges(
num_rows: usize,
limit: Option<usize>,
row_start_offset: usize,
row_groups: Option<&[i64]>,
predicate: Option<&Expr>,
schema: &Schema,
metadata: &parquet2::metadata::FileMetaData,
uri: &str,
) -> super::Result<Vec<RowGroupRange>> {
let limit = limit.map(|v| v as i64);
let mut row_ranges = vec![];
let mut curr_row_index = 0;
let mut rows_to_add = num_rows;
if let Some(row_groups) = row_groups {
let mut rows_to_add: i64 = limit.unwrap_or(i64::MAX);
for i in row_groups {
let i = *i as usize;
if !(0..metadata.row_groups.len()).contains(&i) {
@@ -111,26 +118,62 @@ pub(crate) fn build_row_ranges(
total_row_groups: metadata.row_groups.len() as i64,
});
}
if rows_to_add <= 0 {
break;
}
let rg = metadata.row_groups.get(i).unwrap();
if let Some(pred) = predicate {
let stats = statistics::row_group_metadata_to_table_stats(rg, schema)
.with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu {
path: uri.to_string(),
})?;
let evaled = stats.eval_expression(pred).with_context(|_| {
UnableToRunExpressionOnStatsSnafu {
path: uri.to_string(),
}
})?;
if evaled.to_truth_value() == TruthValue::False {
continue;
}
}

let range_to_add = RowGroupRange {
row_group_index: i,
start: 0,
num_rows: rg.num_rows(),
num_rows: rg.num_rows().min(rows_to_add as usize),
};
rows_to_add = rows_to_add.saturating_sub((rg.num_rows() as i64).min(rows_to_add));
row_ranges.push(range_to_add);
}
} else {
let mut rows_to_add = limit.unwrap_or(metadata.num_rows as i64);

for (i, rg) in metadata.row_groups.iter().enumerate() {
if (curr_row_index + rg.num_rows()) < row_start_offset {
curr_row_index += rg.num_rows();
continue;
} else if rows_to_add > 0 {
if let Some(pred) = predicate {
let stats = statistics::row_group_metadata_to_table_stats(rg, schema)
.with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu {
path: uri.to_string(),
})?;
let evaled = stats.eval_expression(pred).with_context(|_| {
UnableToRunExpressionOnStatsSnafu {
path: uri.to_string(),
}
})?;
if evaled.to_truth_value() == TruthValue::False {
curr_row_index += rg.num_rows();
continue;
}
}
let range_to_add = RowGroupRange {
row_group_index: i,
start: row_start_offset.saturating_sub(curr_row_index),
num_rows: rg.num_rows().min(rows_to_add),
num_rows: rg.num_rows().min(rows_to_add as usize),
};
rows_to_add = rows_to_add.saturating_sub(rg.num_rows().min(rows_to_add));
rows_to_add = rows_to_add.saturating_sub((rg.num_rows() as i64).min(rows_to_add));
row_ranges.push(range_to_add);
} else {
break;
@@ -152,15 +195,15 @@ impl ParquetReaderBuilder {
.single_url_get_size(uri.into(), io_stats.clone())
.await?;
let metadata = read_parquet_metadata(uri, size, io_client, io_stats).await?;
let num_rows = metadata.num_rows;
Ok(ParquetReaderBuilder {
uri: uri.into(),
metadata,
selected_columns: None,
row_start_offset: 0,
num_rows,
limit: None,
row_groups: None,
schema_inference_options: Default::default(),
predicate: None,
})
}

@@ -201,9 +244,8 @@ impl ParquetReaderBuilder {
num_rows: Option<usize>,
) -> super::Result<Self> {
let start_offset = start_offset.unwrap_or(0);
let num_rows = num_rows.unwrap_or(self.metadata.num_rows.saturating_sub(start_offset));
self.row_start_offset = start_offset;
self.num_rows = num_rows;
self.limit = num_rows;
Ok(self)
}

@@ -217,15 +259,12 @@
self
}

pub fn build(self) -> super::Result<ParquetFileReader> {
let row_ranges = build_row_ranges(
self.num_rows,
self.row_start_offset,
self.row_groups.as_deref(),
&self.metadata,
&self.uri,
)?;
pub fn set_filter(mut self, predicate: Expr) -> Self {
self.predicate = Some(predicate);
self
}

pub fn build(self) -> super::Result<ParquetFileReader> {
let mut arrow_schema =
infer_schema_with_options(&self.metadata, &Some(self.schema_inference_options.into()))
.context(UnableToParseSchemaFromMetadataSnafu::<String> {
@@ -237,6 +276,17 @@ impl ParquetReaderBuilder {
.fields
.retain(|f| names_to_keep.contains(f.name.as_str()));
}
// DONT UNWRAP
let daft_schema = Schema::try_from(&arrow_schema).unwrap();
let row_ranges = build_row_ranges(
self.limit,
self.row_start_offset,
self.row_groups.as_deref(),
self.predicate.as_ref(),
&daft_schema,
&self.metadata,
&self.uri,
)?;

ParquetFileReader::new(self.uri, self.metadata, arrow_schema, row_ranges)
}
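For context on the new control flow in build_row_ranges, here is a minimal, self-contained sketch of the pruning rule: a row group is skipped when its column statistics prove the predicate can never be true, and the groups that survive are clamped to the remaining row limit. The MinMax, RowGroupMeta, and RowGroupRange types and the col >= threshold predicate below are simplified stand-ins invented for illustration; the actual code evaluates a daft_dsl::Expr against daft_stats table statistics and skips a group when the result is TruthValue::False.

#[derive(Clone, Copy)]
struct MinMax { min: i64, max: i64 }          // per-row-group column statistics (illustrative)

struct RowGroupMeta { num_rows: usize, stats: MinMax }

#[derive(Debug)]
struct RowGroupRange { row_group_index: usize, start: usize, num_rows: usize }

// Min/max stats can only prove "definitely false"; anything else must be read.
fn could_match(stats: MinMax, threshold: i64) -> bool {
    stats.max >= threshold
}

fn build_row_ranges(limit: Option<usize>, threshold: i64, row_groups: &[RowGroupMeta]) -> Vec<RowGroupRange> {
    let mut rows_to_add = limit.unwrap_or(usize::MAX);
    let mut ranges = Vec::new();
    for (i, rg) in row_groups.iter().enumerate() {
        if rows_to_add == 0 {
            break;
        }
        // Row-group pruning: skip the group when the statistics prove the
        // predicate can never be true (TruthValue::False in the real code).
        if !could_match(rg.stats, threshold) {
            continue;
        }
        // Clamp the contribution of this group so the total respects the limit.
        let take = rg.num_rows.min(rows_to_add);
        ranges.push(RowGroupRange { row_group_index: i, start: 0, num_rows: take });
        rows_to_add -= take;
    }
    ranges
}

fn main() {
    let rgs = [
        RowGroupMeta { num_rows: 100, stats: MinMax { min: 0, max: 9 } },
        RowGroupMeta { num_rows: 100, stats: MinMax { min: 10, max: 19 } },
        RowGroupMeta { num_rows: 100, stats: MinMax { min: 20, max: 29 } },
    ];
    // Predicate col >= 15 with a limit of 120 rows: group 0 is pruned,
    // group 1 contributes 100 rows, group 2 is clamped to the remaining 20.
    println!("{:?}", build_row_ranges(Some(120), 15, &rgs));
}

Note the asymmetry in min/max pruning: statistics can only prove a predicate false for an entire group, never true, so a kept group may still contain no matching rows and the predicate still has to be applied to the decoded data.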
15 changes: 15 additions & 0 deletions src/daft-parquet/src/lib.rs
@@ -147,6 +147,21 @@ pub enum Error {
read_columns: usize,
},

#[snafu(display(
"Parquet file: {} unable to convert row group metadata to stats\nDetails:\n{source}",
path,
))]
UnableToConvertRowGroupMetadataToStats { path: String, source: DaftError },

#[snafu(display(
"Parquet file: {} unable to evaluate predicate on stats\nDetails:\n{source}",
path,
))]
UnableToRunExpressionOnStats {
path: String,
source: daft_stats::Error,
},

#[snafu(display("Error joining spawned task: {} for path: {}", source, path))]
JoinError {
path: String,
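The two new variants follow the existing pattern in this error enum: a path plus a source error, attached at call sites through snafu's generated context selectors (UnableToConvertRowGroupMetadataToStatsSnafu and UnableToRunExpressionOnStatsSnafu in file.rs above). A rough, self-contained sketch of that wiring, assuming snafu 0.7-style selectors (the *Snafu suffix used elsewhere in this crate) and using std::io::Error as a stand-in for the real source types:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Parquet file: {path} unable to evaluate predicate on stats\nDetails:\n{source}"))]
    UnableToRunExpressionOnStats { path: String, source: std::io::Error },
}

fn eval_predicate_on_stats(uri: &str) -> Result<Vec<u8>, Error> {
    // The generated selector fills in `source` from the io::Error and
    // converts `path` (here a &str) into the variant's String field.
    std::fs::read(uri).context(UnableToRunExpressionOnStatsSnafu { path: uri })
}

fn main() {
    if let Err(e) = eval_predicate_on_stats("definitely-missing.parquet") {
        println!("{e}");
    }
}

The real variants differ only in their source types (DaftError and daft_stats::Error respectively) and in being part of the crate-wide Error enum.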
13 changes: 9 additions & 4 deletions src/daft-parquet/src/stream_reader.rs
@@ -1,8 +1,9 @@
use core::num;
use std::{collections::HashSet, fs::File};

use arrow2::io::parquet::read;
use common_error::DaftResult;
use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_table::Table;
use itertools::Itertools;
use rayon::prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelBridge};
@@ -90,14 +91,18 @@ pub(crate) fn local_parquet_read_into_arrow(
path: uri.to_string(),
})?;
let schema = prune_fields_from_schema(schema, columns, uri)?;
let daft_schema = Schema::try_from(&schema).unwrap();
let chunk_size = 128 * 1024;
let expected_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows));
let max_rows = metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows));

let num_expected_arrays = f32::ceil(expected_rows as f32 / chunk_size as f32) as usize;
let num_expected_arrays = f32::ceil(max_rows as f32 / chunk_size as f32) as usize;
let row_ranges = build_row_ranges(
expected_rows,
num_rows,
start_offset.unwrap_or(0),
row_groups,
// TODO THREAD IN PREDICATES
None,
&daft_schema,
&metadata,
uri,
)?;
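As a quick worked example of the max_rows and chunk-count arithmetic in this hunk (the row count and limit below are made-up numbers; chunk_size matches the hard-coded 128 * 1024 above):

fn main() {
    let chunk_size: usize = 128 * 1024;          // 131_072 rows per decoded chunk
    let metadata_num_rows: usize = 1_000_000;    // rows in the file (illustrative)
    let num_rows: Option<usize> = Some(300_000); // caller-supplied limit (illustrative)

    // Mirrors: metadata.num_rows.min(num_rows.unwrap_or(metadata.num_rows))
    let max_rows = metadata_num_rows.min(num_rows.unwrap_or(metadata_num_rows));
    let num_expected_arrays = f32::ceil(max_rows as f32 / chunk_size as f32) as usize;
    println!("max_rows = {max_rows}, num_expected_arrays = {num_expected_arrays}");
    // prints: max_rows = 300000, num_expected_arrays = 3
}

Until the TODO above is resolved, the local reader always passes None as the predicate, so no row groups are pruned on this path even when a filter is available.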

0 comments on commit 6bf9417
