diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index cdecb0ed7d..0c4a53ad28 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -329,11 +329,12 @@ pub fn physical_plan_to_pipeline( let left_schema = left.schema(); let right_schema = right.schema(); - // TODO(desmond): Stats should always be materialized by this point. We should fix this on the Daft Connect side. - // If stats are materialized, use them to determine the build and probe sides. + // To determine whether to use the left or right side of a join for building a probe table, we consider: + // 1. Cardinality of the sides. Probe tables should be built on the smaller side. + // 2. Join type. Different join types have different requirements for which side can build the probe table. let left_stats_state = left.get_stats_state(); let right_stats_state = right.get_stats_state(); - let left_smaller_than_right = match (left_stats_state, right_stats_state) { + let build_on_left = match (left_stats_state, right_stats_state) { (StatsState::Materialized(left_stats), StatsState::Materialized(right_stats)) => { left_stats.approx_stats.upper_bound_bytes <= right_stats.approx_stats.upper_bound_bytes @@ -365,8 +366,8 @@ pub fn physical_plan_to_pipeline( // 2. Stream rows on the left until all rows have been seen. // 3. Finally, emit all unmatched rows from the right. let build_on_left = match join_type { - JoinType::Inner => left_smaller_than_right, - JoinType::Outer => left_smaller_than_right, + JoinType::Inner => build_on_left, + JoinType::Outer => build_on_left, // For left outer joins, we build on right so we can stream the left side. JoinType::Left => false, // For right outer joins, we build on left so we can stream the right side.