From 813c0aef8c1369a8cdaa8795b004b9640d67126d Mon Sep 17 00:00:00 2001
From: Andrew Gazelka
Date: Tue, 19 Nov 2024 23:15:27 -0800
Subject: [PATCH] [FEAT] connect: add `df.filter`

---
 .../src/translation/logical_plan.rs        |  6 ++++-
 .../src/translation/logical_plan/filter.rs | 25 +++++++++++++++++++
 tests/connect/test_filter.py               | 19 ++++++++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)
 create mode 100644 src/daft-connect/src/translation/logical_plan/filter.rs
 create mode 100644 tests/connect/test_filter.py

diff --git a/src/daft-connect/src/translation/logical_plan.rs b/src/daft-connect/src/translation/logical_plan.rs
index 947e0cd0d3..1d4d1809f0 100644
--- a/src/daft-connect/src/translation/logical_plan.rs
+++ b/src/daft-connect/src/translation/logical_plan.rs
@@ -3,9 +3,12 @@ use eyre::{bail, Context};
 use spark_connect::{relation::RelType, Relation};
 use tracing::warn;
 
-use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range};
+use crate::translation::logical_plan::{
+    aggregate::aggregate, filter::filter, project::project, range::range,
+};
 
 mod aggregate;
+mod filter;
 mod project;
 mod range;
 
@@ -21,6 +24,7 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder>
     match rel_type {
         RelType::Range(r) => range(r).wrap_err("Failed to apply range to logical plan"),
         RelType::Project(p) => project(*p).wrap_err("Failed to apply project to logical plan"),
+        RelType::Filter(f) => filter(*f).wrap_err("Failed to apply filter to logical plan"),
         RelType::Aggregate(a) => {
             aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
         }
diff --git a/src/daft-connect/src/translation/logical_plan/filter.rs b/src/daft-connect/src/translation/logical_plan/filter.rs
new file mode 100644
index 0000000000..2a5df304f2
--- /dev/null
+++ b/src/daft-connect/src/translation/logical_plan/filter.rs
@@ -0,0 +1,25 @@
+use eyre::bail;
+
+use crate::translation::{to_daft_expr, to_logical_plan};
+
+pub fn filter(
+    filter: spark_connect::Filter,
+) -> eyre::Result<daft_logical_plan::LogicalPlanBuilder> {
+    let spark_connect::Filter { input, condition } = filter;
+
+    let Some(input) = input else {
+        bail!("input is required");
+    };
+
+    let Some(condition) = condition else {
+        bail!("condition is required");
+    };
+
+    let condition = to_daft_expr(&condition)?;
+
+    let plan = to_logical_plan(*input)?;
+
+    let plan = plan.filter(condition)?;
+
+    Ok(plan)
+}
diff --git a/tests/connect/test_filter.py b/tests/connect/test_filter.py
new file mode 100644
index 0000000000..1586c7e7b5
--- /dev/null
+++ b/tests/connect/test_filter.py
@@ -0,0 +1,19 @@
+from __future__ import annotations
+
+from pyspark.sql.functions import col
+
+
+def test_filter(spark_session):
+    # Create DataFrame from range(10)
+    df = spark_session.range(10)
+
+    # Filter for values less than 5
+    df_filtered = df.filter(col("id") < 5)
+
+    # Verify the schema is unchanged after filter
+    assert df_filtered.schema == df.schema, "Schema should be unchanged after filter"
+
+    # Verify the filtered data is correct
+    df_filtered_pandas = df_filtered.toPandas()
+    assert len(df_filtered_pandas) == 5, "Should have 5 rows after filtering < 5"
+    assert all(df_filtered_pandas["id"] < 5), "All values should be less than 5"
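
---
Usage sketch (not part of the diff): the test above runs against the pytest
`spark_session` fixture, but the same code path can be exercised against any
running Daft Connect server. The endpoint address below is an assumption, not
something this patch specifies; 15002 is simply the default Spark Connect port.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    # Assumed endpoint of a running Daft Connect server; adjust as needed.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # range(10) arrives as RelType::Range; .filter(...) arrives as
    # RelType::Filter, handled by the new arm added to to_logical_plan.
    df = spark.range(10).filter(col("id") < 5)

    print(df.toPandas())  # ids 0..4; the predicate ran in Daft's logical plan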