Skip to content

Commit

Permalink
feat: Implement LeftMark join to fix subquery correctness issue (apac…
Browse files Browse the repository at this point in the history
…he#13134)

* Implement LeftMark join

In apache#12945 the emulation of a
mark join has a bug when there are duplicate values in the subquery. This
would be fixable by adding a distinct before the join. But this patch
instead implements a LeftMark join with the desired semantics and uses
that. The LeftMark join will return a row for each row in the left input
with an additional column "mark" that is true if there was a match in
the right input and false otherwise.

Note: This patch does not implement the full null semantics for the mark
join described in
http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf
which will be needed if we add `ANY` subqueries. In the version in
this patch the mark column will only be true when there was a match and false
when no match was found, never `null`.

* Use mark join in decorrelate subqueries

This fixes a correctness issue in the current approach.

* Add physical plan sqllogictest

* fmt

* Fix join type in doc comment

* Minor clean ups

* Add more documentation to LeftMark join

* Remove qualification

* fix doc

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
eejbyfeldt and alamb authored Oct 31, 2024
1 parent 7ae1ccb commit 2047d7f
Show file tree
Hide file tree
Showing 33 changed files with 592 additions and 195 deletions.
2 changes: 1 addition & 1 deletion datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl FunctionalDependencies {
left_func_dependencies.extend(right_func_dependencies);
left_func_dependencies
}
JoinType::LeftSemi | JoinType::LeftAnti => {
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
// These joins preserve functional dependencies of the left side:
left_func_dependencies
}
Expand Down
21 changes: 21 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ pub enum JoinType {
LeftAnti,
/// Right Anti Join
RightAnti,
/// Left Mark join
///
/// Returns one record for each record from the left input. The output contains an additional
/// column "mark" which is true if there is at least one match in the right input where the
/// join condition evaluates to true. Otherwise, the mark column is false. For more details see
/// [1]. This join type is used to decorrelate EXISTS subqueries used inside disjunctive
/// predicates.
///
/// Note: We currently do not implement the full null semantics for the mark join described
/// in [1] which will be needed if we add ANY subqueries. In our version the mark column will
/// only be true when there was a match and false when no match was found, never null.
///
/// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf
LeftMark,
}

impl JoinType {
Expand All @@ -63,6 +77,7 @@ impl Display for JoinType {
JoinType::RightSemi => "RightSemi",
JoinType::LeftAnti => "LeftAnti",
JoinType::RightAnti => "RightAnti",
JoinType::LeftMark => "LeftMark",
};
write!(f, "{join_type}")
}
Expand All @@ -82,6 +97,7 @@ impl FromStr for JoinType {
"RIGHTSEMI" => Ok(JoinType::RightSemi),
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
"LEFTMARK" => Ok(JoinType::LeftMark),
_ => _not_impl_err!("The join type {s} does not exist or is not implemented"),
}
}
Expand All @@ -101,6 +117,7 @@ impl Display for JoinSide {
match self {
JoinSide::Left => write!(f, "left"),
JoinSide::Right => write!(f, "right"),
JoinSide::None => write!(f, "none"),
}
}
}
Expand All @@ -113,6 +130,9 @@ pub enum JoinSide {
Left,
/// Right side of the join
Right,
/// Neither side of the join, used for Mark joins where the mark column does not belong to
/// either side of the join
None,
}

impl JoinSide {
Expand All @@ -121,6 +141,7 @@ impl JoinSide {
match self {
JoinSide::Left => JoinSide::Right,
JoinSide::Right => JoinSide::Left,
JoinSide::None => JoinSide::None,
}
}
}
6 changes: 5 additions & 1 deletion datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3864,6 +3864,7 @@ mod tests {
JoinType::RightSemi,
JoinType::LeftAnti,
JoinType::RightAnti,
JoinType::LeftMark,
];

let default_partition_count = SessionConfig::new().target_partitions();
Expand All @@ -3881,7 +3882,10 @@ mod tests {
let join_schema = physical_plan.schema();

match join_type {
JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark => {
let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c1", &join_schema)?),
Arc::new(Column::new_with_schema("c2", &join_schema)?),
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ fn adjust_input_keys_ordering(
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::Full => vec![],
| JoinType::Full
| JoinType::LeftMark => vec![],
};
}
PartitionMode::Auto => {
Expand Down Expand Up @@ -1959,6 +1960,7 @@ pub(crate) mod tests {
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightSemi,
JoinType::RightAnti,
];
Expand All @@ -1981,7 +1983,8 @@ pub(crate) mod tests {
| JoinType::Right
| JoinType::Full
| JoinType::LeftSemi
| JoinType::LeftAnti => {
| JoinType::LeftAnti
| JoinType::LeftMark => {
// Join on (a == c)
let top_join_on = vec![(
Arc::new(Column::new_with_schema("a", &join.schema()).unwrap())
Expand All @@ -1999,7 +2002,7 @@ pub(crate) mod tests {

let expected = match join_type {
// Should include 3 RepartitionExecs
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
Expand Down Expand Up @@ -2098,7 +2101,7 @@ pub(crate) mod tests {
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
}
JoinType::LeftSemi | JoinType::LeftAnti => {}
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {}
}
}

Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ fn swap_join_type(join_type: JoinType) -> JoinType {
JoinType::RightSemi => JoinType::LeftSemi,
JoinType::LeftAnti => JoinType::RightAnti,
JoinType::RightAnti => JoinType::LeftAnti,
JoinType::LeftMark => {
unreachable!("LeftMark join type does not support swapping")
}
}
}

Expand Down Expand Up @@ -573,6 +576,7 @@ fn hash_join_convert_symmetric_subrule(
hash_join.right().equivalence_properties(),
hash_join.right().schema(),
),
JoinSide::None => return false,
};

let name = schema.field(*index).name();
Expand All @@ -588,6 +592,7 @@ fn hash_join_convert_symmetric_subrule(
match side {
JoinSide::Left => hash_join.left().output_ordering(),
JoinSide::Right => hash_join.right().output_ordering(),
JoinSide::None => unreachable!(),
}
.map(|p| p.to_vec())
})
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ fn try_pushdown_requirements_to_join(
return Ok(None);
}
}
JoinSide::None => return Ok(None),
};
let join_type = smj.join_type();
let probe_side = SortMergeJoinExec::probe_side(&join_type);
Expand All @@ -410,6 +411,7 @@ fn try_pushdown_requirements_to_join(
JoinSide::Right => {
required_input_ordering[1] = new_req;
}
JoinSide::None => unreachable!(),
}
required_input_ordering
}))
Expand All @@ -421,7 +423,11 @@ fn expr_source_side(
left_columns_len: usize,
) -> Option<JoinSide> {
match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftMark => {
let all_column_sides = required_exprs
.iter()
.filter_map(|r| {
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,30 @@ async fn test_anti_join_1k_filtered() {
.await
}

/// Fuzz test for an unfiltered `JoinType::LeftMark` join over two staggered
/// batch sets, each built with `make_staggered_batches(1000)`.
#[tokio::test]
async fn test_left_mark_join_1k() {
    // Build the inputs left-then-right, matching the argument order of `new`.
    let left_batches = make_staggered_batches(1000);
    let right_batches = make_staggered_batches(1000);

    // NOTE(review): presumably these select which join implementations are
    // compared against each other (hash vs. sort-merge, nested-loop vs. hash)
    // — confirm against `JoinTestType`.
    let comparisons = [JoinTestType::HjSmj, JoinTestType::NljHj];

    // No join filter is supplied for this case.
    JoinFuzzTestCase::new(left_batches, right_batches, JoinType::LeftMark, None)
        .run_test(&comparisons, false)
        .await
}

/// Fuzz test for a `JoinType::LeftMark` join with the `col_lt_col_filter`
/// join filter, over two staggered batch sets each built with
/// `make_staggered_batches(1000)`.
#[tokio::test]
async fn test_left_mark_join_1k_filtered() {
    // Build the inputs left-then-right, matching the argument order of `new`.
    let left_batches = make_staggered_batches(1000);
    let right_batches = make_staggered_batches(1000);

    // NOTE(review): presumably these select which join implementations are
    // compared against each other (hash vs. sort-merge, nested-loop vs. hash)
    // — confirm against `JoinTestType`.
    let comparisons = [JoinTestType::HjSmj, JoinTestType::NljHj];

    // The filter builder is boxed inline at the call site so that it coerces
    // to whatever boxed callable type `new` expects.
    JoinFuzzTestCase::new(
        left_batches,
        right_batches,
        JoinType::LeftMark,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&comparisons, false)
    .await
}

type JoinFilterBuilder = Box<dyn Fn(Arc<Schema>, Arc<Schema>) -> JoinFilter>;

struct JoinFuzzTestCase {
Expand Down
24 changes: 24 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::any::Any;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use crate::dml::CopyTo;
Expand Down Expand Up @@ -1326,6 +1327,25 @@ pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
})
.collect()
}

/// Builds the `(qualifier, field)` pair describing the "mark" column.
///
/// The field is always a non-nullable `Boolean` named `"mark"`.
///
/// The qualifier is derived from `schema`: when at least one field carries a
/// qualifier and every qualified field carries the *same* one, that table
/// reference is cloned and returned. Otherwise (no qualified fields, or more
/// than one distinct qualifier) the qualifier is `None`. Unqualified fields
/// are ignored when making this decision.
fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
    // Walk only the qualifiers that are actually present.
    let mut qualifiers = schema.iter().filter_map(|(qualifier, _)| qualifier);

    // Take the first qualifier, then require every remaining one to equal it.
    let first = qualifiers.next();
    let table_reference = match first {
        Some(first) if qualifiers.all(|other| other == first) => Some(first.clone()),
        _ => None,
    };

    let mark = Field::new("mark", DataType::Boolean, false);
    (table_reference, Arc::new(mark))
}

/// Creates a schema for a join operation.
/// The fields from the left side are first
pub fn build_join_schema(
Expand Down Expand Up @@ -1392,6 +1412,10 @@ pub fn build_join_schema(
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.collect()
}
JoinType::LeftMark => left_fields
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.chain(once(mark_field(right)))
.collect(),
JoinType::RightSemi | JoinType::RightAnti => {
// Only use the right side for the schema
right_fields
Expand Down
8 changes: 6 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,9 @@ impl LogicalPlan {
left.head_output_expr()
}
}
JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
left.head_output_expr()
}
JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
},
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
Expand Down Expand Up @@ -1290,7 +1292,9 @@ impl LogicalPlan {
_ => None,
}
}
JoinType::LeftSemi | JoinType::LeftAnti => left.max_rows(),
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
left.max_rows()
}
JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
},
LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
Expand Down
5 changes: 4 additions & 1 deletion datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Re
})?;
Ok(())
}
JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark => {
check_inner_plan(left, can_contain_outer_ref)?;
check_inner_plan(right, false)
}
Expand Down
Loading

0 comments on commit 2047d7f

Please sign in to comment.