From c41671811882b003e03effbd935c6eaae165c7be Mon Sep 17 00:00:00 2001
From: Colin
Date: Tue, 13 Feb 2024 11:26:46 +0800
Subject: [PATCH] [PERF] Set min_partitions for post aggregation shuffles
 (#1861)

Added a config variable that sets the number of partitions produced after
first-stage aggregations; the planner uses the smaller of this value and the
number of input partitions. Here's a comparison between shuffling with the
same number of partitions as the input versus a much smaller number:

[Two screenshots comparing query timings for the two strategies, 2024-02-09]

---
 daft/context.py                      |  3 +++
 daft/daft.pyi                        |  3 +++
 src/common/daft-config/src/lib.rs    |  2 ++
 src/common/daft-config/src/python.rs | 10 ++++++++++
 src/daft-plan/src/planner.rs         | 12 +++++++++---
 tests/dataframe/test_aggregations.py | 28 ++++++++++++++++++++++++++++
 6 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index d12e34fd71..48ef44df94 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -253,6 +253,7 @@ def set_execution_config(
     parquet_inflation_factor: float | None = None,
     csv_target_filesize: int | None = None,
     csv_inflation_factor: float | None = None,
+    shuffle_aggregation_default_partitions: int | None = None,
 ) -> DaftContext:
     """Globally sets various configuration parameters which control various aspects of Daft execution. These
     configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -281,6 +282,7 @@ def set_execution_config(
         parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
         csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
         csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
+        shuffle_aggregation_default_partitions: Number of partitions to create when shuffling for an aggregation. Defaults to 200, unless the input has fewer partitions, in which case the input partition count is used.
     """
     # Replace values in the DaftExecutionConfig with user-specified overrides
     ctx = get_context()
@@ -299,6 +301,7 @@ def set_execution_config(
         parquet_inflation_factor=parquet_inflation_factor,
         csv_target_filesize=csv_target_filesize,
         csv_inflation_factor=csv_inflation_factor,
+        shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
     )
 
     ctx._daft_execution_config = new_daft_execution_config
diff --git a/daft/daft.pyi b/daft/daft.pyi
index 661e179284..58e7f05bdb 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -1203,6 +1203,7 @@ class PyDaftExecutionConfig:
         parquet_inflation_factor: float | None = None,
         csv_target_filesize: int | None = None,
         csv_inflation_factor: float | None = None,
+        shuffle_aggregation_default_partitions: int | None = None,
     ) -> PyDaftExecutionConfig: ...
     @property
     def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1226,6 +1227,8 @@ class PyDaftExecutionConfig:
     def csv_target_filesize(self) -> int: ...
     @property
     def csv_inflation_factor(self) -> float: ...
+    @property
+    def shuffle_aggregation_default_partitions(self) -> int: ...
 
 class PyDaftPlanningConfig:
     def with_config_values(
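The Python surface above is the entire user-facing API for this change: the knob is passed through `set_execution_config` and can be read back off the execution config. A minimal sketch of how it is exercised (the value 50 is illustrative, not a recommendation):

```python
import daft
from daft.context import get_context

# Cap post-aggregation shuffles at 50 partitions instead of the default 200.
daft.set_execution_config(shuffle_aggregation_default_partitions=50)

# The configured value is visible via the getter added in this patch.
cfg = get_context().daft_execution_config
assert cfg.shuffle_aggregation_default_partitions == 50
```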
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index f1986868df..a3dd531d3a 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -34,6 +34,7 @@ pub struct DaftExecutionConfig {
     pub parquet_inflation_factor: f64,
     pub csv_target_filesize: usize,
     pub csv_inflation_factor: f64,
+    pub shuffle_aggregation_default_partitions: usize,
 }
 
 impl Default for DaftExecutionConfig {
@@ -51,6 +52,7 @@ impl Default for DaftExecutionConfig {
             parquet_inflation_factor: 3.0,
             csv_target_filesize: 512 * 1024 * 1024, // 512MB
             csv_inflation_factor: 0.5,
+            shuffle_aggregation_default_partitions: 200,
         }
     }
 }
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index 0a08f8d2cb..8d2124c7ba 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -89,6 +89,7 @@ impl PyDaftExecutionConfig {
         parquet_inflation_factor: Option<f64>,
         csv_target_filesize: Option<usize>,
         csv_inflation_factor: Option<f64>,
+        shuffle_aggregation_default_partitions: Option<usize>,
     ) -> PyResult<PyDaftExecutionConfig> {
         let mut config = self.config.as_ref().clone();
 
@@ -131,6 +132,10 @@ impl PyDaftExecutionConfig {
         if let Some(csv_inflation_factor) = csv_inflation_factor {
             config.csv_inflation_factor = csv_inflation_factor;
         }
+        if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
+        {
+            config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
+        }
 
         Ok(PyDaftExecutionConfig {
             config: Arc::new(config),
@@ -192,6 +197,11 @@ impl PyDaftExecutionConfig {
         Ok(self.config.csv_inflation_factor)
     }
 
+    #[getter]
+    fn get_shuffle_aggregation_default_partitions(&self) -> PyResult<usize> {
+        Ok(self.config.shuffle_aggregation_default_partitions)
+    }
+
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftExecutionConfig should be serializable to bytes");
diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 60a4ff35cc..5e55477ea8 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -1,6 +1,9 @@
 use std::cmp::Ordering;
 use std::sync::Arc;
-use std::{cmp::max, collections::HashMap};
+use std::{
+    cmp::{max, min},
+    collections::HashMap,
+};
 
 use common_daft_config::DaftExecutionConfig;
 use common_error::DaftResult;
@@ -317,7 +320,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
         }) => {
             use daft_dsl::AggExpr::{self, *};
             use daft_dsl::Expr::Column;
-            let input_plan = plan(input, cfg)?;
+            let input_plan = plan(input, cfg.clone())?;
 
             let num_input_partitions = input_plan.partition_spec().num_partitions;
 
@@ -497,7 +500,10 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
                 } else {
                     let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
                         first_stage_agg.into(),
-                        num_input_partitions,
+                        min(
+                            num_input_partitions,
+                            cfg.shuffle_aggregation_default_partitions,
+                        ),
                         groupby.clone(),
                     ));
                     PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
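The planner edit reduces to a single sizing rule: the hash fanout feeding the second-stage aggregation now uses `min(num_input_partitions, cfg.shuffle_aggregation_default_partitions)` instead of `num_input_partitions`. A small Python sketch of that arithmetic (the function name is illustrative, not part of the codebase):

```python
def post_agg_shuffle_partitions(num_input_partitions: int, default_partitions: int = 200) -> int:
    # Mirror the planner's fanout sizing: never exceed the configured default,
    # and never create more partitions than the input already has.
    return min(num_input_partitions, default_partitions)

assert post_agg_shuffle_partitions(1000) == 200  # large inputs shrink to the default
assert post_agg_shuffle_partitions(8) == 8       # small inputs are left unchanged
```

The 200 default is the same figure as Spark's longstanding `spark.sql.shuffle.partitions` default, so it will look familiar to users coming from Spark.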
assert res["groups"] == [1, 2] assert res["count"] == [2, 1] assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]] + + +@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 20]) +def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions): + if shuffle_aggregation_default_partitions is None: + min_partitions = get_context().daft_execution_config.shuffle_aggregation_default_partitions + else: + daft.set_execution_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions) + min_partitions = shuffle_aggregation_default_partitions + + for partition_size in [1, min_partitions, min_partitions + 1]: + df = daft.from_pydict( + {"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]} + ) + df = df.into_partitions(partition_size) + + df = df.groupby(col("group")).agg( + [ + (col("value").alias("sum"), "sum"), + (col("value").alias("mean"), "mean"), + (col("value").alias("min"), "min"), + ] + ) + + df = df.collect() + + assert df.num_partitions() == min(min_partitions, partition_size)