-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] Support intersect as a DataFrame API #3134
Merged
universalmind303
merged 2 commits into
Eventual-Inc:main
from
advancedxy:intersect_operation
Nov 12, 2024
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
use std::sync::Arc; | ||
|
||
use common_error::DaftError; | ||
use daft_core::join::JoinType; | ||
use daft_dsl::col; | ||
use snafu::ResultExt; | ||
|
||
use crate::{logical_plan, logical_plan::CreationSnafu, LogicalPlan}; | ||
|
||
#[derive(Clone, Debug, PartialEq, Eq, Hash)] | ||
pub struct Intersect { | ||
// Upstream nodes. | ||
pub lhs: Arc<LogicalPlan>, | ||
pub rhs: Arc<LogicalPlan>, | ||
pub is_all: bool, | ||
} | ||
|
||
impl Intersect { | ||
pub(crate) fn try_new( | ||
lhs: Arc<LogicalPlan>, | ||
rhs: Arc<LogicalPlan>, | ||
is_all: bool, | ||
) -> logical_plan::Result<Self> { | ||
let lhs_schema = lhs.schema(); | ||
let rhs_schema = rhs.schema(); | ||
if lhs_schema.len() != rhs_schema.len() { | ||
return Err(DaftError::SchemaMismatch(format!( | ||
"Both plans must have the same num of fields to intersect, \ | ||
but got[lhs: {} v.s rhs: {}], lhs schema: {}, rhs schema: {}", | ||
lhs_schema.len(), | ||
rhs_schema.len(), | ||
lhs_schema, | ||
rhs_schema | ||
))) | ||
.context(CreationSnafu); | ||
} | ||
// lhs and rhs should have the same type for each field to intersect | ||
if lhs_schema | ||
.fields | ||
.values() | ||
.zip(rhs_schema.fields.values()) | ||
.any(|(l, r)| l.dtype != r.dtype) | ||
{ | ||
return Err(DaftError::SchemaMismatch(format!( | ||
"Both plans' schemas should have the same type for each field to intersect, \ | ||
but got lhs schema: {}, rhs schema: {}", | ||
lhs_schema, rhs_schema | ||
))) | ||
.context(CreationSnafu); | ||
} | ||
Ok(Self { lhs, rhs, is_all }) | ||
} | ||
|
||
/// intersect distinct could be represented as a semi join + distinct | ||
/// the following intersect operator: | ||
/// ```sql | ||
/// select a1, a2 from t1 intersect select b1, b2 from t2 | ||
/// ``` | ||
/// is the same as: | ||
/// ```sql | ||
/// select distinct a1, a2 from t1 left semi join t2 | ||
/// on t1.a1 <> t2.b1 and t1.a2 <> t2.b2 | ||
/// ``` | ||
/// TODO: Move this logical to logical optimization rules | ||
pub(crate) fn to_optimized_join(&self) -> logical_plan::Result<LogicalPlan> { | ||
if self.is_all { | ||
Err(logical_plan::Error::CreationError { | ||
source: DaftError::InternalError("intersect all is not supported yet".to_string()), | ||
}) | ||
} else { | ||
let left_on = self | ||
.lhs | ||
.schema() | ||
.fields | ||
.keys() | ||
.map(|k| col(k.clone())) | ||
.collect(); | ||
let right_on = self | ||
.rhs | ||
.schema() | ||
.fields | ||
.keys() | ||
.map(|k| col(k.clone())) | ||
.collect(); | ||
let join = logical_plan::Join::try_new( | ||
self.lhs.clone(), | ||
self.rhs.clone(), | ||
left_on, | ||
right_on, | ||
Some(vec![true; self.lhs.schema().fields.len()]), | ||
JoinType::Semi, | ||
None, | ||
None, | ||
None, | ||
); | ||
join.map(|j| logical_plan::Distinct::new(j.into()).into()) | ||
} | ||
} | ||
|
||
pub fn multiline_display(&self) -> Vec<String> { | ||
let mut res = vec![]; | ||
if self.is_all { | ||
res.push("Intersect All:".to_string()); | ||
} else { | ||
res.push("Intersect:".to_string()); | ||
} | ||
res | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from __future__ import annotations | ||
|
||
import daft | ||
from daft import col | ||
|
||
|
||
def test_simple_intersect(make_df): | ||
df1 = make_df({"foo": [1, 2, 3]}) | ||
df2 = make_df({"bar": [2, 3, 4]}) | ||
result = df1.intersect(df2) | ||
assert result.to_pydict() == {"foo": [2, 3]} | ||
|
||
|
||
def test_intersect_with_duplicate(make_df): | ||
df1 = make_df({"foo": [1, 2, 2, 3]}) | ||
df2 = make_df({"bar": [2, 3, 3]}) | ||
result = df1.intersect(df2) | ||
assert result.to_pydict() == {"foo": [2, 3]} | ||
|
||
|
||
def test_self_intersect(make_df): | ||
df = make_df({"foo": [1, 2, 3]}) | ||
result = df.intersect(df).sort(by="foo") | ||
assert result.to_pydict() == {"foo": [1, 2, 3]} | ||
|
||
|
||
def test_intersect_empty(make_df): | ||
df1 = make_df({"foo": [1, 2, 3]}) | ||
df2 = make_df({"bar": []}).select(col("bar").cast(daft.DataType.int64())) | ||
result = df1.intersect(df2) | ||
assert result.to_pydict() == {"foo": []} | ||
|
||
|
||
def test_intersect_with_nulls(make_df): | ||
df1 = make_df({"foo": [1, 2, None]}) | ||
df1_without_mull = make_df({"foo": [1, 2]}) | ||
df2 = make_df({"bar": [2, 3, None]}) | ||
df2_without_null = make_df({"bar": [2, 3]}) | ||
|
||
result = df1.intersect(df2) | ||
assert result.to_pydict() == {"foo": [2, None]} | ||
|
||
result = df1_without_mull.intersect(df2) | ||
assert result.to_pydict() == {"foo": [2]} | ||
|
||
result = df1.intersect(df2_without_null) | ||
assert result.to_pydict() == {"foo": [2]} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of immediately converting it to a join, I think we should instead defer this until
translate
stage.If we want to have a concept of a logical
Intersect
as discussed here, then I don't think we should immediately convert it to a logical join, but only convert it once to a physical join during the logical -> physical translation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's definitely an option. And DuckDB does that indeed: https://github.com/duckdb/duckdb/blob/a2dce8b1c9fa6039c82e9a32bfcc4c49b03ca871/src/execution/physical_plan/plan_set_operation.cpp#L53. The problem is like you stated in #3241 (comment), it will make the optimization rules more complex: a lot of rules should be amend to be set-operations aware.
On the other side, Spark doesn't defer this to
translate
stage, the intersect/except operators are optimized during the optimization phase, which makes the optimization phase simpler(at least for set-operation handling).From my perspective, I agree with you that we should convert set operations to join early to avoid complexing optimization phase currently. I just want to decouple the logical into a separate func/struct and make sure it's extensible for long term plan. I can avoid define these operations if that doesn't sound right to you. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, Lets go with what's in here now. We can always revisit it needed 😅