From aabd04e180cb20ca7f60f57025ffd0ac1924c4ad Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 4 Oct 2024 13:17:30 -0700 Subject: [PATCH 1/5] [BUG] Fix actor pool project splitting when column is not renamed --- .../rules/split_actor_pool_projects.rs | 91 ++++++++++++++++++- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 1d77502221..7728098863 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -122,7 +122,7 @@ impl SplitActorPoolProjects { impl OptimizerRule for SplitActorPoolProjects { fn try_optimize(&self, plan: Arc) -> DaftResult>> { plan.transform_down(|node| match node.as_ref() { - LogicalPlan::Project(projection) => try_optimize_project(projection, node.clone(), 0), + LogicalPlan::Project(projection) => try_optimize_project(projection, node.clone()), _ => Ok(Transformed::no(node)), }) } @@ -370,8 +370,27 @@ fn split_projection( fn try_optimize_project( projection: &Project, plan: Arc, +) -> DaftResult>> { + // We need to add aliases to the expressions since when we split stateful UDFs, + //we create new names for intermediates, but we would like to preserve the original names + let aliased_projection_exprs = projection + .projection + .iter() + .map(|e| e.alias(e.name())) + .collect(); + + let aliased_projection = Project::try_new(projection.input.clone(), aliased_projection_exprs)?; + + recursive_optimize_project(&aliased_projection, plan, 0) +} + +fn recursive_optimize_project( + projection: &Project, + plan: Arc, recursive_count: usize, ) -> DaftResult>> { + // TODO: eliminate the need for recursive calls by doing a post-order traversal of the plan tree. + // Base case: no stateful UDFs at all let has_stateful_udfs = projection.projection.iter().any(has_stateful_udf); if !has_stateful_udfs { @@ -416,8 +435,11 @@ fn try_optimize_project( // Recursively run the rule on the new child Project let new_project = Project::try_new(projection.input.clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); - let optimized_child_plan = - try_optimize_project(&new_project, new_child_project.clone(), recursive_count + 1)?; + let optimized_child_plan = recursive_optimize_project( + &new_project, + new_child_project.clone(), + recursive_count + 1, + )?; optimized_child_plan.data.clone() }; @@ -785,6 +807,69 @@ mod tests { Ok(()) } + #[test] + fn test_multiple_with_column_serial_no_alias() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![Field::new("a", DataType::Utf8)]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = + create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]); + + // Add a Projection with StatefulUDF and resource request + let project_plan = scan_plan + .select(vec![stacked_stateful_project_expr.clone()])? + .build(); + + println!("{}", project_plan); + + let intermediate_name = "__TruncateRootStatefulUDF_0-0-0__"; + + let expected = scan_plan.select(vec![col("a")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name), + ], + )?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name)])?).arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name)])?).arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name), + create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("a"), + ], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a")])?).arced(); + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; + + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + scan_plan.build(), + vec![create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name)], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("a")], + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; + + Ok(()) + } + #[test] fn test_multiple_with_column_serial_multiarg() -> DaftResult<()> { let scan_op = dummy_scan_operator(vec![ From 6d6490bd9cba050ffc359a3ebf33c57aefc534a1 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 4 Oct 2024 13:56:22 -0700 Subject: [PATCH 2/5] make alias condition more restrictive --- .../rules/split_actor_pool_projects.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 7728098863..b9d73fcfa3 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -376,7 +376,13 @@ fn try_optimize_project( let aliased_projection_exprs = projection .projection .iter() - .map(|e| e.alias(e.name())) + .map(|e| { + if has_stateful_udf(e) && !matches!(e.as_ref(), Expr::Alias(..)) { + e.alias(e.name()) + } else { + e.clone() + } + }) .collect(); let aliased_projection = Project::try_new(projection.input.clone(), aliased_projection_exprs)?; From 520a04ca0bc46819dc12d1ebce727e30da843121 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 4 Oct 2024 13:58:40 -0700 Subject: [PATCH 3/5] make wording better --- .../logical_optimization/rules/split_actor_pool_projects.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index b9d73fcfa3..9c786c57b5 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -371,8 +371,9 @@ fn try_optimize_project( projection: &Project, plan: Arc, ) -> DaftResult>> { - // We need to add aliases to the expressions since when we split stateful UDFs, - //we create new names for intermediates, but we would like to preserve the original names + // Add aliases to the expressions in the projection to preserve original names when splitting stateful UDFs. + // This is needed because when we split stateful UDFs, we create new names for intermediates, but we would like + // to have the same expression names as the original projection. let aliased_projection_exprs = projection .projection .iter() From 56f530560c52b8c4c08ef58db92119ceae9ec752 Mon Sep 17 00:00:00 2001 From: Kev Wang Date: Fri, 4 Oct 2024 18:45:55 -0700 Subject: [PATCH 4/5] Delete println Co-authored-by: Jay Chia <17691182+jaychia@users.noreply.github.com> --- .../src/logical_optimization/rules/split_actor_pool_projects.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 9c786c57b5..83ac730fce 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -826,7 +826,6 @@ mod tests { .select(vec![stacked_stateful_project_expr.clone()])? .build(); - println!("{}", project_plan); let intermediate_name = "__TruncateRootStatefulUDF_0-0-0__"; From e558ac7a3ed77e7ef1c56528b89b0a3e2fe3f52d Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 4 Oct 2024 19:54:25 -0700 Subject: [PATCH 5/5] format --- .../src/logical_optimization/rules/split_actor_pool_projects.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 83ac730fce..a44674db55 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -826,7 +826,6 @@ mod tests { .select(vec![stacked_stateful_project_expr.clone()])? .build(); - let intermediate_name = "__TruncateRootStatefulUDF_0-0-0__"; let expected = scan_plan.select(vec![col("a")])?.build();