[FEAT]: shuffle_join_default_partitions param #2844 (Merged)
3 changes: 3 additions & 0 deletions daft/context.py
@@ -308,6 +308,7 @@ def set_execution_config(
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
@@ -344,6 +345,7 @@ def set_execution_config(
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
shuffle_join_default_partitions: Minimum number of partitions to create when performing joins. Defaults to 200, unless the number of input partitions is less than 200.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
@@ -369,6 +371,7 @@ def set_execution_config(
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
shuffle_join_default_partitions=shuffle_join_default_partitions,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
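With this change, the new option is set like any other execution-config field. A minimal sketch of the user-facing call (the value 64 is illustrative, not a default):

```python
import daft

# Process-wide override: cap hash-join shuffle partitions at 64.
daft.set_execution_config(shuffle_join_default_partitions=64)

# Or scope the override to a block, as the tests below do:
with daft.execution_config_ctx(shuffle_join_default_partitions=64):
    ...  # joins planned here use the 64-partition cap
```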
3 changes: 3 additions & 0 deletions daft/daft/__init__.pyi
@@ -1747,6 +1747,7 @@ class PyDaftExecutionConfig:
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
shuffle_join_default_partitions: int | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
@@ -1779,6 +1780,8 @@ class PyDaftExecutionConfig:
@property
def shuffle_aggregation_default_partitions(self) -> int: ...
@property
def shuffle_join_default_partitions(self) -> int: ...
@property
def read_sql_partition_size_bytes(self) -> int: ...
@property
def enable_aqe(self) -> bool: ...
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -52,6 +52,7 @@ pub struct DaftExecutionConfig {
pub csv_target_filesize: usize,
pub csv_inflation_factor: f64,
pub shuffle_aggregation_default_partitions: usize,
pub shuffle_join_default_partitions: usize,
pub read_sql_partition_size_bytes: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
@@ -75,6 +76,7 @@ impl Default for DaftExecutionConfig {
csv_target_filesize: 512 * 1024 * 1024, // 512MB
csv_inflation_factor: 0.5,
shuffle_aggregation_default_partitions: 200,
shuffle_join_default_partitions: 200,
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
enable_aqe: false,
enable_native_executor: false,
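The Rust default mirrors the existing aggregation setting. A quick way to confirm it from Python, assuming nothing in the session has overridden the config:

```python
from daft.context import get_context

cfg = get_context().daft_execution_config
# Should match the Rust default above (assuming no prior overrides).
assert cfg.shuffle_join_default_partitions == 200
```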
12 changes: 12 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -94,6 +94,7 @@ impl PyDaftExecutionConfig {
csv_target_filesize: Option<usize>,
csv_inflation_factor: Option<f64>,
shuffle_aggregation_default_partitions: Option<usize>,
shuffle_join_default_partitions: Option<usize>,
read_sql_partition_size_bytes: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
@@ -143,10 +144,16 @@ impl PyDaftExecutionConfig {
if let Some(csv_inflation_factor) = csv_inflation_factor {
config.csv_inflation_factor = csv_inflation_factor;
}

if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
{
config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
}

if let Some(shuffle_join_default_partitions) = shuffle_join_default_partitions {
config.shuffle_join_default_partitions = shuffle_join_default_partitions;
}

if let Some(read_sql_partition_size_bytes) = read_sql_partition_size_bytes {
config.read_sql_partition_size_bytes = read_sql_partition_size_bytes;
}
@@ -231,6 +238,11 @@ impl PyDaftExecutionConfig {
Ok(self.config.shuffle_aggregation_default_partitions)
}

#[getter]
fn get_shuffle_join_default_partitions(&self) -> PyResult<usize> {
Ok(self.config.shuffle_join_default_partitions)
}

#[getter]
fn get_read_sql_partition_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.read_sql_partition_size_bytes)
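Because each field arrives as an `Option` and is only written when `Some`, omitting a field in Python (leaving it `None`) keeps the current value. A small sketch of that behavior, with illustrative values:

```python
import daft
from daft.context import get_context

daft.set_execution_config(shuffle_join_default_partitions=64)
# A later call that omits the field does not reset it:
daft.set_execution_config(enable_aqe=True)
assert get_context().daft_execution_config.shuffle_join_default_partitions == 64
```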
7 changes: 5 additions & 2 deletions src/daft-plan/src/physical_planner/translate.rs
@@ -436,6 +436,9 @@ pub(super) fn translate_single_logical_node(
left_clustering_spec.num_partitions(),
right_clustering_spec.num_partitions(),
);
println!("1. num_partitions: {}", num_partitions);
let num_partitions = min(num_partitions, cfg.shuffle_join_default_partitions);
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
println!("2. num_partitions: {}", num_partitions);
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved

let is_left_hash_partitioned =
matches!(left_clustering_spec.as_ref(), ClusteringSpec::Hash(..))
@@ -645,7 +648,7 @@
{
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
left_physical,
num_partitions,
min(num_partitions, cfg.shuffle_join_default_partitions),
left_on.clone(),
));
left_physical =
@@ -656,7 +659,7 @@
{
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
right_physical,
num_partitions,
min(num_partitions, cfg.shuffle_join_default_partitions),
right_on.clone(),
));
right_physical =
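The effect of the cap on hash joins is plain min/max arithmetic over the two sides' partition counts. A worked example with illustrative numbers (not values from the codebase):

```python
cap = 200                # cfg.shuffle_join_default_partitions
left, right = 30, 350    # input partition counts of the two join sides

num_partitions = min(max(left, right), cap)
assert num_partitions == 200  # 350 inputs get shuffled down to the 200 cap

left, right = 30, 150
assert min(max(left, right), cap) == 150  # below the cap, the inputs win
```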
54 changes: 54 additions & 0 deletions tests/dataframe/test_joins.py
@@ -3,7 +3,9 @@
import pyarrow as pa
import pytest

import daft
from daft import col
from daft.context import get_context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from tests.utils import sort_arrow_table
@@ -1071,3 +1073,55 @@ def test_join_same_name_alias_with_compute(join_strategy, join_type, expected, m
assert sort_arrow_table(pa.Table.from_pydict(daft_df.to_pydict()), "a") == sort_arrow_table(
pa.Table.from_pydict(expected), "a"
)


# the number of result partitions should be min(shuffle_join_default_partitions, max(left_partition_size, right_partition_size))
@pytest.mark.parametrize("shuffle_join_default_partitions", [None, 20])
def test_join_result_partitions_smaller_than_input(shuffle_join_default_partitions):
if shuffle_join_default_partitions is None:
min_partitions = get_context().daft_execution_config.shuffle_join_default_partitions
else:
min_partitions = shuffle_join_default_partitions

with daft.execution_config_ctx(shuffle_join_default_partitions=shuffle_join_default_partitions):
right_partition_size = 50
for left_partition_size in [1, min_partitions, min_partitions + 1]:
df_left = daft.from_pydict(
{"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]}
)
df_left = df_left.into_partitions(left_partition_size)

df_right = daft.from_pydict(
{"group": [i for i in range(right_partition_size)], "value": [i for i in range(right_partition_size)]}
)

df_right = df_right.into_partitions(right_partition_size)

actual = df_left.join(df_right, on="group", how="inner", strategy="hash").collect()
n_partitions = actual.num_partitions()
expected_n_partitions = min(min_partitions, max(left_partition_size, right_partition_size))
assert n_partitions == expected_n_partitions


# for sort_merge, the number of result partitions should always be max(left_partition_size, right_partition_size); shuffle_join_default_partitions does not apply
@pytest.mark.parametrize("shuffle_join_default_partitions", [None, 20])
def test_join_result_partitions_for_sortmerge(shuffle_join_default_partitions):
if shuffle_join_default_partitions is None:
min_partitions = get_context().daft_execution_config.shuffle_join_default_partitions
else:
min_partitions = shuffle_join_default_partitions

with daft.execution_config_ctx(shuffle_join_default_partitions=shuffle_join_default_partitions):
for partition_size in [1, min_partitions, min_partitions + 1]:
df_left = daft.from_pydict(
{"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]}
)
df_left = df_left.into_partitions(partition_size)

df_right = daft.from_pydict({"group": [i for i in range(50)], "value": [i for i in range(50)]})

df_right = df_right.into_partitions(50)

actual = df_left.join(df_right, on="group", how="inner", strategy="sort_merge").collect()

assert actual.num_partitions() == max(partition_size, 50)