From fedc085f3c3b9cccb85bb1dff907de428f62223d Mon Sep 17 00:00:00 2001 From: Kev Wang Date: Fri, 4 Oct 2024 20:17:00 -0700 Subject: [PATCH] [BUG] Fix actor pool project splitting when column is not renamed (#2998) Previously, this would fail: ```py import os os.environ["DAFT_ENABLE_ACTOR_POOL_PROJECTIONS"] = "1" import daft from daft import udf @udf( return_dtype=daft.DataType.int64(), batch_size=1 ) class MyUDF: def __init__(self): # import time # time.sleep(10) pass def __call__(self, _): # import time # time.sleep(10) import os pid = os.getpid() return [pid] MyUDF = MyUDF.with_concurrency(4) df = daft.from_pydict({"a": list(range(10))}) df = df.into_partitions(4) df = df.select(MyUDF(df["a"])) df = df.select(MyUDF(df["a"])) df.show() ``` This is because when we split the project into multiple actor pool projects, we create new names for intermediate columns and lose the information about the original name. This PR fixes that by adding an alias to the end of the actor pool projects. --------- Co-authored-by: Jay Chia <17691182+jaychia@users.noreply.github.com> --- .../rules/split_actor_pool_projects.rs | 96 ++++++++++++++++++- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 1d77502221..a44674db55 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -122,7 +122,7 @@ impl SplitActorPoolProjects { impl OptimizerRule for SplitActorPoolProjects { fn try_optimize(&self, plan: Arc) -> DaftResult>> { plan.transform_down(|node| match node.as_ref() { - LogicalPlan::Project(projection) => try_optimize_project(projection, node.clone(), 0), + LogicalPlan::Project(projection) => try_optimize_project(projection, node.clone()), _ => Ok(Transformed::no(node)), }) } @@ -370,8 +370,34 @@ fn split_projection( fn try_optimize_project( projection: &Project, plan: Arc, +) -> DaftResult>> { + // Add aliases to the expressions in the projection to preserve original names when splitting stateful UDFs. + // This is needed because when we split stateful UDFs, we create new names for intermediates, but we would like + // to have the same expression names as the original projection. + let aliased_projection_exprs = projection + .projection + .iter() + .map(|e| { + if has_stateful_udf(e) && !matches!(e.as_ref(), Expr::Alias(..)) { + e.alias(e.name()) + } else { + e.clone() + } + }) + .collect(); + + let aliased_projection = Project::try_new(projection.input.clone(), aliased_projection_exprs)?; + + recursive_optimize_project(&aliased_projection, plan, 0) +} + +fn recursive_optimize_project( + projection: &Project, + plan: Arc, recursive_count: usize, ) -> DaftResult>> { + // TODO: eliminate the need for recursive calls by doing a post-order traversal of the plan tree. + // Base case: no stateful UDFs at all let has_stateful_udfs = projection.projection.iter().any(has_stateful_udf); if !has_stateful_udfs { @@ -416,8 +442,11 @@ fn try_optimize_project( // Recursively run the rule on the new child Project let new_project = Project::try_new(projection.input.clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); - let optimized_child_plan = - try_optimize_project(&new_project, new_child_project.clone(), recursive_count + 1)?; + let optimized_child_plan = recursive_optimize_project( + &new_project, + new_child_project.clone(), + recursive_count + 1, + )?; optimized_child_plan.data.clone() }; @@ -785,6 +814,67 @@ mod tests { Ok(()) } + #[test] + fn test_multiple_with_column_serial_no_alias() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![Field::new("a", DataType::Utf8)]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = + create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]); + + // Add a Projection with StatefulUDF and resource request + let project_plan = scan_plan + .select(vec![stacked_stateful_project_expr.clone()])? + .build(); + + let intermediate_name = "__TruncateRootStatefulUDF_0-0-0__"; + + let expected = scan_plan.select(vec![col("a")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name), + ], + )?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name)])?).arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name)])?).arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name), + create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("a"), + ], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a")])?).arced(); + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; + + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + scan_plan.build(), + vec![create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name)], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("a")], + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; + + Ok(()) + } + #[test] fn test_multiple_with_column_serial_multiarg() -> DaftResult<()> { let scan_op = dummy_scan_operator(vec![