use planning config
colin-ho committed Feb 13, 2024
1 parent 6211135 commit f314f39
Showing 12 changed files with 104 additions and 40 deletions.
6 changes: 3 additions & 3 deletions daft/context.py
@@ -180,6 +180,7 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
def set_planning_config(
config: PyDaftPlanningConfig | None = None,
default_io_config: IOConfig | None = None,
shuffle_aggregation_default_partitions: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control Daft plan construction behavior. These configuration values
are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe)
@@ -189,12 +190,14 @@ def set_planning_config(
that the old (current) config should be used.
default_io_config: A default IOConfig to use in the absence of one being explicitly passed into any Expression (e.g. `.url.download()`)
or Dataframe operation (e.g. `daft.read_parquet()`).
shuffle_aggregation_default_partitions: Default number of partitions to use when shuffling data during aggregation operations. Defaults to 200, unless the number of input partitions is lower.
"""
# Replace values in the DaftPlanningConfig with user-specified overrides
ctx = get_context()
old_daft_planning_config = ctx.daft_planning_config if config is None else config
new_daft_planning_config = old_daft_planning_config.with_config_values(
default_io_config=default_io_config,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
)

ctx.daft_planning_config = new_daft_planning_config
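
As a usage sketch of the new knob (assuming `set_planning_config` is imported from `daft.context` as defined here; the groupby itself is illustrative):

import daft
from daft.context import set_planning_config

# Raise the shuffle fan-out for groupby aggregations from the default of 200
# to 400. This must run before the aggregation is added to a plan, since the
# planning config is read at plan-construction time.
set_planning_config(shuffle_aggregation_default_partitions=400)

df = daft.from_pydict({"group": [1, 1, 2], "value": [10, 20, 30]})
result = df.groupby("group").sum("value").collect()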
@@ -215,7 +218,6 @@ def set_execution_config(
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
aggregation_min_partitions: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -244,7 +246,6 @@ def set_execution_config(
parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
aggregation_min_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
@@ -262,7 +263,6 @@ def set_execution_config(
parquet_inflation_factor=parquet_inflation_factor,
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
aggregation_min_partitions=aggregation_min_partitions,
)

ctx.daft_execution_config = new_daft_execution_config
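For anyone migrating, a hedged before/after sketch of the moved knob (values illustrative):

import daft

# Before this commit (no longer accepted):
#   daft.context.set_execution_config(aggregation_min_partitions=10)

# After this commit, the knob lives on the planning config:
daft.context.set_planning_config(shuffle_aggregation_default_partitions=200)

# Purely execution-time knobs remain on set_execution_config, e.g.:
daft.context.set_execution_config(csv_inflation_factor=0.5)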
10 changes: 6 additions & 4 deletions daft/daft.pyi
@@ -1158,7 +1158,9 @@ class LogicalPlanBuilder:
def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ...
def distinct(self) -> LogicalPlanBuilder: ...
def sample(self, fraction: float, with_replacement: bool, seed: int | None) -> LogicalPlanBuilder: ...
def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ...
def aggregate(
self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr], shuffle_aggregation_default_partitions: int
) -> LogicalPlanBuilder: ...
def join(
self,
right: LogicalPlanBuilder,
@@ -1197,7 +1199,6 @@ class PyDaftExecutionConfig:
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
aggregation_min_partitions: int | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1221,16 +1222,17 @@ class PyDaftExecutionConfig:
def csv_target_filesize(self) -> int: ...
@property
def csv_inflation_factor(self) -> float: ...
@property
def aggregation_min_partitions(self) -> int: ...

class PyDaftPlanningConfig:
def with_config_values(
self,
default_io_config: IOConfig | None = None,
shuffle_aggregation_default_partitions: int | None = None,
) -> PyDaftPlanningConfig: ...
@property
def default_io_config(self) -> IOConfig: ...
@property
def shuffle_aggregation_default_partitions(self) -> int: ...

def build_type() -> str: ...
def version() -> str: ...
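The new property makes the in-effect value inspectable from Python; a quick check using only names defined in this diff:

from daft.context import get_context

cfg = get_context().daft_planning_config
print(cfg.shuffle_aggregation_default_partitions)  # 200 unless overridden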
14 changes: 12 additions & 2 deletions daft/dataframe/dataframe.py
@@ -949,12 +949,22 @@ def _agg(
exprs_to_agg: List[Tuple[Expression, str]] = list(
zip(self.__column_input_to_expression([c for c, _ in to_agg]), [op for _, op in to_agg])
)
shuffle_aggregation_default_partitions = (
get_context().daft_planning_config.shuffle_aggregation_default_partitions
)

builder = self._builder.agg(exprs_to_agg, list(group_by) if group_by is not None else None)
builder = self._builder.agg(
exprs_to_agg, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions
)
return DataFrame(builder)

def _map_groups(self, udf: Expression, group_by: Optional[ExpressionsProjection] = None) -> "DataFrame":
builder = self._builder.map_groups(udf, list(group_by) if group_by is not None else None)
shuffle_aggregation_default_partitions = (
get_context().daft_planning_config.shuffle_aggregation_default_partitions
)
builder = self._builder.map_groups(
udf, list(group_by) if group_by is not None else None, shuffle_aggregation_default_partitions
)
return DataFrame(builder)

@DataframePublicAPI
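Both private entry points read the planning config when the aggregation node is built. A rough sketch of the two public paths that reach them (the `demean` UDF is hypothetical):

import daft
from daft import col, udf
from daft.context import set_planning_config

set_planning_config(shuffle_aggregation_default_partitions=32)
df = daft.from_pydict({"g": [1, 2, 1], "v": [1.0, 2.0, 3.0]})

# Ordinary groupby aggregations go through _agg:
means = df.groupby("g").mean("v")

# Grouped UDFs go through _map_groups:
@udf(return_dtype=daft.DataType.float64())
def demean(v):
    vals = v.to_pylist()
    m = sum(vals) / len(vals)
    return [x - m for x in vals]

out = df.groupby("g").map_groups(demean(col("v")))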
13 changes: 9 additions & 4 deletions daft/logical/builder.py
@@ -128,7 +128,7 @@ def explode(self, explode_expressions: list[Expression]) -> LogicalPlanBuilder:
def count(self) -> LogicalPlanBuilder:
# TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations.
first_col = col(self.schema().column_names()[0])
builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [])
builder = self._builder.aggregate([first_col._count(CountMode.All)._expr], [], 1)
builder = builder.project([first_col.alias("count")._expr], ResourceRequest())
return LogicalPlanBuilder(builder)

@@ -158,6 +158,7 @@ def agg(
self,
to_agg: list[tuple[Expression, str]],
group_by: list[Expression] | None,
shuffle_aggregation_default_partitions: int,
) -> LogicalPlanBuilder:
exprs = []
for expr, op in to_agg:
@@ -179,12 +180,16 @@ def agg(
raise NotImplementedError(f"Aggregation {op} is not implemented.")

group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else []
builder = self._builder.aggregate([expr._expr for expr in exprs], group_by_pyexprs)
builder = self._builder.aggregate(
[expr._expr for expr in exprs], group_by_pyexprs, shuffle_aggregation_default_partitions
)
return LogicalPlanBuilder(builder)

def map_groups(self, udf: Expression, group_by: list[Expression] | None) -> LogicalPlanBuilder:
def map_groups(
self, udf: Expression, group_by: list[Expression] | None, shuffle_aggregation_default_partitions: int
) -> LogicalPlanBuilder:
group_by_pyexprs = [expr._expr for expr in group_by] if group_by is not None else []
builder = self._builder.aggregate([udf._expr], group_by_pyexprs)
builder = self._builder.aggregate([udf._expr], group_by_pyexprs, shuffle_aggregation_default_partitions)
return LogicalPlanBuilder(builder)

def join( # type: ignore[override]
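Note that `count()` above passes a literal 1 for the new partition argument: a global count has no groupby keys and reduces to a single row, so a wider shuffle buys nothing. Illustratively (assuming `into_partitions` and `count_rows` behave as in current Daft):

import daft

df = daft.from_pydict({"x": list(range(1000))}).into_partitions(16)
# 16 input partitions, but the count plan still reduces to one partition.
print(df.count_rows())  # -> 1000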
14 changes: 11 additions & 3 deletions src/common/daft-config/src/lib.rs
@@ -6,9 +6,19 @@ use serde::{Deserialize, Serialize};
/// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. Note
/// that this does not include the actual scan, which is taken care of by the DaftExecutionConfig.
/// 2. Building of logical plan nodes
#[derive(Clone, Serialize, Deserialize, Default)]
#[derive(Clone, Serialize, Deserialize)]
pub struct DaftPlanningConfig {
pub default_io_config: IOConfig,
pub shuffle_aggregation_default_partitions: usize,
}

impl Default for DaftPlanningConfig {
fn default() -> Self {
DaftPlanningConfig {
default_io_config: IOConfig::default(),
shuffle_aggregation_default_partitions: 200,
}
}
}

/// Configurations for Daft to use during the execution of a Dataframe
@@ -34,7 +44,6 @@ pub struct DaftExecutionConfig {
pub parquet_inflation_factor: f64,
pub csv_target_filesize: usize,
pub csv_inflation_factor: f64,
pub aggregation_min_partitions: usize,
}

impl Default for DaftExecutionConfig {
@@ -52,7 +61,6 @@ impl Default for DaftExecutionConfig {
parquet_inflation_factor: 3.0,
csv_target_filesize: 512 * 1024 * 1024, // 512MB
csv_inflation_factor: 0.5,
aggregation_min_partitions: 10,
}
}
}
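Since the value now lives on the planning config, it is captured when the aggregation node is constructed rather than at execution time. A sketch of the resulting behavior:

import daft
from daft.context import set_planning_config

df = daft.from_pydict({"g": [1, 1, 2], "v": [1, 2, 3]})

set_planning_config(shuffle_aggregation_default_partitions=50)
grouped = df.groupby("g").sum("v")  # this plan bakes in 50

set_planning_config(shuffle_aggregation_default_partitions=300)
# `grouped` is unaffected; only aggregations built from here on see 300.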
20 changes: 11 additions & 9 deletions src/common/daft-config/src/python.rs
@@ -22,13 +22,19 @@ impl PyDaftPlanningConfig {
fn with_config_values(
&mut self,
default_io_config: Option<PyIOConfig>,
shuffle_aggregation_default_partitions: Option<usize>,
) -> PyResult<PyDaftPlanningConfig> {
let mut config = self.config.as_ref().clone();

if let Some(default_io_config) = default_io_config {
config.default_io_config = default_io_config.config;
}

if let Some(shuffle_aggregation_default_partitions) = shuffle_aggregation_default_partitions
{
config.shuffle_aggregation_default_partitions = shuffle_aggregation_default_partitions;
}

Ok(PyDaftPlanningConfig {
config: Arc::new(config),
})
@@ -41,6 +47,11 @@ impl PyDaftPlanningConfig {
})
}

#[getter(shuffle_aggregation_default_partitions)]
fn shuffle_aggregation_default_partitions(&self) -> PyResult<usize> {
Ok(self.config.shuffle_aggregation_default_partitions)
}

fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
let bin_data = bincode::serialize(self.config.as_ref())
.expect("DaftPlanningConfig should be serializable to bytes");
@@ -89,7 +100,6 @@ impl PyDaftExecutionConfig {
parquet_inflation_factor: Option<f64>,
csv_target_filesize: Option<usize>,
csv_inflation_factor: Option<f64>,
aggregation_min_partitions: Option<usize>,
) -> PyResult<PyDaftExecutionConfig> {
let mut config = self.config.as_ref().clone();

@@ -132,9 +142,6 @@ impl PyDaftExecutionConfig {
if let Some(csv_inflation_factor) = csv_inflation_factor {
config.csv_inflation_factor = csv_inflation_factor;
}
if let Some(aggregation_min_partitions) = aggregation_min_partitions {
config.aggregation_min_partitions = aggregation_min_partitions;
}

Ok(PyDaftExecutionConfig {
config: Arc::new(config),
@@ -196,11 +203,6 @@ impl PyDaftExecutionConfig {
Ok(self.config.csv_inflation_factor)
}

#[getter]
fn get_aggregation_min_partitions(&self) -> PyResult<usize> {
Ok(self.config.aggregation_min_partitions)
}

fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
let bin_data = bincode::serialize(self.config.as_ref())
.expect("DaftExecutionConfig should be serializable to bytes");
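As with the execution config, `with_config_values` treats a `None` argument as "keep the current value", so repeated `set_planning_config` calls merge rather than reset. For example (the `IOConfig` import path is assumed):

import daft
from daft.io import IOConfig

daft.context.set_planning_config(shuffle_aggregation_default_partitions=64)
# Only the IO config changes here; the partition default stays 64.
daft.context.set_planning_config(default_io_config=IOConfig())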
32 changes: 27 additions & 5 deletions src/daft-plan/src/builder.rs
@@ -190,7 +190,12 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn aggregate(&self, agg_exprs: Vec<Expr>, groupby_exprs: Vec<Expr>) -> DaftResult<Self> {
pub fn aggregate(
&self,
agg_exprs: Vec<Expr>,
groupby_exprs: Vec<Expr>,
shuffle_aggregation_default_partitions: usize,
) -> DaftResult<Self> {
let agg_exprs = agg_exprs
.iter()
.map(|expr| match expr {
@@ -205,8 +210,13 @@ impl LogicalPlanBuilder {
})
.collect::<DaftResult<Vec<daft_dsl::AggExpr>>>()?;

let logical_plan: LogicalPlan =
logical_ops::Aggregate::try_new(self.plan.clone(), agg_exprs, groupby_exprs)?.into();
let logical_plan: LogicalPlan = logical_ops::Aggregate::try_new(
self.plan.clone(),
agg_exprs,
groupby_exprs,
shuffle_aggregation_default_partitions,
)?
.into();
Ok(logical_plan.into())
}

@@ -410,7 +420,12 @@ impl PyLogicalPlanBuilder {
.into())
}

pub fn aggregate(&self, agg_exprs: Vec<PyExpr>, groupby_exprs: Vec<PyExpr>) -> PyResult<Self> {
pub fn aggregate(
&self,
agg_exprs: Vec<PyExpr>,
groupby_exprs: Vec<PyExpr>,
shuffle_aggregation_default_partitions: usize,
) -> PyResult<Self> {
let agg_exprs = agg_exprs
.iter()
.map(|expr| expr.clone().into())
@@ -419,7 +434,14 @@ impl PyLogicalPlanBuilder {
.iter()
.map(|expr| expr.clone().into())
.collect::<Vec<Expr>>();
Ok(self.builder.aggregate(agg_exprs, groupby_exprs)?.into())
Ok(self
.builder
.aggregate(
agg_exprs,
groupby_exprs,
shuffle_aggregation_default_partitions,
)?
.into())
}

pub fn join(
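Per the docstring added in daft/context.py, the configured value acts as an upper bound: when the input has fewer partitions, the planner keeps the lower count. The arithmetic, as a sketch:

# Illustrative only -- mirrors "Defaults to 200, unless the number of input
# partitions is lower" from the set_planning_config docstring.
default_partitions = 200
input_partitions = 8
shuffle_partitions = min(input_partitions, default_partitions)  # -> 8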
4 changes: 4 additions & 0 deletions src/daft-plan/src/logical_ops/agg.rs
@@ -21,13 +21,16 @@ pub struct Aggregate {
pub groupby: Vec<Expr>,

pub output_schema: SchemaRef,

pub shuffle_aggregation_default_partitions: usize,
}

impl Aggregate {
pub(crate) fn try_new(
input: Arc<LogicalPlan>,
aggregations: Vec<AggExpr>,
groupby: Vec<Expr>,
shuffle_aggregation_default_partitions: usize,
) -> logical_plan::Result<Self> {
let output_schema = {
let upstream_schema = input.schema();
@@ -45,6 +48,7 @@ impl Aggregate {
groupby,
output_schema,
input,
shuffle_aggregation_default_partitions,
})
}

2 changes: 1 addition & 1 deletion src/daft-plan/src/logical_plan.rs
@@ -161,7 +161,7 @@ impl LogicalPlan {
Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()),
Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()),
Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())),
Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()),
Self::Aggregate(Aggregate { aggregations, groupby, shuffle_aggregation_default_partitions,..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone(),*shuffle_aggregation_default_partitions).unwrap()),
Self::Sink(Sink { sink_info, .. }) => Self::Sink(Sink::try_new(input.clone(), sink_info.clone()).unwrap()),
_ => panic!("Logical op {} has two inputs, but got one", self),
},
8 changes: 7 additions & 1 deletion src/daft-plan/src/optimization/rules/push_down_projection.rs
@@ -228,6 +228,7 @@ impl PushDownProjection {
aggregate.input.clone(),
pruned_aggregate_exprs,
aggregate.groupby.clone(),
aggregate.shuffle_aggregation_default_partitions,
)?
.into();

@@ -501,6 +502,7 @@ impl OptimizerRule for PushDownProjection {
mod tests {
use std::sync::Arc;

use common_daft_config::DaftPlanningConfig;
use common_error::DaftResult;
use daft_core::{datatypes::Field, DataType};
use daft_dsl::{col, lit};
@@ -666,7 +668,11 @@ mod tests {
Field::new("b", DataType::Int64),
Field::new("c", DataType::Int64),
])
.aggregate(vec![col("a").mean(), col("b").mean()], vec![col("c")])?
.aggregate(
vec![col("a").mean(), col("b").mean()],
vec![col("c")],
DaftPlanningConfig::default().shuffle_aggregation_default_partitions,
)?
.project(vec![col("a")], Default::default())?
.build();

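A Python-side counterpart to the Rust test change above could assert the default end to end (assuming no earlier set_planning_config call in the process):

from daft.context import get_context

def test_default_shuffle_aggregation_partitions():
    cfg = get_context().daft_planning_config
    assert cfg.shuffle_aggregation_default_partitions == 200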