Skip to content

Commit

Permalink
Don't use broadcast join if larger table is already partitioned on jo…
Browse files Browse the repository at this point in the history
…in key.
  • Loading branch information
clarkzinzow committed Dec 11, 2023
1 parent 404d2f8 commit 68160ab
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,23 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftConfig>) -> DaftResult<Phys
let mut left_physical = plan(left, cfg.clone())?;
let mut right_physical = plan(right, cfg.clone())?;

let left_pspec = left_physical.partition_spec();
let right_pspec = right_physical.partition_spec();
let num_partitions = max(left_pspec.num_partitions, right_pspec.num_partitions);
let new_left_pspec = Arc::new(PartitionSpec::new_internal(
PartitionScheme::Hash,
num_partitions,
Some(left_on.clone()),
));
let new_right_pspec = Arc::new(PartitionSpec::new_internal(
PartitionScheme::Hash,
num_partitions,
Some(right_on.clone()),
));

let is_left_partitioned = left_pspec == new_left_pspec;
let is_right_partitioned = right_pspec == new_right_pspec;

// If either the left or right side of the join is a very small table, perform a broadcast join with the
// entire smaller table broadcast to each of the partitions of the larger table.

Expand All @@ -507,29 +524,21 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftConfig>) -> DaftResult<Phys
(None, Some(right_size_bytes)) => (Some(right_size_bytes), true),
(None, None) => (None, false),
};
// If smaller table is under broadcast size threshold, use broadcast join.
if let Some(smaller_size_bytes) = smaller_size_bytes && smaller_size_bytes <= cfg.broadcast_join_size_bytes_threshold {
let is_larger_partitioned = if do_swap {
is_right_partitioned
} else {
is_left_partitioned
};
// If the larger table is not already partitioned on the join key AND the smaller table is under the broadcast size threshold, use a broadcast join.
if !is_larger_partitioned && let Some(smaller_size_bytes) = smaller_size_bytes && smaller_size_bytes <= cfg.broadcast_join_size_bytes_threshold {
if do_swap {
// These will get swapped back when doing the actual local joins.
(left_physical, right_physical) = (right_physical, left_physical);
}
return Ok(PhysicalPlan::BroadcastJoin(BroadcastJoin::new(left_physical.into(), right_physical.into(), left_on.clone(), right_on.clone(), *join_type, do_swap)));
}
let left_pspec = left_physical.partition_spec();
let right_pspec = right_physical.partition_spec();
let num_partitions = max(left_pspec.num_partitions, right_pspec.num_partitions);
let new_left_pspec = Arc::new(PartitionSpec::new_internal(
PartitionScheme::Hash,
num_partitions,
Some(left_on.clone()),
));
let new_right_pspec = Arc::new(PartitionSpec::new_internal(
PartitionScheme::Hash,
num_partitions,
Some(right_on.clone()),
));
if (num_partitions > 1 || left_pspec.num_partitions != num_partitions)
&& left_pspec != new_left_pspec
&& !is_left_partitioned
{
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
left_physical.into(),
Expand All @@ -539,7 +548,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftConfig>) -> DaftResult<Phys
left_physical = PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()));
}
if (num_partitions > 1 || right_pspec.num_partitions != num_partitions)
&& right_pspec != new_right_pspec
&& !is_right_partitioned
{
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
right_physical.into(),
Expand Down

0 comments on commit 68160ab

Please sign in to comment.