From e52d9654fcbfa7e1727a34f69e039853521ace67 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Wed, 25 Sep 2024 17:25:05 -0700 Subject: [PATCH] [BUG] Enable groupby with alias for native executor (#2917) Fixed this already for the current translation path back in https://github.com/Eventual-Inc/Daft/pull/2790, doing the same for swordfish --------- Co-authored-by: Colin Ho Co-authored-by: Colin Ho --- src/daft-local-execution/src/pipeline.rs | 28 +++++++++++-------- tests/cookbook/test_aggregations.py | 6 ---- tests/dataframe/test_aggregations.py | 7 +---- .../test_approx_percentiles_aggregations.py | 7 +---- tests/dataframe/test_distinct.py | 6 ---- tests/dataframe/test_map_groups.py | 6 ---- 6 files changed, 19 insertions(+), 41 deletions(-) diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 17146f216d..0f84ac2636 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -7,7 +7,7 @@ use daft_core::{ prelude::{Schema, SchemaRef}, utils::supertype, }; -use daft_dsl::{join::get_common_join_keys, Expr}; +use daft_dsl::{col, join::get_common_join_keys, Expr}; use daft_micropartition::MicroPartition; use daft_physical_plan::{ Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project, Sort, @@ -186,17 +186,23 @@ pub fn physical_plan_to_pipeline( }) => { let (first_stage_aggs, second_stage_aggs, final_exprs) = populate_aggregation_stages(aggregations, schema, group_by); - let first_stage_agg_op = AggregateOperator::new( - first_stage_aggs - .values() - .cloned() - .map(|e| Arc::new(Expr::Agg(e.clone()))) - .collect(), - group_by.clone(), - ); let child_node = physical_plan_to_pipeline(input, psets)?; - let post_first_agg_node = - IntermediateNode::new(Arc::new(first_stage_agg_op), vec![child_node]).boxed(); + let (post_first_agg_node, group_by) = if !first_stage_aggs.is_empty() { + let agg_op = AggregateOperator::new( + first_stage_aggs + .values() + .cloned() + .map(|e| Arc::new(Expr::Agg(e.clone()))) + .collect(), + group_by.clone(), + ); + ( + IntermediateNode::new(Arc::new(agg_op), vec![child_node]).boxed(), + &group_by.iter().map(|e| col(e.name())).collect(), + ) + } else { + (child_node, group_by) + }; let second_stage_agg_sink = AggregateSink::new( second_stage_aggs diff --git a/tests/cookbook/test_aggregations.py b/tests/cookbook/test_aggregations.py index 2112a4a41f..9eb2783da6 100644 --- a/tests/cookbook/test_aggregations.py +++ b/tests/cookbook/test_aggregations.py @@ -6,17 +6,11 @@ import pandas as pd import pytest -from daft import context from daft.datatype import DataType from daft.expressions import col from daft.udf import udf from tests.conftest import assert_df_equals -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) - def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): """Sums across an entire column for the entire table""" diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py index 9fe567aaef..74fe889ce0 100644 --- a/tests/dataframe/test_aggregations.py +++ b/tests/dataframe/test_aggregations.py @@ -7,18 +7,13 @@ import pytest import daft -from daft import col, context +from daft import col from daft.context import get_context from daft.datatype import DataType from daft.errors import ExpressionTypeError from daft.utils import freeze from tests.utils import sort_arrow_table -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) - @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) def test_agg_global(make_df, repartition_nparts): diff --git a/tests/dataframe/test_approx_percentiles_aggregations.py b/tests/dataframe/test_approx_percentiles_aggregations.py index 3e6b3ecd87..d64f1a2381 100644 --- a/tests/dataframe/test_approx_percentiles_aggregations.py +++ b/tests/dataframe/test_approx_percentiles_aggregations.py @@ -4,12 +4,7 @@ import pyarrow as pa import pytest -from daft import col, context - -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) +from daft import col @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) diff --git a/tests/dataframe/test_distinct.py b/tests/dataframe/test_distinct.py index 8865a7890c..8e4b2c0a85 100644 --- a/tests/dataframe/test_distinct.py +++ b/tests/dataframe/test_distinct.py @@ -3,15 +3,9 @@ import pyarrow as pa import pytest -from daft import context from daft.datatype import DataType from tests.utils import sort_arrow_table -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) - @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) def test_distinct_with_nulls(make_df, repartition_nparts): diff --git a/tests/dataframe/test_map_groups.py b/tests/dataframe/test_map_groups.py index 379e4b4bdc..4f0f2e29ec 100644 --- a/tests/dataframe/test_map_groups.py +++ b/tests/dataframe/test_map_groups.py @@ -3,12 +3,6 @@ import pytest import daft -from daft import context - -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])