From 151931cfb5a4a6974b98e6f29f721e6c62bf81bf Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Tue, 13 Feb 2024 10:19:37 +0800 Subject: [PATCH] back to exec config --- daft/context.py | 6 ++-- daft/daft.pyi | 10 +++--- daft/dataframe/dataframe.py | 14 ++------ daft/logical/builder.py | 13 +++----- src/common/daft-config/src/lib.rs | 14 ++------ src/common/daft-config/src/python.rs | 21 ++++++------ src/daft-plan/src/builder.rs | 32 +++---------------- src/daft-plan/src/logical_ops/agg.rs | 4 --- src/daft-plan/src/logical_plan.rs | 2 +- .../rules/push_down_projection.rs | 8 +---- src/daft-plan/src/planner.rs | 6 ++-- tests/dataframe/test_aggregations.py | 6 ++-- 12 files changed, 38 insertions(+), 98 deletions(-) diff --git a/daft/context.py b/daft/context.py index d73a3f855c..7c099217a2 100644 --- a/daft/context.py +++ b/daft/context.py @@ -180,7 +180,6 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext: def set_planning_config( config: PyDaftPlanningConfig | None = None, default_io_config: IOConfig | None = None, - shuffle_aggregation_default_partitions: int | None = None, ) -> DaftContext: """Globally sets various configuration parameters which control Daft plan construction behavior. These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe) @@ -190,14 +189,12 @@ def set_planning_config( that the old (current) config should be used. default_io_config: A default IOConfig to use in the absence of one being explicitly passed into any Expression (e.g. `.url.download()`) or Dataframe operation (e.g. `daft.read_parquet()`). - shuffle_aggregation_default_partitions: Default number of partitions to use when shuffling data during aggregation operations. Defaults to 200, unless the number of input partitions is lower. """ # Replace values in the DaftPlanningConfig with user-specified overrides ctx = get_context() old_daft_planning_config = ctx.daft_planning_config if config is None else config new_daft_planning_config = old_daft_planning_config.with_config_values( default_io_config=default_io_config, - shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions, ) ctx.daft_planning_config = new_daft_planning_config @@ -218,6 +215,7 @@ def set_execution_config( parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, + shuffle_aggregation_default_partitions: int | None = None, ) -> DaftContext: """Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`) @@ -246,6 +244,7 @@ def set_execution_config( parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0 csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5 + shuffle_aggregation_default_partitions: Default number of partitions to use when shuffling data during aggregation operations. Defaults to 200, unless the number of input partitions is lower.
""" # Replace values in the DaftExecutionConfig with user-specified overrides ctx = get_context() @@ -263,6 +262,7 @@ def set_execution_config( parquet_inflation_factor=parquet_inflation_factor, csv_target_filesize=csv_target_filesize, csv_inflation_factor=csv_inflation_factor, + shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions, ) ctx.daft_execution_config = new_daft_execution_config diff --git a/daft/daft.pyi b/daft/daft.pyi index 6078417d49..f98d4c0588 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -1158,9 +1158,7 @@ class LogicalPlanBuilder: def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ... def distinct(self) -> LogicalPlanBuilder: ... def sample(self, fraction: float, with_replacement: bool, seed: int | None) -> LogicalPlanBuilder: ... - def aggregate( - self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr], shuffle_aggregation_default_partitions: int - ) -> LogicalPlanBuilder: ... + def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ... def join( self, right: LogicalPlanBuilder, @@ -1199,6 +1197,7 @@ class PyDaftExecutionConfig: parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, + shuffle_aggregation_default_partitions: int | None = None, ) -> PyDaftExecutionConfig: ... @property def scan_tasks_min_size_bytes(self) -> int: ... @@ -1222,17 +1221,16 @@ class PyDaftExecutionConfig: def csv_target_filesize(self) -> int: ... @property def csv_inflation_factor(self) -> float: ... + @property + def shuffle_aggregation_default_partitions(self) -> int: ... class PyDaftPlanningConfig: def with_config_values( self, default_io_config: IOConfig | None = None, - shuffle_aggregation_default_partitions: int | None = None, ) -> PyDaftPlanningConfig: ... @property def default_io_config(self) -> IOConfig: ... - @property - def shuffle_aggregation_default_partitions(self) -> int: ... def build_type() -> str: ... def version() -> str: ... 
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 4bbf16bb87..a370d998ed 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -949,22 +949,12 @@ def _agg( exprs_to_agg: List[Tuple[Expression, str]] = list( zip(self.__column_input_to_expression([c for c, _ in to_agg]), [op for _, op in to_agg]) ) - shuffle_aggregation_default_partitions = ( - get_context().daft_planning_config.shuffle_aggregation_default_partitions - ) - builder = self._builder.agg( - exprs_to_agg, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions - ) + builder = self._builder.agg(exprs_to_agg, list(group_by) if group_by is not None else None) return DataFrame(builder) def _map_groups(self, udf: Expression, group_by: Optional[ExpressionsProjection] = None) -> "DataFrame": - shuffle_aggregation_default_partitions = ( - get_context().daft_planning_config.shuffle_aggregation_default_partitions - ) - builder = self._builder.map_groups( - udf, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions - ) + builder = self._builder.map_groups(udf, list(group_by) if group_by is not None else None) return DataFrame(builder) @DataframePublicAPI diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 1aababff0b..f5ad0451aa 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -128,7 +128,7 @@ def explode(self, explode_expressions: list[Expression]) -> LogicalPlanBuilder: def count(self) -> LogicalPlanBuilder: # TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations. first_col = col(self.schema().column_names()[0]) - builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [], 1) + builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], []) builder = builder.project([first_col.alias("count")._expr], ResourceRequest()) return LogicalPlanBuilder(builder) @@ -158,7 +158,6 @@ def agg( self, to_agg: list[tuple[Expression, str]], group_by: list[Expression] | None, - shuffle_aggregation_default_partitions: int, ) -> LogicalPlanBuilder: exprs = [] for expr, op in to_agg: @@ -180,16 +179,12 @@ def agg( raise NotImplementedError(f"Aggregation {op} is not implemented.") group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else [] - builder = self._builder.aggregate( - [expr._expr for expr in exprs], group_by_pyexprs, shuffle_aggregation_default_partitions - ) + builder = self._builder.aggregate([expr._expr for expr in exprs], group_by_pyexprs) return LogicalPlanBuilder(builder) - def map_groups( - self, udf: Expression, group_by: list[Expression] | None, shuffle_aggregation_default_partitions: int - ) -> LogicalPlanBuilder: + def map_groups(self, udf: Expression, group_by: list[Expression] | None) -> LogicalPlanBuilder: group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else [] - builder = self._builder.aggregate([udf._expr], group_by_pyexprs, shuffle_aggregation_default_partitions) + builder = self._builder.aggregate([udf._expr], group_by_pyexprs) return LogicalPlanBuilder(builder) def join( # type: ignore[override] diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 30c890f167..a3dd531d3a 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -6,19 +6,9 @@ use serde::{Deserialize, Serialize}; /// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. 
Note /// that this does not include the actual scan, which is taken care of by the DaftExecutionConfig. /// 2. Building of logical plan nodes -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, Default)] pub struct DaftPlanningConfig { pub default_io_config: IOConfig, - pub shuffle_aggregation_default_partitions: usize, -} - -impl Default for DaftPlanningConfig { - fn default() -> Self { - DaftPlanningConfig { - default_io_config: IOConfig::default(), - shuffle_aggregation_default_partitions: 200, - } - } } /// Configurations for Daft to use during the execution of a Dataframe @@ -44,6 +34,7 @@ pub struct DaftExecutionConfig { pub parquet_inflation_factor: f64, pub csv_target_filesize: usize, pub csv_inflation_factor: f64, + pub shuffle_aggregation_default_partitions: usize, } impl Default for DaftExecutionConfig { @@ -61,6 +52,7 @@ impl Default for DaftExecutionConfig { parquet_inflation_factor: 3.0, csv_target_filesize: 512 * 1024 * 1024, // 512MB csv_inflation_factor: 0.5, + shuffle_aggregation_default_partitions: 200, } } } diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index f6e9463cfe..8d2124c7ba 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -22,7 +22,6 @@ impl PyDaftPlanningConfig { fn with_config_values( &mut self, default_io_config: Option<PyIOConfig>, - shuffle_aggregation_default_partitions: Option<usize>, ) -> PyResult<PyDaftPlanningConfig> { let mut config = self.config.as_ref().clone(); @@ -30,11 +29,6 @@ impl PyDaftPlanningConfig { config.default_io_config = default_io_config.config; } - if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions - { - config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions; - } - Ok(PyDaftPlanningConfig { config: Arc::new(config), }) } @@ -47,11 +41,6 @@ impl PyDaftPlanningConfig { }) } - #[getter(shuffle_aggregation_default_partitions)] - fn shuffle_aggregation_default_partitions(&self) -> PyResult<usize> { - Ok(self.config.shuffle_aggregation_default_partitions) - } - fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> { let bin_data = bincode::serialize(self.config.as_ref()) .expect("DaftPlanningConfig should be serializable to bytes"); @@ -100,6 +89,7 @@ impl PyDaftExecutionConfig { parquet_inflation_factor: Option<f64>, csv_target_filesize: Option<usize>, csv_inflation_factor: Option<f64>, + shuffle_aggregation_default_partitions: Option<usize>, ) -> PyResult<PyDaftExecutionConfig> { let mut config = self.config.as_ref().clone(); @@ -142,6 +132,10 @@ impl PyDaftExecutionConfig { if let Some(csv_inflation_factor) = csv_inflation_factor { config.csv_inflation_factor = csv_inflation_factor; } + if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions + { + config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions; + } Ok(PyDaftExecutionConfig { config: Arc::new(config), @@ -203,6 +197,11 @@ impl PyDaftExecutionConfig { Ok(self.config.csv_inflation_factor) } + #[getter] + fn get_shuffle_aggregation_default_partitions(&self) -> PyResult<usize> { + Ok(self.config.shuffle_aggregation_default_partitions) + } + fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> { let bin_data = bincode::serialize(self.config.as_ref()) .expect("DaftExecutionConfig should be serializable to bytes"); diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 5ef1a35f9e..30a84c9c82 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -190,12
+190,7 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } - pub fn aggregate( - &self, - agg_exprs: Vec<Expr>, - groupby_exprs: Vec<Expr>, - shuffle_aggregation_default_partitions: usize, - ) -> DaftResult<Self> { + pub fn aggregate(&self, agg_exprs: Vec<Expr>, groupby_exprs: Vec<Expr>) -> DaftResult<Self> { let agg_exprs = agg_exprs .iter() .map(|expr| match expr { @@ -210,13 +205,8 @@ impl LogicalPlanBuilder { }) .collect::<DaftResult<Vec<AggExpr>>>()?; - let logical_plan: LogicalPlan = logical_ops::Aggregate::try_new( - self.plan.clone(), - agg_exprs, - groupby_exprs, - shuffle_aggregation_default_partitions, - )? - .into(); + let logical_plan: LogicalPlan = + logical_ops::Aggregate::try_new(self.plan.clone(), agg_exprs, groupby_exprs)?.into(); Ok(logical_plan.into()) } @@ -420,12 +410,7 @@ impl PyLogicalPlanBuilder { .into()) } - pub fn aggregate( - &self, - agg_exprs: Vec<PyExpr>, - groupby_exprs: Vec<PyExpr>, - shuffle_aggregation_default_partitions: usize, - ) -> PyResult<Self> { + pub fn aggregate(&self, agg_exprs: Vec<PyExpr>, groupby_exprs: Vec<PyExpr>) -> PyResult<Self> { let agg_exprs = agg_exprs .iter() .map(|expr| expr.clone().into()) @@ -434,14 +419,7 @@ impl PyLogicalPlanBuilder { .iter() .map(|expr| expr.clone().into()) .collect::<Vec<Expr>>(); - Ok(self - .builder - .aggregate( - agg_exprs, - groupby_exprs, - shuffle_aggregation_default_partitions, - )? - .into()) + Ok(self.builder.aggregate(agg_exprs, groupby_exprs)?.into()) } pub fn join( diff --git a/src/daft-plan/src/logical_ops/agg.rs b/src/daft-plan/src/logical_ops/agg.rs index 9e2e58fbbe..f9b6ca21f1 100644 --- a/src/daft-plan/src/logical_ops/agg.rs +++ b/src/daft-plan/src/logical_ops/agg.rs @@ -21,8 +21,6 @@ pub struct Aggregate { pub groupby: Vec<Expr>, pub output_schema: SchemaRef, - - pub shuffle_aggregation_default_partitions: usize, } impl Aggregate { @@ -30,7 +28,6 @@ impl Aggregate { input: Arc<LogicalPlan>, aggregations: Vec<AggExpr>, groupby: Vec<Expr>, - shuffle_aggregation_default_partitions: usize, ) -> logical_plan::Result<Self> { let output_schema = { let upstream_schema = input.schema(); @@ -48,7 +45,6 @@ impl Aggregate { groupby, output_schema, input, - shuffle_aggregation_default_partitions, }) } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 44a7f8c29d..5db0c5fee7 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -161,7 +161,7 @@ impl LogicalPlan { Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()), Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()), Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())), - Self::Aggregate(Aggregate { aggregations, groupby, shuffle_aggregation_default_partitions,..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone(),*shuffle_aggregation_default_partitions).unwrap()), + Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()), Self::Sink(Sink { sink_info, ..
}) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()), _ => panic!("Logical op {} has two inputs, but got one", self), }, diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index 54d4e31592..afcce3f2b3 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -228,7 +228,6 @@ impl PushDownProjection { aggregate.input.clone(), pruned_aggregate_exprs, aggregate.groupby.clone(), - aggregate.shuffle_aggregation_default_partitions, )? .into(); @@ -502,7 +501,6 @@ impl OptimizerRule for PushDownProjection { mod tests { use std::sync::Arc; - use common_daft_config::DaftPlanningConfig; use common_error::DaftResult; use daft_core::{datatypes::Field, DataType}; use daft_dsl::{col, lit}; @@ -668,11 +666,7 @@ mod tests { Field::new("b", DataType::Int64), Field::new("c", DataType::Int64), ]) - .aggregate( - vec![col("a").mean(), col("b").mean()], - vec![col("c")], - DaftPlanningConfig::default().shuffle_aggregation_default_partitions, - )? + .aggregate(vec![col("a").mean(), col("b").mean()], vec![col("c")])? .project(vec![col("a")], Default::default())? .build(); diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 3f17396a7d..5e55477ea8 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -316,7 +316,6 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe aggregations, groupby, input, - shuffle_aggregation_default_partitions, .. }) => { use daft_dsl::AggExpr::{self, *}; @@ -503,7 +502,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe first_stage_agg.into(), min( num_input_partitions, - *shuffle_aggregation_default_partitions, + cfg.shuffle_aggregation_default_partitions, ), groupby.clone(), )); @@ -767,7 +766,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe #[cfg(test)] mod tests { - use common_daft_config::{DaftExecutionConfig, DaftPlanningConfig}; + use common_daft_config::DaftExecutionConfig; use common_error::DaftResult; use daft_core::{datatypes::Field, DataType}; use daft_dsl::{col, lit, AggExpr, Expr}; @@ -865,7 +864,6 @@ mod tests { .aggregate( vec![Expr::Agg(AggExpr::Sum(col("a").into()))], vec![col("b")], - DaftPlanningConfig::default().shuffle_aggregation_default_partitions, )? .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)?
.build(); diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py index aacbc01605..846b534c66 100644 --- a/tests/dataframe/test_aggregations.py +++ b/tests/dataframe/test_aggregations.py @@ -368,12 +368,12 @@ def test_groupby_agg_pyobjects(): assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]] -@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 201]) +@pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 20]) def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions): if shuffle_aggregation_default_partitions is None: - min_partitions = get_context().daft_planning_config.shuffle_aggregation_default_partitions + min_partitions = get_context().daft_execution_config.shuffle_aggregation_default_partitions else: - daft.set_planning_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions) + daft.set_execution_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions) min_partitions = shuffle_aggregation_default_partitions for partition_size in [1, min_partitions, min_partitions + 1]:
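The parametrized test above pins down the capping rule the planner applies via min(num_input_partitions, cfg.shuffle_aggregation_default_partitions) in src/daft-plan/src/planner.rs. A rough Python restatement of that rule (the helper name is illustrative, not a Daft API):

def expected_output_partitions(num_input_partitions: int, shuffle_aggregation_default_partitions: int = 200) -> int:
    # Mirrors the planner's min(): inputs with fewer partitions than the
    # configured default keep their partition count; larger inputs are
    # shuffled down to the default.
    return min(num_input_partitions, shuffle_aggregation_default_partitions)

assert expected_output_partitions(1) == 1      # small input: unchanged
assert expected_output_partitions(500) == 200  # large input: capped at the default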