Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 committed Nov 13, 2024
1 parent 74640cf commit 1871ff9
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/daft-logical-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ impl LogicalPlanBuilder {
}
/// Builds a new plan that unions this builder's plan with `other`'s plan.
///
/// `is_all` is forwarded unchanged to the `Union` operator. Errors raised while
/// constructing the operator or converting it into a `LogicalPlan` are propagated
/// to the caller via `?`.
pub fn union(&self, other: &Self, is_all: bool) -> DaftResult<Self> {
// NOTE(review): this hunk shows the removed `Union::new` call next to its
// fallible `Union::try_new` replacement; only the latter belongs in the final file.
let logical_plan: LogicalPlan =
ops::Union::new(self.plan.clone(), other.plan.clone(), is_all).to_logical_plan()?;
ops::Union::try_new(self.plan.clone(), other.plan.clone(), is_all)?
.to_logical_plan()?;
// Wrap the freshly built plan in a builder derived from `self`.
Ok(self.with_new_plan(logical_plan))
}

Expand Down
2 changes: 1 addition & 1 deletion src/daft-logical-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl LogicalPlan {
Self::Source(_) => panic!("Source nodes don't have children, with_new_children() should never be called for Source ops"),
Self::Concat(_) => Self::Concat(Concat::try_new(input1.clone(), input2.clone()).unwrap()),
Self::Intersect(inner) => Self::Intersect(Intersect::try_new(input1.clone(), input2.clone(), inner.is_all).unwrap()),
Self::Union(inner) => Self::Union(Union::new(input1.clone(), input2.clone(), inner.is_all)),
Self::Union(inner) => Self::Union(Union::try_new(input1.clone(), input2.clone(), inner.is_all).unwrap()),
Self::Join(Join { left_on, right_on, null_equals_nulls, join_type, join_strategy, .. }) => Self::Join(Join::try_new(
input1.clone(),
input2.clone(),
Expand Down
19 changes: 17 additions & 2 deletions src/daft-logical-plan/src/ops/set_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,23 @@ impl Union {
/// > select * from t0 union select * from t1;
/// ```
/// This is valid in Union, but not in Concat
pub(crate) fn new(lhs: Arc<LogicalPlan>, rhs: Arc<LogicalPlan>, is_all: bool) -> Self {
Self { lhs, rhs, is_all }
pub(crate) fn try_new(
    lhs: Arc<LogicalPlan>,
    rhs: Arc<LogicalPlan>,
    is_all: bool,
) -> logical_plan::Result<Self> {
    // Fallible constructor: the only precondition checked here is that both
    // inputs expose the same number of fields.
    let (lhs_schema, rhs_schema) = (lhs.schema(), rhs.schema());
    let (lhs_width, rhs_width) = (lhs_schema.len(), rhs_schema.len());

    if lhs_width == rhs_width {
        return Ok(Self { lhs, rhs, is_all });
    }

    // Field counts differ: report both counts and both schemas so the caller can
    // see exactly where the two sides diverge.
    let message = format!(
        "Both plans must have the same num of fields to union, \
        but got[lhs: {} v.s rhs: {}], lhs schema: {}, rhs schema: {}",
        lhs_width, rhs_width, lhs_schema, rhs_schema
    );
    // Surface the mismatch as a `DaftError::SchemaMismatch`, converted into this
    // module's error type through the `CreationSnafu` context.
    Err(DaftError::SchemaMismatch(message)).context(CreationSnafu)
}

/// union could be represented as a concat + distinct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ impl PushDownProjection {
Ok(new_plan)
}
LogicalPlan::Union(union) => {
if !union.is_all {
// cannot push a projection down past a DISTINCT union
return Ok(Transformed::no(plan));
}

// Get required columns from projection and upstream.
let combined_dependencies = plan
.required_columns()
Expand Down
4 changes: 2 additions & 2 deletions src/daft-sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl SQLPlanner {
return left.union(&right, true).map_err(|e| e.into());
}

(Union, SetQuantifier::None) => {
(Union, SetQuantifier::None | SetQuantifier::Distinct) => {
let left = self.plan_query(&make_query(left))?;
let right = self.plan_query(&make_query(right))?;
return left.union(&right, false).map_err(|e| e.into());
Expand All @@ -247,7 +247,7 @@ impl SQLPlanner {
let right = self.plan_query(&make_query(right))?;
return left.intersect(&right, true).map_err(|e| e.into());
}
(Intersect, SetQuantifier::None) => {
(Intersect, SetQuantifier::None | SetQuantifier::Distinct) => {
let left = self.plan_query(&make_query(left))?;
let right = self.plan_query(&make_query(right))?;
return left.intersect(&right, false).map_err(|e| e.into());
Expand Down

0 comments on commit 1871ff9

Please sign in to comment.