Skip to content

Commit

Permalink
Revert "[FEAT]: shuffle_join_default_partitions param (#2844)"
Browse files Browse the repository at this point in the history
This reverts commit 688150f.
  • Loading branch information
jaychia authored Sep 20, 2024
1 parent 2ae875f commit 6712d1d
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 201 deletions.
3 changes: 0 additions & 3 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ def set_execution_config(
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -345,7 +344,6 @@ def set_execution_config(
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
shuffle_join_default_partitions: Minimum number of partitions to create when performing joins. Defaults to 16, unless the number of input partitions is greater than 16.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
Expand All @@ -371,7 +369,6 @@ def set_execution_config(
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
shuffle_join_default_partitions=shuffle_join_default_partitions,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
Expand Down
3 changes: 0 additions & 3 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,6 @@ class PyDaftExecutionConfig:
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -1791,8 +1790,6 @@ class PyDaftExecutionConfig:
@property
def shuffle_aggregation_default_partitions(self) -> int: ...
@property
def shuffle_join_default_partitions(self) -> int: ...
@property
def read_sql_partition_size_bytes(self) -> int: ...
@property
def enable_aqe(self) -> bool: ...
Expand Down
2 changes: 0 additions & 2 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub struct DaftExecutionConfig {
pub csv_target_filesize: usize,
pub csv_inflation_factor: f64,
pub shuffle_aggregation_default_partitions: usize,
pub shuffle_join_default_partitions: usize,
pub read_sql_partition_size_bytes: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
Expand All @@ -76,7 +75,6 @@ impl Default for DaftExecutionConfig {
csv_target_filesize: 512 * 1024 * 1024, // 512MB
csv_inflation_factor: 0.5,
shuffle_aggregation_default_partitions: 200,
shuffle_join_default_partitions: 16,
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
enable_aqe: false,
enable_native_executor: false,
Expand Down
12 changes: 0 additions & 12 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ impl PyDaftExecutionConfig {
csv_target_filesize: Option<usize>,
csv_inflation_factor: Option<f64>,
shuffle_aggregation_default_partitions: Option<usize>,
shuffle_join_default_partitions: Option<usize>,
read_sql_partition_size_bytes: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
Expand Down Expand Up @@ -144,16 +143,10 @@ impl PyDaftExecutionConfig {
if let Some(csv_inflation_factor) = csv_inflation_factor {
config.csv_inflation_factor = csv_inflation_factor;
}

if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
{
config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
}

if let Some(shuffle_join_default_partitions) = shuffle_join_default_partitions {
config.shuffle_join_default_partitions = shuffle_join_default_partitions;
}

if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
}
Expand Down Expand Up @@ -238,11 +231,6 @@ impl PyDaftExecutionConfig {
Ok(self.config.shuffle_aggregation_default_partitions)
}

#[getter]
fn get_shuffle_join_default_partitions(&self) -> PyResult<usize> {
Ok(self.config.shuffle_join_default_partitions)
}

#[getter]
fn get_read_sql_partition_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.read_sql_partition_size_bytes)
Expand Down
119 changes: 31 additions & 88 deletions src/daft-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,6 @@ pub(super) fn translate_single_logical_node(
"Sort-merge join currently only supports inner joins".to_string(),
));
}
let num_partitions = max(num_partitions, cfg.shuffle_join_default_partitions);

let needs_presort = if cfg.sort_merge_join_sort_with_aligned_boundaries {
// Use the special-purpose presorting that ensures join inputs are sorted with aligned
Expand Down Expand Up @@ -617,6 +616,7 @@ pub(super) fn translate_single_logical_node(
// allow for leniency in partition size to avoid minor repartitions
let num_left_partitions = left_clustering_spec.num_partitions();
let num_right_partitions = right_clustering_spec.num_partitions();

let num_partitions = match (
is_left_hash_partitioned,
is_right_hash_partitioned,
Expand All @@ -637,7 +637,6 @@ pub(super) fn translate_single_logical_node(
}
(_, _, a, b) => max(a, b),
};
let num_partitions = max(num_partitions, cfg.shuffle_join_default_partitions);

if num_left_partitions != num_partitions
|| (num_partitions > 1 && !is_left_hash_partitioned)
Expand Down Expand Up @@ -1077,13 +1076,6 @@ mod tests {
Self::Reversed(v) => Self::Reversed(v * x),
}
}
fn unwrap(&self) -> usize {
match self {
Self::Good(v) => *v,
Self::Bad(v) => *v,
Self::Reversed(v) => *v,
}
}
}

fn force_repartition(
Expand Down Expand Up @@ -1136,31 +1128,21 @@ mod tests {

fn check_physical_matches(
plan: PhysicalPlanRef,
left_partition_size: usize,
right_partition_size: usize,
left_repartitions: bool,
right_repartitions: bool,
shuffle_join_default_partitions: usize,
) -> bool {
match plan.as_ref() {
PhysicalPlan::HashJoin(HashJoin { left, right, .. }) => {
let left_works = match (
left.as_ref(),
left_repartitions || left_partition_size < shuffle_join_default_partitions,
) {
let left_works = match (left.as_ref(), left_repartitions) {
(PhysicalPlan::ReduceMerge(_), true) => true,
(PhysicalPlan::Project(_), false) => true,
_ => false,
};
let right_works = match (
right.as_ref(),
right_repartitions || right_partition_size < shuffle_join_default_partitions,
) {
let right_works = match (right.as_ref(), right_repartitions) {
(PhysicalPlan::ReduceMerge(_), true) => true,
(PhysicalPlan::Project(_), false) => true,
_ => false,
};

left_works && right_works
}
_ => false,
Expand All @@ -1170,7 +1152,7 @@ mod tests {
/// Tests a variety of settings regarding hash join repartitioning.
#[test]
fn repartition_hash_join_tests() -> DaftResult<()> {
use RepartitionOptions::{Bad, Good, Reversed};
use RepartitionOptions::*;
let cases = vec![
(Good(30), Good(30), false, false),
(Good(30), Good(40), true, false),
Expand All @@ -1188,33 +1170,19 @@ mod tests {
let cfg: Arc<DaftExecutionConfig> = DaftExecutionConfig::default().into();
for (l_opts, r_opts, l_exp, r_exp) in cases {
for mult in [1, 10] {
let l_opts = l_opts.scale_by(mult);
let r_opts = r_opts.scale_by(mult);
let plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
l_exp,
r_exp,
cfg.shuffle_join_default_partitions,
) {
let plan =
get_hash_join_plan(cfg.clone(), l_opts.scale_by(mult), r_opts.scale_by(mult))?;
if !check_physical_matches(plan, l_exp, r_exp) {
panic!(
"Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}",
l_opts, r_opts, l_exp, r_exp, mult
);
}

// reversed direction
let plan = get_hash_join_plan(cfg.clone(), r_opts.clone(), l_opts.clone())?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
r_exp,
l_exp,
cfg.shuffle_join_default_partitions,
) {
let plan =
get_hash_join_plan(cfg.clone(), r_opts.scale_by(mult), l_opts.scale_by(mult))?;
if !check_physical_matches(plan, r_exp, l_exp) {
panic!(
"Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}",
r_opts, l_opts, r_exp, l_exp, mult
Expand All @@ -1231,38 +1199,27 @@ mod tests {
let mut cfg = DaftExecutionConfig::default();
cfg.hash_join_partition_size_leniency = 0.8;
let cfg = Arc::new(cfg);
let (l_opts, r_opts) = (RepartitionOptions::Good(30), RepartitionOptions::Bad(40));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
true,
true,
cfg.shuffle_join_default_partitions
));

let (l_opts, r_opts) = (RepartitionOptions::Good(20), RepartitionOptions::Bad(25));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
false,
true,
cfg.shuffle_join_default_partitions
));
let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(40),
)?;
assert!(check_physical_matches(physical_plan, true, true));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(25),
)?;
assert!(check_physical_matches(physical_plan, false, true));

let (l_opts, r_opts) = (RepartitionOptions::Good(20), RepartitionOptions::Bad(26));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
true,
true,
cfg.shuffle_join_default_partitions
));
let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(26),
)?;
assert!(check_physical_matches(physical_plan, true, true));
Ok(())
}

Expand All @@ -1280,14 +1237,7 @@ mod tests {
let cfg: Arc<DaftExecutionConfig> = DaftExecutionConfig::default().into();
for (l_opts, r_opts, l_exp, r_exp) in cases {
let plan = get_hash_join_plan(cfg.clone(), l_opts, r_opts)?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
l_exp,
r_exp,
cfg.shuffle_join_default_partitions,
) {
if !check_physical_matches(plan, l_exp, r_exp) {
panic!(
"Failed single partition hash join test on case ({:?}, {:?}, {}, {})",
l_opts, r_opts, l_exp, r_exp
Expand All @@ -1296,14 +1246,7 @@ mod tests {

// reversed direction
let plan = get_hash_join_plan(cfg.clone(), r_opts, l_opts)?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
r_exp,
l_exp,
cfg.shuffle_join_default_partitions,
) {
if !check_physical_matches(plan, r_exp, l_exp) {
panic!(
"Failed single partition hash join test on case ({:?}, {:?}, {}, {})",
r_opts, l_opts, r_exp, l_exp
Expand Down
Loading

0 comments on commit 6712d1d

Please sign in to comment.