Skip to content

Commit

Permalink
[FEAT]: shuffle_join_default_partitions param (#2844)
Browse files Browse the repository at this point in the history
addresses #2817
  • Loading branch information
universalmind303 authored Sep 20, 2024
1 parent c481f1b commit 688150f
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 33 deletions.
3 changes: 3 additions & 0 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def set_execution_config(
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -344,6 +345,7 @@ def set_execution_config(
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
shuffle_join_default_partitions: Minimum number of partitions to create when performing joins. Defaults to 16, unless the number of input partitions is greater than 16.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
Expand All @@ -369,6 +371,7 @@ def set_execution_config(
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
shuffle_join_default_partitions=shuffle_join_default_partitions,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
Expand Down
3 changes: 3 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,7 @@ class PyDaftExecutionConfig:
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -1785,6 +1786,8 @@ class PyDaftExecutionConfig:
@property
def shuffle_aggregation_default_partitions(self) -> int: ...
@property
def shuffle_join_default_partitions(self) -> int: ...
@property
def read_sql_partition_size_bytes(self) -> int: ...
@property
def enable_aqe(self) -> bool: ...
Expand Down
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct DaftExecutionConfig {
pub csv_target_filesize: usize,
pub csv_inflation_factor: f64,
pub shuffle_aggregation_default_partitions: usize,
pub shuffle_join_default_partitions: usize,
pub read_sql_partition_size_bytes: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
Expand All @@ -75,6 +76,7 @@ impl Default for DaftExecutionConfig {
csv_target_filesize: 512 * 1024 * 1024, // 512MB
csv_inflation_factor: 0.5,
shuffle_aggregation_default_partitions: 200,
shuffle_join_default_partitions: 16,
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
enable_aqe: false,
enable_native_executor: false,
Expand Down
12 changes: 12 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl PyDaftExecutionConfig {
csv_target_filesize: Option<usize>,
csv_inflation_factor: Option<f64>,
shuffle_aggregation_default_partitions: Option<usize>,
shuffle_join_default_partitions: Option<usize>,
read_sql_partition_size_bytes: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
Expand Down Expand Up @@ -143,10 +144,16 @@ impl PyDaftExecutionConfig {
if let Some(csv_inflation_factor) = csv_inflation_factor {
config.csv_inflation_factor = csv_inflation_factor;
}

if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
{
config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
}

if let Some(shuffle_join_default_partitions) = shuffle_join_default_partitions {
config.shuffle_join_default_partitions = shuffle_join_default_partitions;
}

if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
}
Expand Down Expand Up @@ -231,6 +238,11 @@ impl PyDaftExecutionConfig {
Ok(self.config.shuffle_aggregation_default_partitions)
}

/// Python property getter exposing `shuffle_join_default_partitions`:
/// the minimum number of partitions to create when performing joins
/// (config default is 16; see DaftExecutionConfig::default in lib.rs).
#[getter]
fn get_shuffle_join_default_partitions(&self) -> PyResult<usize> {
Ok(self.config.shuffle_join_default_partitions)
}

#[getter]
fn get_read_sql_partition_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.read_sql_partition_size_bytes)
Expand Down
119 changes: 88 additions & 31 deletions src/daft-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ pub(super) fn translate_single_logical_node(
"Sort-merge join currently only supports inner joins".to_string(),
));
}
let num_partitions = max(num_partitions, cfg.shuffle_join_default_partitions);

let needs_presort = if cfg.sort_merge_join_sort_with_aligned_boundaries {
// Use the special-purpose presorting that ensures join inputs are sorted with aligned
Expand Down Expand Up @@ -616,7 +617,6 @@ pub(super) fn translate_single_logical_node(
// allow for leniency in partition size to avoid minor repartitions
let num_left_partitions = left_clustering_spec.num_partitions();
let num_right_partitions = right_clustering_spec.num_partitions();

let num_partitions = match (
is_left_hash_partitioned,
is_right_hash_partitioned,
Expand All @@ -637,6 +637,7 @@ pub(super) fn translate_single_logical_node(
}
(_, _, a, b) => max(a, b),
};
let num_partitions = max(num_partitions, cfg.shuffle_join_default_partitions);

if num_left_partitions != num_partitions
|| (num_partitions > 1 && !is_left_hash_partitioned)
Expand Down Expand Up @@ -1076,6 +1077,13 @@ mod tests {
Self::Reversed(v) => Self::Reversed(v * x),
}
}
/// Returns the partition count carried by this option, regardless of
/// which variant (`Good`, `Bad`, or `Reversed`) it is.
fn unwrap(&self) -> usize {
    // usize is Copy, so matching on *self lets one or-pattern arm
    // extract the value without per-arm dereferences.
    match *self {
        Self::Good(v) | Self::Bad(v) | Self::Reversed(v) => v,
    }
}
}

fn force_repartition(
Expand Down Expand Up @@ -1128,21 +1136,31 @@ mod tests {

fn check_physical_matches(
plan: PhysicalPlanRef,
left_partition_size: usize,
right_partition_size: usize,
left_repartitions: bool,
right_repartitions: bool,
shuffle_join_default_partitions: usize,
) -> bool {
match plan.as_ref() {
PhysicalPlan::HashJoin(HashJoin { left, right, .. }) => {
let left_works = match (left.as_ref(), left_repartitions) {
let left_works = match (
left.as_ref(),
left_repartitions || left_partition_size < shuffle_join_default_partitions,
) {
(PhysicalPlan::ReduceMerge(_), true) => true,
(PhysicalPlan::Project(_), false) => true,
_ => false,
};
let right_works = match (right.as_ref(), right_repartitions) {
let right_works = match (
right.as_ref(),
right_repartitions || right_partition_size < shuffle_join_default_partitions,
) {
(PhysicalPlan::ReduceMerge(_), true) => true,
(PhysicalPlan::Project(_), false) => true,
_ => false,
};

left_works && right_works
}
_ => false,
Expand All @@ -1152,7 +1170,7 @@ mod tests {
/// Tests a variety of settings regarding hash join repartitioning.
#[test]
fn repartition_hash_join_tests() -> DaftResult<()> {
use RepartitionOptions::*;
use RepartitionOptions::{Bad, Good, Reversed};
let cases = vec![
(Good(30), Good(30), false, false),
(Good(30), Good(40), true, false),
Expand All @@ -1170,19 +1188,33 @@ mod tests {
let cfg: Arc<DaftExecutionConfig> = DaftExecutionConfig::default().into();
for (l_opts, r_opts, l_exp, r_exp) in cases {
for mult in [1, 10] {
let plan =
get_hash_join_plan(cfg.clone(), l_opts.scale_by(mult), r_opts.scale_by(mult))?;
if !check_physical_matches(plan, l_exp, r_exp) {
let l_opts = l_opts.scale_by(mult);
let r_opts = r_opts.scale_by(mult);
let plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
l_exp,
r_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}",
l_opts, r_opts, l_exp, r_exp, mult
);
}

// reversed direction
let plan =
get_hash_join_plan(cfg.clone(), r_opts.scale_by(mult), l_opts.scale_by(mult))?;
if !check_physical_matches(plan, r_exp, l_exp) {
let plan = get_hash_join_plan(cfg.clone(), r_opts.clone(), l_opts.clone())?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
r_exp,
l_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}",
r_opts, l_opts, r_exp, l_exp, mult
Expand All @@ -1199,27 +1231,38 @@ mod tests {
let mut cfg = DaftExecutionConfig::default();
cfg.hash_join_partition_size_leniency = 0.8;
let cfg = Arc::new(cfg);
let (l_opts, r_opts) = (RepartitionOptions::Good(30), RepartitionOptions::Bad(40));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
true,
true,
cfg.shuffle_join_default_partitions
));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(40),
)?;
assert!(check_physical_matches(physical_plan, true, true));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(25),
)?;
assert!(check_physical_matches(physical_plan, false, true));
let (l_opts, r_opts) = (RepartitionOptions::Good(20), RepartitionOptions::Bad(25));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
false,
true,
cfg.shuffle_join_default_partitions
));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(26),
)?;
assert!(check_physical_matches(physical_plan, true, true));
let (l_opts, r_opts) = (RepartitionOptions::Good(20), RepartitionOptions::Bad(26));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
true,
true,
cfg.shuffle_join_default_partitions
));
Ok(())
}

Expand All @@ -1237,7 +1280,14 @@ mod tests {
let cfg: Arc<DaftExecutionConfig> = DaftExecutionConfig::default().into();
for (l_opts, r_opts, l_exp, r_exp) in cases {
let plan = get_hash_join_plan(cfg.clone(), l_opts, r_opts)?;
if !check_physical_matches(plan, l_exp, r_exp) {
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
l_exp,
r_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed single partition hash join test on case ({:?}, {:?}, {}, {})",
l_opts, r_opts, l_exp, r_exp
Expand All @@ -1246,7 +1296,14 @@ mod tests {

// reversed direction
let plan = get_hash_join_plan(cfg.clone(), r_opts, l_opts)?;
if !check_physical_matches(plan, r_exp, l_exp) {
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
r_exp,
l_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed single partition hash join test on case ({:?}, {:?}, {}, {})",
r_opts, l_opts, r_exp, l_exp
Expand Down
Loading

0 comments on commit 688150f

Please sign in to comment.