[PERF] Set min_partitions for post aggregation shuffles (#1861)
Added a config variable to set the minimum number of partitions produced after first-stage aggregations.

Here's a comparison between shuffling with the same number of partitions as the input vs. a much smaller number:

<img width="824" alt="Screenshot 2024-02-09 at 4 40 08 PM"
src="https://github.com/Eventual-Inc/Daft/assets/77712970/09566028-6776-441e-909f-db0af6ebf3f2">

<img width="816" alt="Screenshot 2024-02-09 at 4 41 14 PM"
src="https://github.com/Eventual-Inc/Daft/assets/77712970/d45b4801-4044-4879-92ec-f9196f06c0cc">
colin-ho authored Feb 13, 2024
1 parent adac24c commit c416718
Showing 6 changed files with 55 additions and 3 deletions.
3 changes: 3 additions & 0 deletions daft/context.py
@@ -253,6 +253,7 @@ def set_execution_config(
    parquet_inflation_factor: float | None = None,
    csv_target_filesize: int | None = None,
    csv_inflation_factor: float | None = None,
    shuffle_aggregation_default_partitions: int | None = None,
) -> DaftContext:
    """Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
    are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -281,6 +282,7 @@ def set_execution_config(
        parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
        csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
        csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
        shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
    """
    # Replace values in the DaftExecutionConfig with user-specified overrides
    ctx = get_context()
@@ -299,6 +301,7 @@ def set_execution_config(
        parquet_inflation_factor=parquet_inflation_factor,
        csv_target_filesize=csv_target_filesize,
        csv_inflation_factor=csv_inflation_factor,
        shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
    )

    ctx._daft_execution_config = new_daft_execution_config
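The new value can be read back off the context's execution config (the test below does exactly this to pick up the default); a quick sketch:

```python
from daft.context import get_context

# Read the currently effective value; 200 unless overridden via set_execution_config.
current = get_context().daft_execution_config.shuffle_aggregation_default_partitions
print(current)
```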
3 changes: 3 additions & 0 deletions daft/daft.pyi
@@ -1203,6 +1203,7 @@ class PyDaftExecutionConfig:
        parquet_inflation_factor: float | None = None,
        csv_target_filesize: int | None = None,
        csv_inflation_factor: float | None = None,
        shuffle_aggregation_default_partitions: int | None = None,
    ) -> PyDaftExecutionConfig: ...
    @property
    def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1226,6 +1227,8 @@ class PyDaftExecutionConfig:
    def csv_target_filesize(self) -> int: ...
    @property
    def csv_inflation_factor(self) -> float: ...
    @property
    def shuffle_aggregation_default_partitions(self) -> int: ...

class PyDaftPlanningConfig:
    def with_config_values(
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -34,6 +34,7 @@ pub struct DaftExecutionConfig {
    pub parquet_inflation_factor: f64,
    pub csv_target_filesize: usize,
    pub csv_inflation_factor: f64,
    pub shuffle_aggregation_default_partitions: usize,
}

impl Default for DaftExecutionConfig {
@@ -51,6 +52,7 @@ impl Default for DaftExecutionConfig {
            parquet_inflation_factor: 3.0,
            csv_target_filesize: 512 * 1024 * 1024, // 512MB
            csv_inflation_factor: 0.5,
            shuffle_aggregation_default_partitions: 200,
        }
    }
}
10 changes: 10 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -89,6 +89,7 @@ impl PyDaftExecutionConfig {
        parquet_inflation_factor: Option<f64>,
        csv_target_filesize: Option<usize>,
        csv_inflation_factor: Option<f64>,
        shuffle_aggregation_default_partitions: Option<usize>,
    ) -> PyResult<PyDaftExecutionConfig> {
        let mut config = self.config.as_ref().clone();

@@ -131,6 +132,10 @@ impl PyDaftExecutionConfig {
        if let Some(csv_inflation_factor) = csv_inflation_factor {
            config.csv_inflation_factor = csv_inflation_factor;
        }
        if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
        {
            config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
        }

        Ok(PyDaftExecutionConfig {
            config: Arc::new(config),
@@ -192,6 +197,11 @@ impl PyDaftExecutionConfig {
        Ok(self.config.csv_inflation_factor)
    }

    #[getter]
    fn get_shuffle_aggregation_default_partitions(&self) -> PyResult<usize> {
        Ok(self.config.shuffle_aggregation_default_partitions)
    }

    fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
        let bin_data = bincode::serialize(self.config.as_ref())
            .expect("DaftExecutionConfig should be serializable to bytes");
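Since `__reduce__` serializes the whole config through bincode, the new field should round-trip through pickle along with every other option; a sketch, assuming a default-configured context:

```python
import pickle

from daft.context import get_context

cfg = get_context().daft_execution_config
restored = pickle.loads(pickle.dumps(cfg))

# The serialized payload carries shuffle_aggregation_default_partitions
# together with every other field, so the round-trip preserves it.
assert restored.shuffle_aggregation_default_partitions == cfg.shuffle_aggregation_default_partitions
```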
12 changes: 9 additions & 3 deletions src/daft-plan/src/planner.rs
@@ -1,6 +1,9 @@
use std::cmp::Ordering;
use std::sync::Arc;
use std::{cmp::max, collections::HashMap};
use std::{
    cmp::{max, min},
    collections::HashMap,
};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
@@ -317,7 +320,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
        }) => {
            use daft_dsl::AggExpr::{self, *};
            use daft_dsl::Expr::Column;
            let input_plan = plan(input, cfg)?;
            let input_plan = plan(input, cfg.clone())?;

            let num_input_partitions = input_plan.partition_spec().num_partitions;

@@ -497,7 +500,10 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
            } else {
                let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
                    first_stage_agg.into(),
                    num_input_partitions,
                    min(
                        num_input_partitions,
                        cfg.shuffle_aggregation_default_partitions,
                    ),
                    groupby.clone(),
                ));
                PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
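Concretely, the second-stage fan-out now produces `min(num_input_partitions, shuffle_aggregation_default_partitions)` partitions instead of always matching the input. A sketch of the effect (row and partition counts are illustrative, assuming the default of 200 is in effect):

```python
import daft
from daft import col

df = daft.from_pydict({"group": list(range(1000)), "value": list(range(1000))})
df = df.into_partitions(500)  # more input partitions than the default of 200

df = df.groupby(col("group")).agg([(col("value").alias("sum"), "sum")]).collect()

# The post-aggregation shuffle fans out to min(500, 200) = 200 partitions;
# a 50-partition input would instead stay at 50.
assert df.num_partitions() == 200
```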
28 changes: 28 additions & 0 deletions tests/dataframe/test_aggregations.py
@@ -8,6 +8,7 @@

import daft
from daft import col
from daft.context import get_context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.utils import freeze
@@ -365,3 +366,30 @@ def test_groupby_agg_pyobjects():
assert res["groups"] == [1, 2]
assert res["count"] == [2, 1]
assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]]


@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 20])
def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions):
    if shuffle_aggregation_default_partitions is None:
        min_partitions = get_context().daft_execution_config.shuffle_aggregation_default_partitions
    else:
        daft.set_execution_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions)
        min_partitions = shuffle_aggregation_default_partitions

    for partition_size in [1, min_partitions, min_partitions + 1]:
        df = daft.from_pydict(
            {"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]}
        )
        df = df.into_partitions(partition_size)

        df = df.groupby(col("group")).agg(
            [
                (col("value").alias("sum"), "sum"),
                (col("value").alias("mean"), "mean"),
                (col("value").alias("min"), "min"),
            ]
        )

        df = df.collect()

        assert df.num_partitions() == min(min_partitions, partition_size)
