From 6211135e27341232b51b878b66ab0e834b2175d6 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Fri, 9 Feb 2024 16:02:39 +0800
Subject: [PATCH 1/4] aggregation_min_partitions

---
 daft/context.py                      |  3 +++
 daft/daft.pyi                        |  3 +++
 src/common/daft-config/src/lib.rs    |  2 ++
 src/common/daft-config/src/python.rs |  9 +++++++++
 src/daft-plan/src/planner.rs         |  9 ++++++---
 tests/dataframe/test_aggregations.py | 28 ++++++++++++++++++++++++++++
 6 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index 84e219e27f..3b36a072c9 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -215,6 +215,7 @@ def set_execution_config(
     parquet_inflation_factor: float | None = None,
     csv_target_filesize: int | None = None,
     csv_inflation_factor: float | None = None,
+    aggregation_min_partitions: int | None = None,
 ) -> DaftContext:
     """Globally sets various configuration parameters which control various aspects of Daft execution.
     These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -243,6 +244,7 @@ def set_execution_config(
         parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
         csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
         csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
+        aggregation_min_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10.
     """
     # Replace values in the DaftExecutionConfig with user-specified overrides
     ctx = get_context()
@@ -260,6 +262,7 @@ def set_execution_config(
         parquet_inflation_factor=parquet_inflation_factor,
         csv_target_filesize=csv_target_filesize,
         csv_inflation_factor=csv_inflation_factor,
+        aggregation_min_partitions=aggregation_min_partitions,
     )
 
     ctx.daft_execution_config = new_daft_execution_config

diff --git a/daft/daft.pyi b/daft/daft.pyi
index 6d2b51ebfe..20aa1cadba 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -1197,6 +1197,7 @@ class PyDaftExecutionConfig:
         parquet_inflation_factor: float | None = None,
         csv_target_filesize: int | None = None,
         csv_inflation_factor: float | None = None,
+        aggregation_min_partitions: int | None = None,
     ) -> PyDaftExecutionConfig: ...
     @property
     def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1220,6 +1221,8 @@ class PyDaftExecutionConfig:
     def csv_target_filesize(self) -> int: ...
     @property
     def csv_inflation_factor(self) -> float: ...
+    @property
+    def aggregation_min_partitions(self) -> int: ...
 
 class PyDaftPlanningConfig:
     def with_config_values(
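[Editor's note: a minimal usage sketch of the knob this first patch introduces. It is hypothetical in that it assumes a Daft build with this patch applied; the `.agg` tuple form, `into_partitions`, and `num_partitions()` mirror the test added further down in this patch.]

    import daft
    from daft import col

    # Cap the fanout of the second-stage shuffle in groupby aggregations (patch default: 10).
    daft.set_execution_config(aggregation_min_partitions=10)

    df = daft.from_pydict({"group": [i % 4 for i in range(100)], "value": list(range(100))})
    df = df.into_partitions(32)  # many small input partitions

    df = df.groupby(col("group")).agg([(col("value").alias("sum"), "sum")]).collect()

    # The patched planner fans out to min(32, 10) == 10 partitions rather than 32.
    assert df.num_partitions() == 10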
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index f1986868df..85b646172e 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -34,6 +34,7 @@ pub struct DaftExecutionConfig {
     pub parquet_inflation_factor: f64,
     pub csv_target_filesize: usize,
     pub csv_inflation_factor: f64,
+    pub aggregation_min_partitions: usize,
 }
 
 impl Default for DaftExecutionConfig {
@@ -51,6 +52,7 @@ impl Default for DaftExecutionConfig {
             parquet_inflation_factor: 3.0,
             csv_target_filesize: 512 * 1024 * 1024, // 512MB
             csv_inflation_factor: 0.5,
+            aggregation_min_partitions: 10,
         }
     }
 }

diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index 0a08f8d2cb..de3a585df2 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -89,6 +89,7 @@ impl PyDaftExecutionConfig {
         parquet_inflation_factor: Option<f64>,
         csv_target_filesize: Option<usize>,
         csv_inflation_factor: Option<f64>,
+        aggregation_min_partitions: Option<usize>,
     ) -> PyResult<PyDaftExecutionConfig> {
         let mut config = self.config.as_ref().clone();
@@ -131,6 +132,9 @@ impl PyDaftExecutionConfig {
         if let Some(csv_inflation_factor) = csv_inflation_factor {
             config.csv_inflation_factor = csv_inflation_factor;
         }
+        if let Some(aggregation_min_partitions) = aggregation_min_partitions {
+            config.aggregation_min_partitions = aggregation_min_partitions;
+        }
 
         Ok(PyDaftExecutionConfig {
             config: Arc::new(config),
@@ -192,6 +196,11 @@ impl PyDaftExecutionConfig {
         Ok(self.config.csv_inflation_factor)
     }
 
+    #[getter]
+    fn get_aggregation_min_partitions(&self) -> PyResult<usize> {
+        Ok(self.config.aggregation_min_partitions)
+    }
+
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftExecutionConfig should be serializable to bytes");

diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 60a4ff35cc..50a0e2fe19 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -1,6 +1,9 @@
 use std::cmp::Ordering;
 use std::sync::Arc;
-use std::{cmp::max, collections::HashMap};
+use std::{
+    cmp::{max, min},
+    collections::HashMap,
+};
 
 use common_daft_config::DaftExecutionConfig;
 use common_error::DaftResult;
@@ -317,7 +320,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
         }) => {
             use daft_dsl::AggExpr::{self, *};
             use daft_dsl::Expr::Column;
-            let input_plan = plan(input, cfg)?;
+            let input_plan = plan(input, cfg.clone())?;
 
             let num_input_partitions = input_plan.partition_spec().num_partitions;
@@ -497,7 +500,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
             } else {
                 let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
                     first_stage_agg.into(),
-                    num_input_partitions,
+                    min(num_input_partitions, cfg.aggregation_min_partitions),
                     groupby.clone(),
                 ));
                 PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
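[Editor's note: to make the planner change above concrete, a tiny sketch of the resulting shuffle fanout in plain Python, using the patch's default of 10. The variable names are illustrative, not from the codebase.]

    # Fanout chosen by the patched planner for a hash-partitioned groupby aggregation:
    # min(number of input partitions, cfg.aggregation_min_partitions)
    aggregation_min_partitions = 10
    for num_input_partitions in [1, 10, 32, 1000]:
        fanout = min(num_input_partitions, aggregation_min_partitions)
        print(num_input_partitions, "->", fanout)  # 1 -> 1, 10 -> 10, 32 -> 10, 1000 -> 10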
diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py
index 497b8bbe0b..b04502ae67 100644
--- a/tests/dataframe/test_aggregations.py
+++ b/tests/dataframe/test_aggregations.py
@@ -8,6 +8,7 @@
 
 import daft
 from daft import col
+from daft.context import get_context
 from daft.datatype import DataType
 from daft.errors import ExpressionTypeError
 from daft.utils import freeze
@@ -365,3 +366,30 @@ def test_groupby_agg_pyobjects():
     assert res["groups"] == [1, 2]
     assert res["count"] == [2, 1]
     assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]]
+
+
+@pytest.mark.parametrize("aggregation_min_partitions", [None, 20])
+def test_groupby_result_partitions_smaller_than_input(aggregation_min_partitions):
+    if aggregation_min_partitions is None:
+        min_partitions = get_context().daft_execution_config.aggregation_min_partitions
+    else:
+        daft.set_execution_config(aggregation_min_partitions=aggregation_min_partitions)
+        min_partitions = aggregation_min_partitions
+
+    for partition_size in [1, min_partitions, min_partitions + 1]:
+        df = daft.from_pydict(
+            {"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]}
+        )
+        df = df.into_partitions(partition_size)
+
+        df = df.groupby(col("group")).agg(
+            [
+                (col("value").alias("sum"), "sum"),
+                (col("value").alias("mean"), "mean"),
+                (col("value").alias("min"), "min"),
+            ]
+        )
+
+        df = df.collect()
+
+        assert df.num_partitions() == min(min_partitions, partition_size)
From f314f39b02ce99ac41074e221d3f39140cdc485e Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Tue, 13 Feb 2024 08:39:15 +0800
Subject: [PATCH 2/4] use planning config

---
 daft/context.py                                |  6 ++--
 daft/daft.pyi                                  | 10 +++---
 daft/dataframe/dataframe.py                    | 14 ++++++--
 daft/logical/builder.py                        | 13 +++++---
 src/common/daft-config/src/lib.rs              | 14 ++++++--
 src/common/daft-config/src/python.rs           | 20 ++++++------
 src/daft-plan/src/builder.rs                   | 32 ++++++++++++++++---
 src/daft-plan/src/logical_ops/agg.rs           |  4 +++
 src/daft-plan/src/logical_plan.rs              |  2 +-
 .../rules/push_down_projection.rs              |  8 ++++-
 src/daft-plan/src/planner.rs                   |  9 ++++--
 tests/dataframe/test_aggregations.py           | 12 +++----
 12 files changed, 104 insertions(+), 40 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index 3b36a072c9..d73a3f855c 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -180,6 +180,7 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
 def set_planning_config(
     config: PyDaftPlanningConfig | None = None,
     default_io_config: IOConfig | None = None,
+    shuffle_aggregation_default_partitions: int | None = None,
 ) -> DaftContext:
     """Globally sets various configuration parameters which control Daft plan construction behavior.
     These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe)
@@ -189,12 +190,14 @@ def set_planning_config(
             that the old (current) config should be used.
         default_io_config: A default IOConfig to use in the absence of one being explicitly passed into any Expression (e.g. `.url.download()`)
             or Dataframe operation (e.g. `daft.read_parquet()`).
+        shuffle_aggregation_default_partitions: Default number of partitions to use when shuffling data during aggregation operations. Defaults to 200, unless the number of input partitions is lower.
     """
     # Replace values in the DaftPlanningConfig with user-specified overrides
     ctx = get_context()
     old_daft_planning_config = ctx.daft_planning_config if config is None else config
     new_daft_planning_config = old_daft_planning_config.with_config_values(
         default_io_config=default_io_config,
+        shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
     )
 
     ctx.daft_planning_config = new_daft_planning_config
@@ -215,7 +218,6 @@ def set_execution_config(
     parquet_inflation_factor: float | None = None,
     csv_target_filesize: int | None = None,
     csv_inflation_factor: float | None = None,
-    aggregation_min_partitions: int | None = None,
 ) -> DaftContext:
     """Globally sets various configuration parameters which control various aspects of Daft execution.
     These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -244,7 +246,6 @@ def set_execution_config(
         parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
         csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
         csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
-        aggregation_min_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10.
     """
     # Replace values in the DaftExecutionConfig with user-specified overrides
     ctx = get_context()
@@ -262,7 +263,6 @@ def set_execution_config(
         parquet_inflation_factor=parquet_inflation_factor,
         csv_target_filesize=csv_target_filesize,
         csv_inflation_factor=csv_inflation_factor,
-        aggregation_min_partitions=aggregation_min_partitions,
     )
 
     ctx.daft_execution_config = new_daft_execution_config

diff --git a/daft/daft.pyi b/daft/daft.pyi
index 20aa1cadba..6078417d49 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -1158,7 +1158,9 @@ class LogicalPlanBuilder:
     def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ...
     def distinct(self) -> LogicalPlanBuilder: ...
     def sample(self, fraction: float, with_replacement: bool, seed: int | None) -> LogicalPlanBuilder: ...
-    def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ...
+    def aggregate(
+        self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr], shuffle_aggregation_default_partitions: int
+    ) -> LogicalPlanBuilder: ...
     def join(
         self,
         right: LogicalPlanBuilder,
@@ -1197,7 +1199,6 @@ class PyDaftExecutionConfig:
         parquet_inflation_factor: float | None = None,
         csv_target_filesize: int | None = None,
         csv_inflation_factor: float | None = None,
-        aggregation_min_partitions: int | None = None,
     ) -> PyDaftExecutionConfig: ...
     @property
     def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1221,16 +1222,17 @@ class PyDaftExecutionConfig:
     def csv_target_filesize(self) -> int: ...
     @property
     def csv_inflation_factor(self) -> float: ...
-    @property
-    def aggregation_min_partitions(self) -> int: ...
 
 class PyDaftPlanningConfig:
     def with_config_values(
         self,
         default_io_config: IOConfig | None = None,
+        shuffle_aggregation_default_partitions: int | None = None,
     ) -> PyDaftPlanningConfig: ...
     @property
     def default_io_config(self) -> IOConfig: ...
+    @property
+    def shuffle_aggregation_default_partitions(self) -> int: ...
 
 def build_type() -> str: ...
 def version() -> str: ...
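[Editor's note: a usage sketch of this intermediate state. Hypothetical: it assumes a build with only patches 1-2 applied, where the renamed knob lives on the planning config and is read while the logical plan is built.]

    import daft
    from daft.context import get_context

    # The knob is now a plan-construction setting rather than an execution setting.
    daft.set_planning_config(shuffle_aggregation_default_partitions=200)
    assert get_context().daft_planning_config.shuffle_aggregation_default_partitions == 200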
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index a370d998ed..4bbf16bb87 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -949,12 +949,22 @@ def _agg(
         exprs_to_agg: List[Tuple[Expression, str]] = list(
             zip(self.__column_input_to_expression([c for c, _ in to_agg]), [op for _, op in to_agg])
         )
+        shuffle_aggregation_default_partitions = (
+            get_context().daft_planning_config.shuffle_aggregation_default_partitions
+        )
 
-        builder = self._builder.agg(exprs_to_agg, list(group_by) if group_by is not None else None)
+        builder = self._builder.agg(
+            exprs_to_agg, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions
+        )
         return DataFrame(builder)
 
     def _map_groups(self, udf: Expression, group_by: Optional[ExpressionsProjection] = None) -> "DataFrame":
-        builder = self._builder.map_groups(udf, list(group_by) if group_by is not None else None)
+        shuffle_aggregation_default_partitions = (
+            get_context().daft_planning_config.shuffle_aggregation_default_partitions
+        )
+        builder = self._builder.map_groups(
+            udf, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions
+        )
         return DataFrame(builder)
 
     @DataframePublicAPI

diff --git a/daft/logical/builder.py b/daft/logical/builder.py
index f5ad0451aa..1aababff0b 100644
--- a/daft/logical/builder.py
+++ b/daft/logical/builder.py
@@ -128,7 +128,7 @@ def explode(self, explode_expressions: list[Expression]) -> LogicalPlanBuilder:
     def count(self) -> LogicalPlanBuilder:
         # TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations.
         first_col = col(self.schema().column_names()[0])
-        builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [])
+        builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [], 1)
         builder = builder.project([first_col.alias("count")._expr], ResourceRequest())
         return LogicalPlanBuilder(builder)
@@ -158,6 +158,7 @@ def agg(
         self,
         to_agg: list[tuple[Expression, str]],
         group_by: list[Expression] | None,
+        shuffle_aggregation_default_partitions: int,
     ) -> LogicalPlanBuilder:
         exprs = []
         for expr, op in to_agg:
@@ -179,12 +180,16 @@ def agg(
                 raise NotImplementedError(f"Aggregation {op} is not implemented.")
 
         group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else []
-        builder = self._builder.aggregate([expr._expr for expr in exprs], group_by_pyexprs)
+        builder = self._builder.aggregate(
+            [expr._expr for expr in exprs], group_by_pyexprs, shuffle_aggregation_default_partitions
+        )
         return LogicalPlanBuilder(builder)
 
-    def map_groups(self, udf: Expression, group_by: list[Expression] | None) -> LogicalPlanBuilder:
+    def map_groups(
+        self, udf: Expression, group_by: list[Expression] | None, shuffle_aggregation_default_partitions: int
+    ) -> LogicalPlanBuilder:
         group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else []
-        builder = self._builder.aggregate([udf._expr], group_by_pyexprs)
+        builder = self._builder.aggregate([udf._expr], group_by_pyexprs, shuffle_aggregation_default_partitions)
         return LogicalPlanBuilder(builder)
 
     def join(  # type: ignore[override]

diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index 85b646172e..30c890f167 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -6,9 +6,19 @@ use serde::{Deserialize, Serialize};
 /// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. Note
 ///    that this does not include the actual scan, which is taken care of by the DaftExecutionConfig.
 /// 2. Building of logical plan nodes
-#[derive(Clone, Serialize, Deserialize, Default)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct DaftPlanningConfig {
     pub default_io_config: IOConfig,
+    pub shuffle_aggregation_default_partitions: usize,
+}
+
+impl Default for DaftPlanningConfig {
+    fn default() -> Self {
+        DaftPlanningConfig {
+            default_io_config: IOConfig::default(),
+            shuffle_aggregation_default_partitions: 200,
+        }
+    }
 }
 
 /// Configurations for Daft to use during the execution of a Dataframe
@@ -34,7 +44,6 @@ pub struct DaftExecutionConfig {
     pub parquet_inflation_factor: f64,
     pub csv_target_filesize: usize,
     pub csv_inflation_factor: f64,
-    pub aggregation_min_partitions: usize,
 }
 
 impl Default for DaftExecutionConfig {
@@ -52,7 +61,6 @@ impl Default for DaftExecutionConfig {
             parquet_inflation_factor: 3.0,
             csv_target_filesize: 512 * 1024 * 1024, // 512MB
             csv_inflation_factor: 0.5,
-            aggregation_min_partitions: 10,
         }
     }
 }

diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index de3a585df2..f6e9463cfe 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -22,6 +22,7 @@ impl PyDaftPlanningConfig {
     fn with_config_values(
         &mut self,
         default_io_config: Option<PyIOConfig>,
+        shuffle_aggregation_default_partitions: Option<usize>,
     ) -> PyResult<PyDaftPlanningConfig> {
         let mut config = self.config.as_ref().clone();
 
         if let Some(default_io_config) = default_io_config {
             config.default_io_config = default_io_config.config;
         }
 
+        if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
+        {
+            config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
+        }
+
         Ok(PyDaftPlanningConfig {
             config: Arc::new(config),
         })
@@ -41,6 +47,11 @@ impl PyDaftPlanningConfig {
         })
     }
 
+    #[getter(shuffle_aggregation_default_partitions)]
+    fn shuffle_aggregation_default_partitions(&self) -> PyResult<usize> {
+        Ok(self.config.shuffle_aggregation_default_partitions)
+    }
+
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftPlanningConfig should be serializable to bytes");
@@ -89,7 +100,6 @@ impl PyDaftExecutionConfig {
         parquet_inflation_factor: Option<f64>,
         csv_target_filesize: Option<usize>,
         csv_inflation_factor: Option<f64>,
-        aggregation_min_partitions: Option<usize>,
     ) -> PyResult<PyDaftExecutionConfig> {
         let mut config = self.config.as_ref().clone();
@@ -132,9 +142,6 @@ impl PyDaftExecutionConfig {
         if let Some(csv_inflation_factor) = csv_inflation_factor {
             config.csv_inflation_factor = csv_inflation_factor;
         }
-        if let Some(aggregation_min_partitions) = aggregation_min_partitions {
-            config.aggregation_min_partitions = aggregation_min_partitions;
-        }
 
         Ok(PyDaftExecutionConfig {
             config: Arc::new(config),
@@ -196,11 +203,6 @@ impl PyDaftExecutionConfig {
         Ok(self.config.csv_inflation_factor)
     }
 
-    #[getter]
-    fn get_aggregation_min_partitions(&self) -> PyResult<usize> {
-        Ok(self.config.aggregation_min_partitions)
-    }
-
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftExecutionConfig should be serializable to bytes");

diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 30a84c9c82..5ef1a35f9e 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -190,7 +190,12 @@ impl LogicalPlanBuilder {
         Ok(logical_plan.into())
     }
 
-    pub fn aggregate(&self, agg_exprs: Vec<Expr>, groupby_exprs: Vec<Expr>) -> DaftResult<Self> {
+    pub fn aggregate(
+        &self,
+        agg_exprs: Vec<Expr>,
+        groupby_exprs: Vec<Expr>,
+        shuffle_aggregation_default_partitions: usize,
+    ) -> DaftResult<Self> {
         let agg_exprs = agg_exprs
             .iter()
             .map(|expr| match expr {
@@ -205,8 +210,13 @@ impl LogicalPlanBuilder {
             })
             .collect::<DaftResult<Vec<AggExpr>>>()?;
 
-        let logical_plan: LogicalPlan =
-            logical_ops::Aggregate::try_new(self.plan.clone(), agg_exprs, groupby_exprs)?.into();
+        let logical_plan: LogicalPlan = logical_ops::Aggregate::try_new(
+            self.plan.clone(),
+            agg_exprs,
+            groupby_exprs,
+            shuffle_aggregation_default_partitions,
+        )?
+        .into();
         Ok(logical_plan.into())
     }
 
@@ -410,7 +420,12 @@ impl PyLogicalPlanBuilder {
         .into())
     }
 
-    pub fn aggregate(&self, agg_exprs: Vec<PyExpr>, groupby_exprs: Vec<PyExpr>) -> PyResult<Self> {
+    pub fn aggregate(
+        &self,
+        agg_exprs: Vec<PyExpr>,
+        groupby_exprs: Vec<PyExpr>,
+        shuffle_aggregation_default_partitions: usize,
+    ) -> PyResult<Self> {
         let agg_exprs = agg_exprs
             .iter()
             .map(|expr| expr.clone().into())
@@ -419,7 +434,14 @@ impl PyLogicalPlanBuilder {
             .iter()
             .map(|expr| expr.clone().into())
             .collect::<Vec<Expr>>();
-        Ok(self.builder.aggregate(agg_exprs, groupby_exprs)?.into())
+        Ok(self
+            .builder
+            .aggregate(
+                agg_exprs,
+                groupby_exprs,
+                shuffle_aggregation_default_partitions,
+            )?
+            .into())
     }
 
     pub fn join(

diff --git a/src/daft-plan/src/logical_ops/agg.rs b/src/daft-plan/src/logical_ops/agg.rs
index f9b6ca21f1..9e2e58fbbe 100644
--- a/src/daft-plan/src/logical_ops/agg.rs
+++ b/src/daft-plan/src/logical_ops/agg.rs
@@ -21,6 +21,8 @@ pub struct Aggregate {
     pub groupby: Vec<Expr>,
 
     pub output_schema: SchemaRef,
+
+    pub shuffle_aggregation_default_partitions: usize,
 }
 
 impl Aggregate {
@@ -28,6 +30,7 @@ impl Aggregate {
         input: Arc<LogicalPlan>,
        aggregations: Vec<AggExpr>,
         groupby: Vec<Expr>,
+        shuffle_aggregation_default_partitions: usize,
     ) -> logical_plan::Result<Self> {
         let output_schema = {
             let upstream_schema = input.schema();
@@ -45,6 +48,7 @@ impl Aggregate {
             groupby,
             output_schema,
             input,
+            shuffle_aggregation_default_partitions,
         })
     }

diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs
index 5db0c5fee7..44a7f8c29d 100644
--- a/src/daft-plan/src/logical_plan.rs
+++ b/src/daft-plan/src/logical_plan.rs
@@ -161,7 +161,7 @@ impl LogicalPlan {
                 Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()),
                 Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()),
                 Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())),
-                Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()),
+                Self::Aggregate(Aggregate { aggregations, groupby, shuffle_aggregation_default_partitions, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone(), *shuffle_aggregation_default_partitions).unwrap()),
                 Self::Sink(Sink { sink_info, .. }) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()),
                 _ => panic!("Logical op {} has two inputs, but got one", self),
             },

diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs
index afcce3f2b3..54d4e31592 100644
--- a/src/daft-plan/src/optimization/rules/push_down_projection.rs
+++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs
@@ -228,6 +228,7 @@ impl PushDownProjection {
                     aggregate.input.clone(),
                     pruned_aggregate_exprs,
                     aggregate.groupby.clone(),
+                    aggregate.shuffle_aggregation_default_partitions,
                 )?
                 .into();
@@ -501,6 +502,7 @@ impl OptimizerRule for PushDownProjection {
 mod tests {
     use std::sync::Arc;
 
+    use common_daft_config::DaftPlanningConfig;
     use common_error::DaftResult;
     use daft_core::{datatypes::Field, DataType};
     use daft_dsl::{col, lit};
@@ -666,7 +668,11 @@ mod tests {
             Field::new("b", DataType::Int64),
             Field::new("c", DataType::Int64),
         ])
-        .aggregate(vec![col("a").mean(), col("b").mean()], vec![col("c")])?
+        .aggregate(
+            vec![col("a").mean(), col("b").mean()],
+            vec![col("c")],
+            DaftPlanningConfig::default().shuffle_aggregation_default_partitions,
+        )?
         .project(vec![col("a")], Default::default())?
         .build();

diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 50a0e2fe19..3f17396a7d 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -316,6 +316,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
             aggregations,
             groupby,
             input,
+            shuffle_aggregation_default_partitions,
             ..
         }) => {
             use daft_dsl::AggExpr::{self, *};
@@ -500,7 +501,10 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
             } else {
                 let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
                     first_stage_agg.into(),
-                    min(num_input_partitions, cfg.aggregation_min_partitions),
+                    min(
+                        num_input_partitions,
+                        *shuffle_aggregation_default_partitions,
+                    ),
                     groupby.clone(),
                 ));
                 PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
@@ -763,7 +767,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
 
 #[cfg(test)]
 mod tests {
-    use common_daft_config::DaftExecutionConfig;
+    use common_daft_config::{DaftExecutionConfig, DaftPlanningConfig};
     use common_error::DaftResult;
     use daft_core::{datatypes::Field, DataType};
     use daft_dsl::{col, lit, AggExpr, Expr};
@@ -861,6 +865,7 @@ mod tests {
         .aggregate(
             vec![Expr::Agg(AggExpr::Sum(col("a").into()))],
             vec![col("b")],
+            DaftPlanningConfig::default().shuffle_aggregation_default_partitions,
         )?
         .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)?
         .build();

diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py
index b04502ae67..aacbc01605 100644
--- a/tests/dataframe/test_aggregations.py
+++ b/tests/dataframe/test_aggregations.py
@@ -368,13 +368,13 @@ def test_groupby_agg_pyobjects():
     assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]]
 
 
-@pytest.mark.parametrize("aggregation_min_partitions", [None, 20])
-def test_groupby_result_partitions_smaller_than_input(aggregation_min_partitions):
-    if aggregation_min_partitions is None:
-        min_partitions = get_context().daft_execution_config.aggregation_min_partitions
+@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 201])
+def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions):
+    if shuffle_aggregation_default_partitions is None:
+        min_partitions = get_context().daft_planning_config.shuffle_aggregation_default_partitions
     else:
-        daft.set_execution_config(aggregation_min_partitions=aggregation_min_partitions)
-        min_partitions = aggregation_min_partitions
+        daft.set_planning_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions)
+        min_partitions = shuffle_aggregation_default_partitions
 
     for partition_size in [1, min_partitions, min_partitions + 1]:
         df = daft.from_pydict(
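[Editor's note: one wrinkle of the planning-config placement in this patch, and a plausible motivation for the next patch, is that the value is read inside DataFrame._agg when the logical plan is built, not when it executes. A hedged sketch, assuming a build with only patches 1-2 applied:]

    import daft
    from daft import col

    daft.set_planning_config(shuffle_aggregation_default_partitions=50)
    df = daft.from_pydict({"group": [0, 1], "value": [1, 2]})
    # The knob is captured here, as the groupby plan is constructed...
    df = df.groupby(col("group")).agg([(col("value").alias("sum"), "sum")])
    # ...so changing it afterwards does not affect the already-built plan.
    daft.set_planning_config(shuffle_aggregation_default_partitions=500)
    df.collect()  # still planned with 50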
""" # Replace values in the DaftPlanningConfig with user-specified overrides ctx = get_context() old_daft_planning_config = ctx.daft_planning_config if config is None else config new_daft_planning_config = old_daft_planning_config.with_config_values( default_io_config=default_io_config, - shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions, ) ctx.daft_planning_config = new_daft_planning_config @@ -218,6 +215,7 @@ def set_execution_config( parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, + shuffle_aggregation_default_partitions: int | None = None, ) -> DaftContext: """Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`) @@ -246,6 +244,7 @@ def set_execution_config( parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0 csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5 + shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10. """ # Replace values in the DaftExecutionConfig with user-specified overrides ctx = get_context() @@ -263,6 +262,7 @@ def set_execution_config( parquet_inflation_factor=parquet_inflation_factor, csv_target_filesize=csv_target_filesize, csv_inflation_factor=csv_inflation_factor, + shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions, ) ctx.daft_execution_config = new_daft_execution_config diff --git a/daft/daft.pyi b/daft/daft.pyi index 6078417d49..f98d4c0588 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -1158,9 +1158,7 @@ class LogicalPlanBuilder: def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ... def distinct(self) -> LogicalPlanBuilder: ... def sample(self, fraction: float, with_replacement: bool, seed: int | None) -> LogicalPlanBuilder: ... - def aggregate( - self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr], shuffle_aggregation_default_partitions: int - ) -> LogicalPlanBuilder: ... + def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ... def join( self, right: LogicalPlanBuilder, @@ -1199,6 +1197,7 @@ class PyDaftExecutionConfig: parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, + shuffle_aggregation_default_partitions: int | None = None, ) -> PyDaftExecutionConfig: ... @property def scan_tasks_min_size_bytes(self) -> int: ... @@ -1222,17 +1221,16 @@ class PyDaftExecutionConfig: def csv_target_filesize(self) -> int: ... @property def csv_inflation_factor(self) -> float: ... + @property + def shuffle_aggregation_default_partitions(self) -> int: ... class PyDaftPlanningConfig: def with_config_values( self, default_io_config: IOConfig | None = None, - shuffle_aggregation_default_partitions: int | None = None, ) -> PyDaftPlanningConfig: ... @property def default_io_config(self) -> IOConfig: ... - @property - def shuffle_aggregation_default_partitions(self) -> int: ... def build_type() -> str: ... def version() -> str: ... 
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 4bbf16bb87..a370d998ed 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -949,22 +949,12 @@ def _agg(
         exprs_to_agg: List[Tuple[Expression, str]] = list(
             zip(self.__column_input_to_expression([c for c, _ in to_agg]), [op for _, op in to_agg])
         )
-        shuffle_aggregation_default_partitions = (
-            get_context().daft_planning_config.shuffle_aggregation_default_partitions
-        )
 
-        builder = self._builder.agg(
-            exprs_to_agg, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions
-        )
+        builder = self._builder.agg(exprs_to_agg, list(group_by) if group_by is not None else None)
         return DataFrame(builder)
 
     def _map_groups(self, udf: Expression, group_by: Optional[ExpressionsProjection] = None) -> "DataFrame":
-        shuffle_aggregation_default_partitions = (
-            get_context().daft_planning_config.shuffle_aggregation_default_partitions
-        )
-        builder = self._builder.map_groups(
-            udf, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions
-        )
+        builder = self._builder.map_groups(udf, list(group_by) if group_by is not None else None)
         return DataFrame(builder)
 
     @DataframePublicAPI

diff --git a/daft/logical/builder.py b/daft/logical/builder.py
index 1aababff0b..f5ad0451aa 100644
--- a/daft/logical/builder.py
+++ b/daft/logical/builder.py
@@ -128,7 +128,7 @@ def explode(self, explode_expressions: list[Expression]) -> LogicalPlanBuilder:
     def count(self) -> LogicalPlanBuilder:
         # TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations.
         first_col = col(self.schema().column_names()[0])
-        builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [], 1)
+        builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [])
         builder = builder.project([first_col.alias("count")._expr], ResourceRequest())
         return LogicalPlanBuilder(builder)
@@ -158,7 +158,6 @@ def agg(
         self,
         to_agg: list[tuple[Expression, str]],
         group_by: list[Expression] | None,
-        shuffle_aggregation_default_partitions: int,
     ) -> LogicalPlanBuilder:
         exprs = []
         for expr, op in to_agg:
@@ -180,16 +179,12 @@ def agg(
                 raise NotImplementedError(f"Aggregation {op} is not implemented.")
 
         group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else []
-        builder = self._builder.aggregate(
-            [expr._expr for expr in exprs], group_by_pyexprs, shuffle_aggregation_default_partitions
-        )
+        builder = self._builder.aggregate([expr._expr for expr in exprs], group_by_pyexprs)
         return LogicalPlanBuilder(builder)
 
-    def map_groups(
-        self, udf: Expression, group_by: list[Expression] | None, shuffle_aggregation_default_partitions: int
-    ) -> LogicalPlanBuilder:
+    def map_groups(self, udf: Expression, group_by: list[Expression] | None) -> LogicalPlanBuilder:
         group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else []
-        builder = self._builder.aggregate([udf._expr], group_by_pyexprs, shuffle_aggregation_default_partitions)
+        builder = self._builder.aggregate([udf._expr], group_by_pyexprs)
         return LogicalPlanBuilder(builder)
 
     def join(  # type: ignore[override]

diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index 30c890f167..a3dd531d3a 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -6,19 +6,9 @@ use serde::{Deserialize, Serialize};
 /// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. Note
 ///    that this does not include the actual scan, which is taken care of by the DaftExecutionConfig.
 /// 2. Building of logical plan nodes
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize, Default)]
 pub struct DaftPlanningConfig {
     pub default_io_config: IOConfig,
-    pub shuffle_aggregation_default_partitions: usize,
-}
-
-impl Default for DaftPlanningConfig {
-    fn default() -> Self {
-        DaftPlanningConfig {
-            default_io_config: IOConfig::default(),
-            shuffle_aggregation_default_partitions: 200,
-        }
-    }
 }
 
 /// Configurations for Daft to use during the execution of a Dataframe
@@ -44,6 +34,7 @@ pub struct DaftExecutionConfig {
     pub parquet_inflation_factor: f64,
     pub csv_target_filesize: usize,
     pub csv_inflation_factor: f64,
+    pub shuffle_aggregation_default_partitions: usize,
 }
 
 impl Default for DaftExecutionConfig {
@@ -61,6 +52,7 @@ impl Default for DaftExecutionConfig {
             parquet_inflation_factor: 3.0,
             csv_target_filesize: 512 * 1024 * 1024, // 512MB
             csv_inflation_factor: 0.5,
+            shuffle_aggregation_default_partitions: 200,
         }
     }
 }

diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index f6e9463cfe..8d2124c7ba 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -22,7 +22,6 @@ impl PyDaftPlanningConfig {
     fn with_config_values(
         &mut self,
         default_io_config: Option<PyIOConfig>,
-        shuffle_aggregation_default_partitions: Option<usize>,
     ) -> PyResult<PyDaftPlanningConfig> {
         let mut config = self.config.as_ref().clone();
 
         if let Some(default_io_config) = default_io_config {
             config.default_io_config = default_io_config.config;
         }
 
-        if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
-        {
-            config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
-        }
-
         Ok(PyDaftPlanningConfig {
             config: Arc::new(config),
         })
@@ -47,11 +41,6 @@ impl PyDaftPlanningConfig {
         })
     }
 
-    #[getter(shuffle_aggregation_default_partitions)]
-    fn shuffle_aggregation_default_partitions(&self) -> PyResult<usize> {
-        Ok(self.config.shuffle_aggregation_default_partitions)
-    }
-
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftPlanningConfig should be serializable to bytes");
@@ -100,6 +89,7 @@ impl PyDaftExecutionConfig {
         parquet_inflation_factor: Option<f64>,
         csv_target_filesize: Option<usize>,
         csv_inflation_factor: Option<f64>,
+        shuffle_aggregation_default_partitions: Option<usize>,
     ) -> PyResult<PyDaftExecutionConfig> {
         let mut config = self.config.as_ref().clone();
@@ -142,6 +132,10 @@ impl PyDaftExecutionConfig {
         if let Some(csv_inflation_factor) = csv_inflation_factor {
             config.csv_inflation_factor = csv_inflation_factor;
         }
+        if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
+        {
+            config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
+        }
 
         Ok(PyDaftExecutionConfig {
             config: Arc::new(config),
@@ -197,6 +197,11 @@ impl PyDaftExecutionConfig {
         Ok(self.config.csv_inflation_factor)
     }
 
+    #[getter]
+    fn get_shuffle_aggregation_default_partitions(&self) -> PyResult<usize> {
+        Ok(self.config.shuffle_aggregation_default_partitions)
+    }
+
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftExecutionConfig should be serializable to bytes");

diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 5ef1a35f9e..30a84c9c82 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -190,12 +190,7 @@ impl LogicalPlanBuilder {
         Ok(logical_plan.into())
     }
 
-    pub fn aggregate(
-        &self,
-        agg_exprs: Vec<Expr>,
-        groupby_exprs: Vec<Expr>,
-        shuffle_aggregation_default_partitions: usize,
-    ) -> DaftResult<Self> {
+    pub fn aggregate(&self, agg_exprs: Vec<Expr>, groupby_exprs: Vec<Expr>) -> DaftResult<Self> {
         let agg_exprs = agg_exprs
             .iter()
             .map(|expr| match expr {
@@ -210,13 +205,8 @@ impl LogicalPlanBuilder {
             })
             .collect::<DaftResult<Vec<AggExpr>>>()?;
 
-        let logical_plan: LogicalPlan = logical_ops::Aggregate::try_new(
-            self.plan.clone(),
-            agg_exprs,
-            groupby_exprs,
-            shuffle_aggregation_default_partitions,
-        )?
-        .into();
+        let logical_plan: LogicalPlan =
+            logical_ops::Aggregate::try_new(self.plan.clone(), agg_exprs, groupby_exprs)?.into();
         Ok(logical_plan.into())
     }
 
@@ -420,12 +410,7 @@ impl PyLogicalPlanBuilder {
         .into())
     }
 
-    pub fn aggregate(
-        &self,
-        agg_exprs: Vec<PyExpr>,
-        groupby_exprs: Vec<PyExpr>,
-        shuffle_aggregation_default_partitions: usize,
-    ) -> PyResult<Self> {
+    pub fn aggregate(&self, agg_exprs: Vec<PyExpr>, groupby_exprs: Vec<PyExpr>) -> PyResult<Self> {
         let agg_exprs = agg_exprs
             .iter()
             .map(|expr| expr.clone().into())
@@ -434,14 +419,7 @@ impl PyLogicalPlanBuilder {
             .iter()
             .map(|expr| expr.clone().into())
             .collect::<Vec<Expr>>();
-        Ok(self
-            .builder
-            .aggregate(
-                agg_exprs,
-                groupby_exprs,
-                shuffle_aggregation_default_partitions,
-            )?
-            .into())
+        Ok(self.builder.aggregate(agg_exprs, groupby_exprs)?.into())
     }
 
     pub fn join(

diff --git a/src/daft-plan/src/logical_ops/agg.rs b/src/daft-plan/src/logical_ops/agg.rs
index 9e2e58fbbe..f9b6ca21f1 100644
--- a/src/daft-plan/src/logical_ops/agg.rs
+++ b/src/daft-plan/src/logical_ops/agg.rs
@@ -21,8 +21,6 @@ pub struct Aggregate {
     pub groupby: Vec<Expr>,
 
     pub output_schema: SchemaRef,
-
-    pub shuffle_aggregation_default_partitions: usize,
 }
 
 impl Aggregate {
@@ -30,7 +28,6 @@ impl Aggregate {
         input: Arc<LogicalPlan>,
         aggregations: Vec<AggExpr>,
         groupby: Vec<Expr>,
-        shuffle_aggregation_default_partitions: usize,
     ) -> logical_plan::Result<Self> {
         let output_schema = {
             let upstream_schema = input.schema();
@@ -48,7 +45,6 @@ impl Aggregate {
             groupby,
             output_schema,
             input,
-            shuffle_aggregation_default_partitions,
         })
     }

diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs
index 44a7f8c29d..5db0c5fee7 100644
--- a/src/daft-plan/src/logical_plan.rs
+++ b/src/daft-plan/src/logical_plan.rs
@@ -161,7 +161,7 @@ impl LogicalPlan {
                 Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()),
                 Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()),
                 Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())),
-                Self::Aggregate(Aggregate { aggregations, groupby, shuffle_aggregation_default_partitions, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone(), *shuffle_aggregation_default_partitions).unwrap()),
+                Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()),
                 Self::Sink(Sink { sink_info, .. }) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()),
                 _ => panic!("Logical op {} has two inputs, but got one", self),
             },

diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs
index 54d4e31592..afcce3f2b3 100644
--- a/src/daft-plan/src/optimization/rules/push_down_projection.rs
+++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs
@@ -228,7 +228,6 @@ impl PushDownProjection {
                     aggregate.input.clone(),
                     pruned_aggregate_exprs,
                     aggregate.groupby.clone(),
-                    aggregate.shuffle_aggregation_default_partitions,
                 )?
                 .into();
@@ -502,7 +501,6 @@ impl OptimizerRule for PushDownProjection {
 mod tests {
     use std::sync::Arc;
 
-    use common_daft_config::DaftPlanningConfig;
     use common_error::DaftResult;
     use daft_core::{datatypes::Field, DataType};
     use daft_dsl::{col, lit};
@@ -668,11 +666,7 @@ mod tests {
             Field::new("b", DataType::Int64),
             Field::new("c", DataType::Int64),
         ])
-        .aggregate(
-            vec![col("a").mean(), col("b").mean()],
-            vec![col("c")],
-            DaftPlanningConfig::default().shuffle_aggregation_default_partitions,
-        )?
+        .aggregate(vec![col("a").mean(), col("b").mean()], vec![col("c")])?
         .project(vec![col("a")], Default::default())?
         .build();

diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 3f17396a7d..5e55477ea8 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -316,7 +316,6 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
             aggregations,
             groupby,
             input,
-            shuffle_aggregation_default_partitions,
             ..
         }) => {
             use daft_dsl::AggExpr::{self, *};
@@ -503,7 +502,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
                     first_stage_agg.into(),
                     min(
                         num_input_partitions,
-                        *shuffle_aggregation_default_partitions,
+                        cfg.shuffle_aggregation_default_partitions,
                     ),
                     groupby.clone(),
                 ));
                 PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
@@ -767,7 +766,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftResult<PhysicalPlan> {
 
 #[cfg(test)]
 mod tests {
-    use common_daft_config::{DaftExecutionConfig, DaftPlanningConfig};
+    use common_daft_config::DaftExecutionConfig;
     use common_error::DaftResult;
     use daft_core::{datatypes::Field, DataType};
     use daft_dsl::{col, lit, AggExpr, Expr};
@@ -865,7 +864,6 @@ mod tests {
         .aggregate(
             vec![Expr::Agg(AggExpr::Sum(col("a").into()))],
             vec![col("b")],
-            DaftPlanningConfig::default().shuffle_aggregation_default_partitions,
         )?
         .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)?
         .build();

diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py
index aacbc01605..846b534c66 100644
--- a/tests/dataframe/test_aggregations.py
+++ b/tests/dataframe/test_aggregations.py
@@ -368,12 +368,12 @@ def test_groupby_agg_pyobjects():
     assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]]
 
 
-@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 201])
+@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 20])
 def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions):
     if shuffle_aggregation_default_partitions is None:
-        min_partitions = get_context().daft_planning_config.shuffle_aggregation_default_partitions
+        min_partitions = get_context().daft_execution_config.shuffle_aggregation_default_partitions
     else:
-        daft.set_planning_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions)
+        daft.set_execution_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions)
         min_partitions = shuffle_aggregation_default_partitions
 
     for partition_size in [1, min_partitions, min_partitions + 1]:

From 5e5a69c5b96c88d450d0a553f7b8d2b5902eee8c Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Tue, 13 Feb 2024 10:21:34 +0800
Subject: [PATCH 4/4] fix comment

---
 daft/context.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/daft/context.py b/daft/context.py
index 7c099217a2..c0f936f990 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -244,7 +244,7 @@ def set_execution_config(
         parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
         csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
         csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
-        shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10.
+        shuffle_aggregation_default_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
     """
     # Replace values in the DaftExecutionConfig with user-specified overrides
     ctx = get_context()
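[Editor's note: to close the loop, an end-to-end sketch of the behavior the final series state is meant to produce. Hypothetical: assumes a build with all four patches applied; the values mirror the updated test.]

    import daft
    from daft import col

    daft.set_execution_config(shuffle_aggregation_default_partitions=20)

    df = daft.from_pydict({"group": list(range(21)), "value": list(range(21))})
    df = df.into_partitions(21)
    df = df.groupby(col("group")).agg([(col("value").alias("sum"), "sum")]).collect()

    # Shuffle fanout is min(input partitions, configured default) == min(21, 20) == 20.
    assert df.num_partitions() == 20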