Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT]: shuffle_join_default_partitions param #2844

Merged
3 changes: 3 additions & 0 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def set_execution_config(
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -344,6 +345,7 @@ def set_execution_config(
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
shuffle_join_default_partitions: Minimum number of partitions to create when performing joins. Defaults to 200, unless the number of input partitions is less than 200.
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
Expand All @@ -369,6 +371,7 @@ def set_execution_config(
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
shuffle_join_default_partitions=shuffle_join_default_partitions,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
Expand Down
3 changes: 3 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,7 @@ class PyDaftExecutionConfig:
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -1783,6 +1784,8 @@ class PyDaftExecutionConfig:
@property
def shuffle_aggregation_default_partitions(self) -> int: ...
@property
def shuffle_join_default_partitions(self) -> int: ...
@property
def read_sql_partition_size_bytes(self) -> int: ...
@property
def enable_aqe(self) -> bool: ...
Expand Down
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct DaftExecutionConfig {
pub csv_target_filesize: usize,
pub csv_inflation_factor: f64,
pub shuffle_aggregation_default_partitions: usize,
pub shuffle_join_default_partitions: usize,
pub read_sql_partition_size_bytes: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
Expand All @@ -75,6 +76,7 @@ impl Default for DaftExecutionConfig {
csv_target_filesize: 512 * 1024 * 1024, // 512MB
csv_inflation_factor: 0.5,
shuffle_aggregation_default_partitions: 200,
shuffle_join_default_partitions: 200,
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
enable_aqe: false,
enable_native_executor: false,
Expand Down
12 changes: 12 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl PyDaftExecutionConfig {
csv_target_filesize: Option<usize>,
csv_inflation_factor: Option<f64>,
shuffle_aggregation_default_partitions: Option<usize>,
shuffle_join_default_partitions: Option<usize>,
read_sql_partition_size_bytes: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
Expand Down Expand Up @@ -143,10 +144,16 @@ impl PyDaftExecutionConfig {
if let Some(csv_inflation_factor) = csv_inflation_factor {
config.csv_inflation_factor = csv_inflation_factor;
}

if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
{
config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
}

if let Some(shuffle_join_default_partitions) = shuffle_join_default_partitions {
config.shuffle_join_default_partitions = shuffle_join_default_partitions;
}

if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
}
Expand Down Expand Up @@ -231,6 +238,11 @@ impl PyDaftExecutionConfig {
Ok(self.config.shuffle_aggregation_default_partitions)
}

#[getter]
/// Python-visible getter for the minimum number of partitions used when
/// repartitioning for a shuffle join (see `DaftExecutionConfig`).
fn get_shuffle_join_default_partitions(&self) -> PyResult<usize> {
    // Plain field read; infallible, but PyO3 getters return PyResult.
    Ok(self.config.shuffle_join_default_partitions)
}

#[getter]
fn get_read_sql_partition_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.read_sql_partition_size_bytes)
Expand Down
130 changes: 96 additions & 34 deletions src/daft-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ pub(super) fn translate_single_logical_node(
left_clustering_spec.num_partitions(),
right_clustering_spec.num_partitions(),
);
let num_partitions = min(num_partitions, cfg.shuffle_join_default_partitions);
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved

let is_left_hash_partitioned =
matches!(left_clustering_spec.as_ref(), ClusteringSpec::Hash(..))
Expand Down Expand Up @@ -616,7 +617,7 @@ pub(super) fn translate_single_logical_node(
// allow for leniency in partition size to avoid minor repartitions
let num_left_partitions = left_clustering_spec.num_partitions();
let num_right_partitions = right_clustering_spec.num_partitions();

// 100, 300
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
let num_partitions = match (
is_left_hash_partitioned,
is_right_hash_partitioned,
Expand All @@ -637,6 +638,7 @@ pub(super) fn translate_single_logical_node(
}
(_, _, a, b) => max(a, b),
};
let num_partitions = min(num_partitions, cfg.shuffle_join_default_partitions);
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved

if num_left_partitions != num_partitions
|| (num_partitions > 1 && !is_left_hash_partitioned)
Expand Down Expand Up @@ -1076,17 +1078,28 @@ mod tests {
Self::Reversed(v) => Self::Reversed(v * x),
}
}
/// Returns the partition count carried by this option, regardless of
/// which variant (`Good`, `Bad`, or `Reversed`) it is.
fn unwrap(&self) -> usize {
    match self {
        Self::Good(v) => *v,
        Self::Bad(v) => *v,
        Self::Reversed(v) => *v,
    }
}
}

/// Applies a hash repartition to `node` according to `opts`:
/// - `Good(x)`: partition on (a, b) with x partitions — matches the join keys.
/// - `Bad(x)`: partition on (a, c) — deliberately mismatched keys.
/// - `Reversed(x)`: partition on (b, a) — same keys, reversed order.
///
/// Returns the repartitioned plan builder, or any error from the builder.
fn force_repartition(
    node: LogicalPlanBuilder,
    opts: RepartitionOptions,
) -> DaftResult<LogicalPlanBuilder> {
    // NOTE: leftover `dbg!(x)` debug macros removed — `dbg!` returns its
    // argument unchanged, so behavior is identical, minus stderr noise.
    match opts {
        RepartitionOptions::Good(x) => node.hash_repartition(Some(x), vec![col("a"), col("b")]),
        RepartitionOptions::Bad(x) => node.hash_repartition(Some(x), vec![col("a"), col("c")]),
        RepartitionOptions::Reversed(x) => node.hash_repartition(Some(x), vec![col("b"), col("a")]),
    }
}
Expand Down Expand Up @@ -1128,21 +1141,31 @@ mod tests {

fn check_physical_matches(
plan: PhysicalPlanRef,
left_partition_size: usize,
right_partition_size: usize,
left_repartitions: bool,
right_repartitions: bool,
shuffle_join_default_partitions: usize,
) -> bool {
match plan.as_ref() {
PhysicalPlan::HashJoin(HashJoin { left, right, .. }) => {
let left_works = match (left.as_ref(), left_repartitions) {
let left_works = match (
left.as_ref(),
left_repartitions || left_partition_size > shuffle_join_default_partitions,
) {
(PhysicalPlan::ReduceMerge(_), true) => true,
(PhysicalPlan::Project(_), false) => true,
_ => false,
};
let right_works = match (right.as_ref(), right_repartitions) {
let right_works = match (
right.as_ref(),
right_repartitions || right_partition_size > shuffle_join_default_partitions,
) {
(PhysicalPlan::ReduceMerge(_), true) => true,
(PhysicalPlan::Project(_), false) => true,
_ => false,
};

left_works && right_works
}
_ => false,
Expand All @@ -1152,7 +1175,7 @@ mod tests {
/// Tests a variety of settings regarding hash join repartitioning.
#[test]
fn repartition_hash_join_tests() -> DaftResult<()> {
use RepartitionOptions::*;
use RepartitionOptions::{Bad, Good, Reversed};
let cases = vec![
(Good(30), Good(30), false, false),
(Good(30), Good(40), true, false),
Expand All @@ -1170,19 +1193,33 @@ mod tests {
let cfg: Arc<DaftExecutionConfig> = DaftExecutionConfig::default().into();
for (l_opts, r_opts, l_exp, r_exp) in cases {
for mult in [1, 10] {
let plan =
get_hash_join_plan(cfg.clone(), l_opts.scale_by(mult), r_opts.scale_by(mult))?;
if !check_physical_matches(plan, l_exp, r_exp) {
let l_opts = l_opts.scale_by(mult);
let r_opts = r_opts.scale_by(mult);
let plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
l_exp,
r_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}",
l_opts, r_opts, l_exp, r_exp, mult
);
}

// reversed direction
let plan =
get_hash_join_plan(cfg.clone(), r_opts.scale_by(mult), l_opts.scale_by(mult))?;
if !check_physical_matches(plan, r_exp, l_exp) {
let plan = get_hash_join_plan(cfg.clone(), r_opts.clone(), l_opts.clone())?;
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
r_exp,
l_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed hash join test on case ({:?}, {:?}, {}, {}) with mult {}",
r_opts, l_opts, r_exp, l_exp, mult
Expand All @@ -1199,27 +1236,38 @@ mod tests {
let mut cfg = DaftExecutionConfig::default();
cfg.hash_join_partition_size_leniency = 0.8;
let cfg = Arc::new(cfg);
let (l_opts, r_opts) = (RepartitionOptions::Good(30), RepartitionOptions::Bad(40));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
true,
true,
cfg.shuffle_join_default_partitions
));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(40),
)?;
assert!(check_physical_matches(physical_plan, true, true));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(25),
)?;
assert!(check_physical_matches(physical_plan, false, true));
let (l_opts, r_opts) = (RepartitionOptions::Good(20), RepartitionOptions::Bad(25));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
false,
true,
cfg.shuffle_join_default_partitions
));

let physical_plan = get_hash_join_plan(
cfg.clone(),
RepartitionOptions::Good(20),
RepartitionOptions::Bad(26),
)?;
assert!(check_physical_matches(physical_plan, true, true));
let (l_opts, r_opts) = (RepartitionOptions::Good(20), RepartitionOptions::Bad(26));
let physical_plan = get_hash_join_plan(cfg.clone(), l_opts.clone(), r_opts.clone())?;
assert!(check_physical_matches(
physical_plan,
l_opts.unwrap(),
r_opts.unwrap(),
true,
true,
cfg.shuffle_join_default_partitions
));
Ok(())
}

Expand All @@ -1237,7 +1285,14 @@ mod tests {
let cfg: Arc<DaftExecutionConfig> = DaftExecutionConfig::default().into();
for (l_opts, r_opts, l_exp, r_exp) in cases {
let plan = get_hash_join_plan(cfg.clone(), l_opts, r_opts)?;
if !check_physical_matches(plan, l_exp, r_exp) {
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
l_exp,
r_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed single partition hash join test on case ({:?}, {:?}, {}, {})",
l_opts, r_opts, l_exp, r_exp
Expand All @@ -1246,7 +1301,14 @@ mod tests {

// reversed direction
let plan = get_hash_join_plan(cfg.clone(), r_opts, l_opts)?;
if !check_physical_matches(plan, r_exp, l_exp) {
if !check_physical_matches(
plan,
l_opts.unwrap(),
r_opts.unwrap(),
r_exp,
l_exp,
cfg.shuffle_join_default_partitions,
) {
panic!(
"Failed single partition hash join test on case ({:?}, {:?}, {}, {})",
r_opts, l_opts, r_exp, l_exp
Expand Down
Loading
Loading