Fix partition filter pushdowns.
clarkzinzow committed Feb 20, 2024
1 parent 3ad0ed1 · commit 932a5a5
Showing 4 changed files with 68 additions and 52 deletions.
src/daft-plan/src/optimization/rules/push_down_filter.rs (10 changes: 5 additions & 5 deletions)
@@ -115,7 +115,7 @@ impl OptimizerRule for PushDownFilter {
             return Ok(Transformed::No(plan));
         }
         let new_predicate = external_info.pushdowns().filters.as_ref().map(|f| predicate.and(f)).unwrap_or(predicate.clone());
-        let PredicateGroups { identity_partition_filter, non_identity_partition_filter, partition_and_data_filter, data_only_filter } = if let ExternalInfo::Scan(ScanExternalInfo {scan_op, ..}) = &external_info {
+        let PredicateGroups { partition_filter, non_identity_partition_filter_for_data, partition_and_data_filter, data_only_filter } = if let ExternalInfo::Scan(ScanExternalInfo {scan_op, ..}) = &external_info {
             // We split the predicate into four groups:
             // 1. Identity transform partition filters, which can then be dropped from the data-level filter.
             // 2. Non-identity transform partition filters, which must still be applied on the data.
@@ -125,18 +125,18 @@ impl OptimizerRule for PushDownFilter {
         } else {
             PredicateGroups::from_data_only(vec![new_predicate])
         };
-        assert!(identity_partition_filter.len() + non_identity_partition_filter.len() + partition_and_data_filter.len() + data_only_filter.len() > 0);
+        assert!(partition_filter.len() + non_identity_partition_filter_for_data.len() + partition_and_data_filter.len() + data_only_filter.len() > 0);

-        if !partition_and_data_filter.is_empty() && identity_partition_filter.is_empty() && non_identity_partition_filter.is_empty() && data_only_filter.is_empty() {
+        if !partition_and_data_filter.is_empty() && partition_filter.is_empty() && data_only_filter.is_empty() {
             // If the filter predicate consists of only expressions that rely on both a partition
             // column and a data column, then no pushdown into the scan is possible, so we
             // short-circuit.
             // TODO(Clark): Support pushing predicates referencing both partition and data columns into the scan.
             return Ok(Transformed::No(plan));
         }

-        let data_filter = conjuct(non_identity_partition_filter.clone().into_iter().chain(data_only_filter));
-        let partition_filter = conjuct(identity_partition_filter.into_iter().chain(non_identity_partition_filter));
+        let data_filter = conjuct(non_identity_partition_filter_for_data.clone().into_iter().chain(data_only_filter));
+        let partition_filter = conjuct(partition_filter);
         assert!(data_filter.is_some() || partition_filter.is_some());

         let new_pushdowns = if let Some(data_filter) = data_filter {
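The crux of the fix is in the last hunk above: the scan-level partition filter is now built solely from `partition_filter` (every pure partition-column clause, already rewritten against the partitioning scheme), while non-identity-transform partition clauses are additionally retained in the data-level filter. The reason they must be reapplied on the data: a clause like `ts > t0` on a table partitioned by `day(ts)` can prune whole day-partitions, but rows inside the boundary partition still need the row-level check. Below is a minimal, self-contained sketch of this recombination; the toy `Expr` type, the `conjuct` analogue, and the example columns are illustrative stand-ins, not Daft's actual expression API.

    // Toy expression tree standing in for daft-dsl's `Expr`.
    #[derive(Debug)]
    enum Expr {
        Col(&'static str),
        Gt(Box<Expr>, i64),
        Eq(Box<Expr>, i64),
        And(Box<Expr>, Box<Expr>),
    }

    // Analogue of the `conjuct` helper used in the diff: fold a clause list
    // into a single AND-chain, yielding None for an empty list.
    fn conjuct(exprs: impl IntoIterator<Item = Expr>) -> Option<Expr> {
        exprs
            .into_iter()
            .reduce(|acc, e| Expr::And(Box::new(acc), Box::new(e)))
    }

    fn main() {
        // Hypothetical table partitioned on `date` (identity) and `day(ts)`,
        // filtered by: date == 19773 AND ts > 1_700_000_000 AND value > 0.
        // All pure partition clauses feed the scan's partition filter...
        let partition_filter = vec![
            Expr::Eq(Box::new(Expr::Col("date")), 19773),
            Expr::Gt(Box::new(Expr::Col("ts")), 1_700_000_000),
        ];
        // ...but the non-identity clause (ts > t0) is also kept for the data level,
        let non_identity_partition_filter_for_data =
            vec![Expr::Gt(Box::new(Expr::Col("ts")), 1_700_000_000)];
        // alongside clauses that never touched a partition column.
        let data_only_filter = vec![Expr::Gt(Box::new(Expr::Col("value")), 0)];

        // Mirrors the fixed recombination in push_down_filter.rs.
        let data_filter = conjuct(
            non_identity_partition_filter_for_data
                .into_iter()
                .chain(data_only_filter),
        );
        let partition_filter = conjuct(partition_filter);
        assert!(data_filter.is_some() || partition_filter.is_some());
        println!("partition filter: {partition_filter:?}");
        println!("data filter: {data_filter:?}");
    }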
src/daft-scan/src/expr_rewriter.rs (103 changes: 60 additions & 43 deletions)
@@ -51,22 +51,22 @@ fn apply_partitioning_expr(expr: Expr, pfield: &PartitionField) -> Option<Expr>

 /// Grouping of clauses in a conjunctive predicate around partitioning semantics.
 pub struct PredicateGroups {
-    pub identity_partition_filter: Vec<Expr>,
-    pub non_identity_partition_filter: Vec<Expr>,
+    pub partition_filter: Vec<Expr>,
+    pub non_identity_partition_filter_for_data: Vec<Expr>,
     pub partition_and_data_filter: Vec<Expr>,
     pub data_only_filter: Vec<Expr>,
 }

 impl PredicateGroups {
     pub fn new(
-        identity_partition_filter: Vec<Expr>,
-        non_identity_partition_filter: Vec<Expr>,
+        partition_filter: Vec<Expr>,
+        non_identity_partition_filter_for_data: Vec<Expr>,
         partition_and_data_filter: Vec<Expr>,
         data_only_filter: Vec<Expr>,
     ) -> Self {
         Self {
-            identity_partition_filter,
-            non_identity_partition_filter,
+            partition_filter,
+            non_identity_partition_filter_for_data,
             partition_and_data_filter,
             data_only_filter,
         }
@@ -85,6 +85,50 @@ pub fn rewrite_predicate_for_partitioning(
         return Ok(PredicateGroups::from_data_only(vec![predicate]));
     }

+    let pfields_map: HashMap<&str, &PartitionField> = pfields
+        .iter()
+        .map(|pfield| (pfield.field.name.as_str(), pfield))
+        .collect();
+
+    // Before rewriting predicate for partition filter pushdown, partition predicate clauses into groups that will need
+    // to be applied at the data level (i.e. any clauses that aren't pure partition predicates with identity
+    // transformations).
+    let data_split = split_conjuction(&predicate);
+    // Predicates that only reference partition columns, but involve non-identity transformations.
+    let mut non_identity_part_preds_for_data = vec![];
+    // Predicates that reference both partition columns and data columns.
+    let mut part_and_data_preds = vec![];
+    // Predicates that only reference data columns (no partition column references).
+    let mut data_preds = vec![];
+    for e in data_split.into_iter() {
+        let mut all_data_keys = true;
+        let mut all_part_keys = true;
+        let mut any_non_identity_part_keys = false;
+        e.apply(&mut |e| {
+            if let Expr::Column(col_name) = e {
+                if let Some(pfield) = pfields_map.get(col_name.as_ref()) {
+                    all_data_keys = false;
+                    if !matches!(pfield.transform, Some(PartitionTransform::Identity) | None) {
+                        any_non_identity_part_keys = true;
+                    }
+                } else {
+                    all_part_keys = false;
+                }
+            }
+            Ok(VisitRecursion::Continue)
+        })
+        .unwrap();
+
+        // Push to appropriate vec.
+        if all_data_keys {
+            data_preds.push(e.clone());
+        } else if any_non_identity_part_keys {
+            non_identity_part_preds_for_data.push(e.clone());
+        } else if !all_part_keys {
+            part_and_data_preds.push(e.clone());
+        }
+    }
+
     let predicate = unalias(predicate)?;

     let source_to_pfield = {
@@ -177,55 +221,28 @@ pub fn rewrite_predicate_for_partitioning(
         }
     })?;

-    let pfields_map: HashMap<&str, &PartitionField> = pfields
-        .iter()
-        .map(|pfield| (pfield.field.name.as_str(), pfield))
-        .collect();
-
     // Filter to predicate clauses that only involve partition columns.
     let split = split_conjuction(&with_part_cols);
-    // Predicates that only involve identity transformations on partition columns.
-    let mut part_preds_identity = vec![];
-    // Predicates that only reference partition columns, but involve non-identity transformations.
-    let mut part_preds_non_identity = vec![];
-    // Predicates that reference both partition columns and data columns.
-    let mut part_and_data_preds = vec![];
-    // Predicates that only reference data columns (no partition column references).
-    let mut non_part_preds = vec![];
+    let mut part_preds = vec![];
     for e in split.into_iter() {
         let mut all_part_keys = true;
-        let mut all_identity_part_keys = true;
-        let mut at_least_one_part_key = false;
         e.apply(&mut |e| {
-            if let Expr::Column(col_name) = e {
-                if let Some(pfield) = pfields_map.get(col_name.as_ref()) {
-                    at_least_one_part_key = true;
-                    if !matches!(pfield.transform, Some(PartitionTransform::Identity) | None) {
-                        all_identity_part_keys = false;
-                    }
-                } else {
-                    all_part_keys = false;
-                    all_identity_part_keys = false;
-                }
+            if let Expr::Column(col_name) = e && !pfields_map.contains_key(col_name.as_ref()) {
+                all_part_keys = false;
             }
             Ok(VisitRecursion::Continue)
         })
         .unwrap();

-        // Push to appropriate vec.
-        if all_identity_part_keys {
-            part_preds_identity.push(e.clone());
-        } else if all_part_keys {
-            part_preds_non_identity.push(e.clone());
-        } else if at_least_one_part_key {
-            part_and_data_preds.push(e.clone());
-        } else {
-            non_part_preds.push(e.clone());
+        // Push to partition preds vec.
+        if all_part_keys {
+            part_preds.push(e.clone());
         }
     }
     Ok(PredicateGroups::new(
-        part_preds_identity,
-        part_preds_non_identity,
+        part_preds,
+        non_identity_part_preds_for_data,
         part_and_data_preds,
-        non_part_preds,
+        data_preds,
     ))
 }
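Note the two-pass structure this rewrite introduces: the new first pass (the `data_split` loop added above) classifies conjuncts of the original, un-rewritten predicate, so the clauses routed to the data level keep their original column references, while the surviving second loop collects pure-partition clauses from the partitioning-rewritten predicate (`with_part_cols`). The sketch below models just the first pass's decision table; the enum, the `Day` transform, and the column names are hypothetical stand-ins rather than Daft's types, and the branch order deliberately mirrors the diff.

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq)]
    enum Transform {
        Identity,
        Day, // stand-in for any non-identity partition transform
    }

    enum Group {
        DataOnly,
        NonIdentityPartitionForData,
        PartitionAndData,
        IdentityPartitionOnly, // falls through: handled by the second (partition) pass
    }

    // Classify one conjunct given the set of columns it references.
    fn classify(cols_in_clause: &[&str], pfields: &HashMap<&str, Transform>) -> Group {
        let mut all_data_keys = true;
        let mut all_part_keys = true;
        let mut any_non_identity_part_keys = false;
        for col in cols_in_clause {
            match pfields.get(col) {
                Some(t) => {
                    all_data_keys = false;
                    if *t != Transform::Identity {
                        any_non_identity_part_keys = true;
                    }
                }
                None => all_part_keys = false,
            }
        }
        // Same branch order as the diff's "Push to appropriate vec." block.
        if all_data_keys {
            Group::DataOnly
        } else if any_non_identity_part_keys {
            Group::NonIdentityPartitionForData
        } else if !all_part_keys {
            Group::PartitionAndData
        } else {
            Group::IdentityPartitionOnly
        }
    }

    fn main() {
        let pfields = HashMap::from([("date", Transform::Identity), ("ts", Transform::Day)]);
        // e.g. value > 0; ts > t0; date == value; date == d0
        for clause in [vec!["value"], vec!["ts"], vec!["date", "value"], vec!["date"]] {
            let group = match classify(&clause, &pfields) {
                Group::DataOnly => "data-only",
                Group::NonIdentityPartitionForData => "non-identity partition, reapplied on data",
                Group::PartitionAndData => "partition + data, no pushdown yet",
                Group::IdentityPartitionOnly => "identity partition, dropped from data filter",
            };
            println!("{clause:?}: {group}");
        }
    }

One consequence of that mirrored branch order is that a clause mixing a non-identity partition column with a data column lands in the reapplied-on-data group, which is still safe because that group is always evaluated row by row.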
src/daft-stats/src/table_stats.rs (3 changes: 1 addition & 2 deletions)
@@ -37,6 +37,7 @@ impl TableStatistics {
         }
         Ok(TableStatistics { columns })
     }
+
     pub fn from_table(table: &Table) -> Self {
         let mut columns = IndexMap::with_capacity(table.num_columns());
         for name in table.column_names() {
@@ -46,9 +47,7 @@ impl TableStatistics {
         }
         TableStatistics { columns }
     }
-}

-impl TableStatistics {
     pub fn union(&self, other: &Self) -> crate::Result<Self> {
         // maybe use the schema from micropartition instead
         let unioned_columns = self
tests/integration/iceberg/docker-compose/run-minio.sh (4 changes: 2 additions & 2 deletions)
@@ -23,8 +23,8 @@ set -ex
 if [ $(docker ps -q --filter "name=pyiceberg-minio" --filter "status=running" ) ]; then
     echo "Minio backend running"
 else
-    docker-compose -f dev/docker-compose.yml kill
-    docker-compose -f dev/docker-compose.yml up -d
+    docker-compose -f docker-compose.yml kill
+    docker-compose -f docker-compose.yml up -d
     while [ -z $(docker ps -q --filter "name=pyiceberg-minio" --filter "status=running" ) ]
     do
         echo "Waiting for Minio"