diff --git a/src/daft-connect/src/translation/logical_plan.rs b/src/daft-connect/src/translation/logical_plan.rs
index 0af833c54d..3742326430 100644
--- a/src/daft-connect/src/translation/logical_plan.rs
+++ b/src/daft-connect/src/translation/logical_plan.rs
@@ -5,11 +5,12 @@ use spark_connect::{relation::RelType, Limit, Relation};
 use tracing::warn;
 
 use crate::translation::logical_plan::{
-    aggregate::aggregate, local_relation::local_relation, project::project, range::range,
-    to_df::to_df, with_columns::with_columns,
+    aggregate::aggregate, filter::filter, local_relation::local_relation, project::project,
+    range::range, to_df::to_df, with_columns::with_columns,
 };
 
 mod aggregate;
+mod filter;
 mod local_relation;
 mod project;
 mod range;
@@ -54,6 +55,7 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result<Plan> {
         RelType::Limit(l) => limit(*l).wrap_err("Failed to apply limit to logical plan"),
         RelType::Range(r) => range(r).wrap_err("Failed to apply range to logical plan"),
         RelType::Project(p) => project(*p).wrap_err("Failed to apply project to logical plan"),
+        RelType::Filter(f) => filter(*f).wrap_err("Failed to apply filter to logical plan"),
         RelType::Aggregate(a) => {
             aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
         }
diff --git a/src/daft-connect/src/translation/logical_plan/filter.rs b/src/daft-connect/src/translation/logical_plan/filter.rs
new file mode 100644
index 0000000000..7efdbb1629
--- /dev/null
+++ b/src/daft-connect/src/translation/logical_plan/filter.rs
@@ -0,0 +1,22 @@
+use eyre::bail;
+
+use crate::translation::{to_daft_expr, to_logical_plan, Plan};
+
+pub fn filter(filter: spark_connect::Filter) -> eyre::Result<Plan> {
+    let spark_connect::Filter { input, condition } = filter;
+
+    let Some(input) = input else {
+        bail!("input is required");
+    };
+
+    let Some(condition) = condition else {
+        bail!("condition is required");
+    };
+
+    let condition = to_daft_expr(&condition)?;
+
+    let mut plan = to_logical_plan(*input)?;
+    plan.builder = plan.builder.filter(condition)?;
+
+    Ok(plan)
+}
diff --git a/tests/connect/test_filter.py b/tests/connect/test_filter.py
new file mode 100644
index 0000000000..1586c7e7b5
--- /dev/null
+++ b/tests/connect/test_filter.py
@@ -0,0 +1,19 @@
+from __future__ import annotations
+
+from pyspark.sql.functions import col
+
+
+def test_filter(spark_session):
+    # Create DataFrame from range(10)
+    df = spark_session.range(10)
+
+    # Filter for values less than 5
+    df_filtered = df.filter(col("id") < 5)
+
+    # Verify the schema is unchanged after filter
+    assert df_filtered.schema == df.schema, "Schema should be unchanged after filter"
+
+    # Verify the filtered data is correct
+    df_filtered_pandas = df_filtered.toPandas()
+    assert len(df_filtered_pandas) == 5, "Should have 5 rows after filtering < 5"
+    assert all(df_filtered_pandas["id"] < 5), "All values should be less than 5"
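
Note on the test: it relies on a `spark_session` pytest fixture that is not part of this diff (the real fixture presumably lives in the suite's conftest.py). A minimal sketch of what such a fixture could look like, assuming a Daft Spark Connect server is already listening on the default Spark Connect port 15002; the endpoint URL and teardown behavior here are assumptions, not taken from this change:

    import pytest
    from pyspark.sql import SparkSession


    @pytest.fixture
    def spark_session():
        # Assumption: a Daft Spark Connect server is reachable at the default
        # Spark Connect port; point the PySpark client at it.
        session = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
        yield session
        session.stop()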