Skip to content

Commit

Permalink
add pfilters to iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Dec 16, 2023
1 parent 06c806a commit 62a8332
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 5 deletions.
2 changes: 1 addition & 1 deletion daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _iceberg_record_to_partition_spec(self, record: Record) -> daft.table.Table
arrays[name] = daft.Series.from_pylist([value], name=name).cast(field_dtype)

if len(arrays) > 0:
return daft.table.LegacyTable.from_pydict(arrays)
return daft.table.Table.from_pydict(arrays)
else:
return None

Expand Down
1 change: 1 addition & 0 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ pub(crate) fn read_parquet_into_micropartition(
)
.into(),
Pushdowns::new(
None,
None,
columns
.map(|cols| Arc::new(cols.iter().map(|v| v.to_string()).collect::<Vec<_>>())),
Expand Down
13 changes: 13 additions & 0 deletions src/daft-plan/src/optimization/rules/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use daft_dsl::{
},
Expr,
};
use daft_scan::{rewrite_predicate_for_partitioning, ScanExternalInfo};

use crate::{
logical_ops::{Concat, Filter, Project, Source},
Expand Down Expand Up @@ -115,8 +116,20 @@ impl OptimizerRule for PushDownFilter {
return Ok(Transformed::No(plan));
}
let new_predicate = external_info.pushdowns().filters.as_ref().map(|f| predicate.and(f)).unwrap_or(predicate.clone());
let partition_filter = if let ExternalInfo::Scan(ScanExternalInfo {scan_op, ..}) = &external_info {
rewrite_predicate_for_partitioning(new_predicate.clone(), scan_op.0.partitioning_keys())?
} else {
None
};
println!("pfilter: {:#?}", partition_filter);
let new_pushdowns =
external_info.pushdowns().with_filters(Some(Arc::new(new_predicate)));

let new_pushdowns = if let Some(pfilter) = partition_filter {
new_pushdowns.with_partition_filters(Some(Arc::new(pfilter)))
} else {
new_pushdowns
};
let new_external_info = external_info.with_pushdowns(new_pushdowns);
let new_source = LogicalPlan::Source(Source::new(
source.output_schema.clone(),
Expand Down
5 changes: 2 additions & 3 deletions src/daft-scan/src/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn apply_partitioning_expr(expr: Expr, tfm: PartitionTransform) -> Option<Expr>
pub fn rewrite_predicate_for_partitioning(
predicate: Expr,
pfields: &[PartitionField],
) -> DaftResult<Vec<Expr>> {
) -> DaftResult<Option<Expr>> {
if pfields.is_empty() {
todo!("no predicate")
}
Expand Down Expand Up @@ -153,6 +153,5 @@ pub fn rewrite_predicate_for_partitioning(
})
.cloned()
.collect::<Vec<_>>();

Ok(filtered)
Ok(conjuct(filtered))
}
22 changes: 21 additions & 1 deletion src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use python::register_modules;
use snafu::Snafu;
use storage_config::StorageConfig;
mod expr_rewriter;
pub use expr_rewriter::rewrite_predicate_for_partitioning;
#[derive(Debug, Snafu)]
pub enum Error {
#[cfg(feature = "python")]
Expand Down Expand Up @@ -451,6 +452,8 @@ impl ScanExternalInfo {
pub struct Pushdowns {
/// Optional filters to apply to the source data.
pub filters: Option<ExprRef>,
/// Optional filters to apply on partitioning keys.
pub partition_filters: Option<ExprRef>,
/// Optional columns to select from the source data.
pub columns: Option<Arc<Vec<String>>>,
/// Optional number of rows to read.
Expand All @@ -459,18 +462,20 @@ pub struct Pushdowns {

impl Default for Pushdowns {
fn default() -> Self {
Self::new(None, None, None)
Self::new(None, None, None, None)
}
}

impl Pushdowns {
pub fn new(
filters: Option<ExprRef>,
partition_filters: Option<ExprRef>,
columns: Option<Arc<Vec<String>>>,
limit: Option<usize>,
) -> Self {
Self {
filters,
partition_filters,
columns,
limit,
}
Expand All @@ -479,6 +484,7 @@ impl Pushdowns {
pub fn with_limit(&self, limit: Option<usize>) -> Self {
Self {
filters: self.filters.clone(),
partition_filters: self.partition_filters.clone(),
columns: self.columns.clone(),
limit,
}
Expand All @@ -487,6 +493,16 @@ impl Pushdowns {
pub fn with_filters(&self, filters: Option<ExprRef>) -> Self {
Self {
filters,
partition_filters: self.partition_filters.clone(),
columns: self.columns.clone(),
limit: self.limit,
}
}

pub fn with_partition_filters(&self, partition_filters: Option<ExprRef>) -> Self {
Self {
filters: self.filters.clone(),
partition_filters,
columns: self.columns.clone(),
limit: self.limit,
}
Expand All @@ -495,6 +511,7 @@ impl Pushdowns {
pub fn with_columns(&self, columns: Option<Arc<Vec<String>>>) -> Self {
Self {
filters: self.filters.clone(),
partition_filters: self.partition_filters.clone(),
columns,
limit: self.limit,
}
Expand All @@ -508,6 +525,9 @@ impl Pushdowns {
if let Some(filters) = &self.filters {
res.push(format!("Filter pushdown = {}", filters));
}
if let Some(pfilters) = &self.partition_filters {
res.push(format!("Partition Filter = {}", pfilters));
}
if let Some(limit) = self.limit {
res.push(format!("Limit pushdown = {}", limit));
}
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/iceberg/test_table_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ def test_daft_iceberg_table_show(table_name, local_iceberg_catalog):
def test_daft_iceberg_table_collect_correct(table_name, local_iceberg_catalog):
tab = local_iceberg_catalog.load_table(f"default.{table_name}")
df = daft.read_iceberg(tab)
df.collect()
daft_pandas = df.to_pandas()
iceberg_pandas = tab.scan().to_arrow().to_pandas()
assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])


@pytest.mark.integration()
def test_daft_iceberg_table_predicate_pushdown(local_iceberg_catalog):
tab = local_iceberg_catalog.load_table("default.test_partitioned_by_days")
df = daft.read_iceberg(tab)
import ipdb

ipdb.set_trace()
Expand Down

0 comments on commit 62a8332

Please sign in to comment.