diff --git a/daft/context.py b/daft/context.py index 3b36a072c9..d73a3f855c 100644 --- a/daft/context.py +++ b/daft/context.py @@ -180,6 +180,7 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext: def set_planning_config( config: PyDaftPlanningConfig | None = None, default_io_config: IOConfig | None = None, + shuffle_aggregation_default_partitions: int | None = None, ) -> DaftContext: """Globally sets various configuration parameters which control Daft plan construction behavior. These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe) @@ -189,12 +190,14 @@ def set_planning_config( that the old (current) config should be used. default_io_config: A default IOConfig to use in the absence of one being explicitly passed into any Expression (e.g. `.url.download()`) or Dataframe operation (e.g. `daft.read_parquet()`). + shuffle_aggregation_default_partitions: Default number of partitions to use when shuffling data during aggregation operations. Defaults to 200, unless the number of input partitions is lower. """ # Replace values in the DaftPlanningConfig with user-specified overrides ctx = get_context() old_daft_planning_config = ctx.daft_planning_config if config is None else config new_daft_planning_config = old_daft_planning_config.with_config_values( default_io_config=default_io_config, + shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions, ) ctx.daft_planning_config = new_daft_planning_config @@ -215,7 +218,6 @@ def set_execution_config( parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, - aggregation_min_partitions: int | None = None, ) -> DaftContext: """Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`) @@ -244,7 +246,6 @@ def set_execution_config( parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0 csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5 - aggregation_min_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10. """ # Replace values in the DaftExecutionConfig with user-specified overrides ctx = get_context() @@ -262,7 +263,6 @@ def set_execution_config( parquet_inflation_factor=parquet_inflation_factor, csv_target_filesize=csv_target_filesize, csv_inflation_factor=csv_inflation_factor, - aggregation_min_partitions=aggregation_min_partitions, ) ctx.daft_execution_config = new_daft_execution_config diff --git a/daft/daft.pyi b/daft/daft.pyi index 20aa1cadba..6078417d49 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -1158,7 +1158,9 @@ class LogicalPlanBuilder: def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ... def distinct(self) -> LogicalPlanBuilder: ... def sample(self, fraction: float, with_replacement: bool, seed: int | None) -> LogicalPlanBuilder: ... - def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ... + def aggregate( + self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr], shuffle_aggregation_default_partitions: int + ) -> LogicalPlanBuilder: ... def join( self, right: LogicalPlanBuilder, @@ -1197,7 +1199,6 @@ class PyDaftExecutionConfig: parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, - aggregation_min_partitions: int | None = None, ) -> PyDaftExecutionConfig: ... @property def scan_tasks_min_size_bytes(self) -> int: ... @@ -1221,16 +1222,17 @@ class PyDaftExecutionConfig: def csv_target_filesize(self) -> int: ... @property def csv_inflation_factor(self) -> float: ... - @property - def aggregation_min_partitions(self) -> int: ... class PyDaftPlanningConfig: def with_config_values( self, default_io_config: IOConfig | None = None, + shuffle_aggregation_default_partitions: int | None = None, ) -> PyDaftPlanningConfig: ... @property def default_io_config(self) -> IOConfig: ... + @property + def shuffle_aggregation_default_partitions(self) -> int: ... def build_type() -> str: ... def version() -> str: ... diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index a370d998ed..4bbf16bb87 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -949,12 +949,22 @@ def _agg( exprs_to_agg: List[Tuple[Expression, str]] = list( zip(self.__column_input_to_expression([c for c, _ in to_agg]), [op for _, op in to_agg]) ) + shuffle_aggregation_default_partitions = ( + get_context().daft_planning_config.shuffle_aggregation_default_partitions + ) - builder = self._builder.agg(exprs_to_agg, list(group_by) if group_by is not None else None) + builder = self._builder.agg( + exprs_to_agg, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions + ) return DataFrame(builder) def _map_groups(self, udf: Expression, group_by: Optional[ExpressionsProjection] = None) -> "DataFrame": - builder = self._builder.map_groups(udf, list(group_by) if group_by is not None else None) + shuffle_aggregation_default_partitions = ( + get_context().daft_planning_config.shuffle_aggregation_default_partitions + ) + builder = self._builder.map_groups( + udf, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions + ) return DataFrame(builder) @DataframePublicAPI diff --git a/daft/logical/builder.py b/daft/logical/builder.py index f5ad0451aa..1aababff0b 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -128,7 +128,7 @@ def explode(self, explode_expressions: list[Expression]) -> LogicalPlanBuilder: def count(self) -> LogicalPlanBuilder: # TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations. first_col = col(self.schema().column_names()[0]) - builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], []) + builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [], 1) builder = builder.project([first_col.alias("count")._expr], ResourceRequest()) return LogicalPlanBuilder(builder) @@ -158,6 +158,7 @@ def agg( self, to_agg: list[tuple[Expression, str]], group_by: list[Expression] | None, + shuffle_aggregation_default_partitions: int, ) -> LogicalPlanBuilder: exprs = [] for expr, op in to_agg: @@ -179,12 +180,16 @@ def agg( raise NotImplementedError(f"Aggregation {op} is not implemented.") group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else [] - builder = self._builder.aggregate([expr._expr for expr in exprs], group_by_pyexprs) + builder = self._builder.aggregate( + [expr._expr for expr in exprs], group_by_pyexprs, shuffle_aggregation_default_partitions + ) return LogicalPlanBuilder(builder) - def map_groups(self, udf: Expression, group_by: list[Expression] | None) -> LogicalPlanBuilder: + def map_groups( + self, udf: Expression, group_by: list[Expression] | None, shuffle_aggregation_default_partitions: int + ) -> LogicalPlanBuilder: group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else [] - builder = self._builder.aggregate([udf._expr], group_by_pyexprs) + builder = self._builder.aggregate([udf._expr], group_by_pyexprs, shuffle_aggregation_default_partitions) return LogicalPlanBuilder(builder) def join( # type: ignore[override] diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 85b646172e..30c890f167 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -6,9 +6,19 @@ use serde::{Deserialize, Serialize}; /// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. Note /// that this does not include the actual scan, which is taken care of by the DaftExecutionConfig. /// 2. Building of logical plan nodes -#[derive(Clone, Serialize, Deserialize, Default)] +#[derive(Clone, Serialize, Deserialize)] pub struct DaftPlanningConfig { pub default_io_config: IOConfig, + pub shuffle_aggregation_default_partitions: usize, +} + +impl Default for DaftPlanningConfig { + fn default() -> Self { + DaftPlanningConfig { + default_io_config: IOConfig::default(), + shuffle_aggregation_default_partitions: 200, + } + } } /// Configurations for Daft to use during the execution of a Dataframe @@ -34,7 +44,6 @@ pub struct DaftExecutionConfig { pub parquet_inflation_factor: f64, pub csv_target_filesize: usize, pub csv_inflation_factor: f64, - pub aggregation_min_partitions: usize, } impl Default for DaftExecutionConfig { @@ -52,7 +61,6 @@ impl Default for DaftExecutionConfig { parquet_inflation_factor: 3.0, csv_target_filesize: 512 * 1024 * 1024, // 512MB csv_inflation_factor: 0.5, - aggregation_min_partitions: 10, } } } diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index de3a585df2..f6e9463cfe 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -22,6 +22,7 @@ impl PyDaftPlanningConfig { fn with_config_values( &mut self, default_io_config: Option, + shuffle_aggregation_default_partitions: Option, ) -> PyResult { let mut config = self.config.as_ref().clone(); @@ -29,6 +30,11 @@ impl PyDaftPlanningConfig { config.default_io_config = default_io_config.config; } + if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions + { + config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions; + } + Ok(PyDaftPlanningConfig { config: Arc::new(config), }) @@ -41,6 +47,11 @@ impl PyDaftPlanningConfig { }) } + #[getter(shuffle_aggregation_default_partitions)] + fn shuffle_aggregation_default_partitions(&self) -> PyResult { + Ok(self.config.shuffle_aggregation_default_partitions) + } + fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec,))> { let bin_data = bincode::serialize(self.config.as_ref()) .expect("DaftPlanningConfig should be serializable to bytes"); @@ -89,7 +100,6 @@ impl PyDaftExecutionConfig { parquet_inflation_factor: Option, csv_target_filesize: Option, csv_inflation_factor: Option, - aggregation_min_partitions: Option, ) -> PyResult { let mut config = self.config.as_ref().clone(); @@ -132,9 +142,6 @@ impl PyDaftExecutionConfig { if let Some(csv_inflation_factor) = csv_inflation_factor { config.csv_inflation_factor = csv_inflation_factor; } - if let Some(aggregation_min_partitions) = aggregation_min_partitions { - config.aggregation_min_partitions = aggregation_min_partitions; - } Ok(PyDaftExecutionConfig { config: Arc::new(config), @@ -196,11 +203,6 @@ impl PyDaftExecutionConfig { Ok(self.config.csv_inflation_factor) } - #[getter] - fn get_aggregation_min_partitions(&self) -> PyResult { - Ok(self.config.aggregation_min_partitions) - } - fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec,))> { let bin_data = bincode::serialize(self.config.as_ref()) .expect("DaftExecutionConfig should be serializable to bytes"); diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 30a84c9c82..5ef1a35f9e 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -190,7 +190,12 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } - pub fn aggregate(&self, agg_exprs: Vec, groupby_exprs: Vec) -> DaftResult { + pub fn aggregate( + &self, + agg_exprs: Vec, + groupby_exprs: Vec, + shuffle_aggregation_default_partitions: usize, + ) -> DaftResult { let agg_exprs = agg_exprs .iter() .map(|expr| match expr { @@ -205,8 +210,13 @@ impl LogicalPlanBuilder { }) .collect::>>()?; - let logical_plan: LogicalPlan = - logical_ops::Aggregate::try_new(self.plan.clone(), agg_exprs, groupby_exprs)?.into(); + let logical_plan: LogicalPlan = logical_ops::Aggregate::try_new( + self.plan.clone(), + agg_exprs, + groupby_exprs, + shuffle_aggregation_default_partitions, + )? + .into(); Ok(logical_plan.into()) } @@ -410,7 +420,12 @@ impl PyLogicalPlanBuilder { .into()) } - pub fn aggregate(&self, agg_exprs: Vec, groupby_exprs: Vec) -> PyResult { + pub fn aggregate( + &self, + agg_exprs: Vec, + groupby_exprs: Vec, + shuffle_aggregation_default_partitions: usize, + ) -> PyResult { let agg_exprs = agg_exprs .iter() .map(|expr| expr.clone().into()) @@ -419,7 +434,14 @@ impl PyLogicalPlanBuilder { .iter() .map(|expr| expr.clone().into()) .collect::>(); - Ok(self.builder.aggregate(agg_exprs, groupby_exprs)?.into()) + Ok(self + .builder + .aggregate( + agg_exprs, + groupby_exprs, + shuffle_aggregation_default_partitions, + )? + .into()) } pub fn join( diff --git a/src/daft-plan/src/logical_ops/agg.rs b/src/daft-plan/src/logical_ops/agg.rs index f9b6ca21f1..9e2e58fbbe 100644 --- a/src/daft-plan/src/logical_ops/agg.rs +++ b/src/daft-plan/src/logical_ops/agg.rs @@ -21,6 +21,8 @@ pub struct Aggregate { pub groupby: Vec, pub output_schema: SchemaRef, + + pub shuffle_aggregation_default_partitions: usize, } impl Aggregate { @@ -28,6 +30,7 @@ impl Aggregate { input: Arc, aggregations: Vec, groupby: Vec, + shuffle_aggregation_default_partitions: usize, ) -> logical_plan::Result { let output_schema = { let upstream_schema = input.schema(); @@ -45,6 +48,7 @@ impl Aggregate { groupby, output_schema, input, + shuffle_aggregation_default_partitions, }) } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 5db0c5fee7..44a7f8c29d 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -161,7 +161,7 @@ impl LogicalPlan { Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()), Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()), Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())), - Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()), + Self::Aggregate(Aggregate { aggregations, groupby, shuffle_aggregation_default_partitions,..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone(),*shuffle_aggregation_default_partitions).unwrap()), Self::Sink(Sink { sink_info, .. }) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()), _ => panic!("Logical op {} has two inputs, but got one", self), }, diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index afcce3f2b3..54d4e31592 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -228,6 +228,7 @@ impl PushDownProjection { aggregate.input.clone(), pruned_aggregate_exprs, aggregate.groupby.clone(), + aggregate.shuffle_aggregation_default_partitions, )? .into(); @@ -501,6 +502,7 @@ impl OptimizerRule for PushDownProjection { mod tests { use std::sync::Arc; + use common_daft_config::DaftPlanningConfig; use common_error::DaftResult; use daft_core::{datatypes::Field, DataType}; use daft_dsl::{col, lit}; @@ -666,7 +668,11 @@ mod tests { Field::new("b", DataType::Int64), Field::new("c", DataType::Int64), ]) - .aggregate(vec![col("a").mean(), col("b").mean()], vec![col("c")])? + .aggregate( + vec![col("a").mean(), col("b").mean()], + vec![col("c")], + DaftPlanningConfig::default().shuffle_aggregation_default_partitions, + )? .project(vec![col("a")], Default::default())? .build(); diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 50a0e2fe19..3f17396a7d 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -316,6 +316,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe aggregations, groupby, input, + shuffle_aggregation_default_partitions, .. }) => { use daft_dsl::AggExpr::{self, *}; @@ -500,7 +501,10 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe } else { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( first_stage_agg.into(), - min(num_input_partitions, cfg.aggregation_min_partitions), + min( + num_input_partitions, + *shuffle_aggregation_default_partitions, + ), groupby.clone(), )); PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())) @@ -763,7 +767,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc) -> DaftRe #[cfg(test)] mod tests { - use common_daft_config::DaftExecutionConfig; + use common_daft_config::{DaftExecutionConfig, DaftPlanningConfig}; use common_error::DaftResult; use daft_core::{datatypes::Field, DataType}; use daft_dsl::{col, lit, AggExpr, Expr}; @@ -861,6 +865,7 @@ mod tests { .aggregate( vec![Expr::Agg(AggExpr::Sum(col("a").into()))], vec![col("b")], + DaftPlanningConfig::default().shuffle_aggregation_default_partitions, )? .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)? .build(); diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py index b04502ae67..aacbc01605 100644 --- a/tests/dataframe/test_aggregations.py +++ b/tests/dataframe/test_aggregations.py @@ -368,13 +368,13 @@ def test_groupby_agg_pyobjects(): assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]] -@pytest.mark.parametrize("aggregation_min_partitions", [None, 20]) -def test_groupby_result_partitions_smaller_than_input(aggregation_min_partitions): - if aggregation_min_partitions is None: - min_partitions = get_context().daft_execution_config.aggregation_min_partitions +@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 201]) +def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions): + if shuffle_aggregation_default_partitions is None: + min_partitions = get_context().daft_planning_config.shuffle_aggregation_default_partitions else: - daft.set_execution_config(aggregation_min_partitions=aggregation_min_partitions) - min_partitions = aggregation_min_partitions + daft.set_planning_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions) + min_partitions = shuffle_aggregation_default_partitions for partition_size in [1, min_partitions, min_partitions + 1]: df = daft.from_pydict(