Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Set min_partitions for post aggregation shuffles #1861

Merged
merged 5 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def set_execution_config(
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
aggregation_min_partitions: int | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should rename this to shuffle_aggregation_default_partitions and the default value should be 200 to mimic spark. Also this should be in the planning config!

) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
Expand Down Expand Up @@ -243,6 +244,7 @@ def set_execution_config(
parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
aggregation_min_partitions: Minimum number of partitions to create when performing aggregations. Defaults to 10, unless the number of input partitions is less than 10.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
Expand All @@ -260,6 +262,7 @@ def set_execution_config(
parquet_inflation_factor=parquet_inflation_factor,
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
aggregation_min_partitions=aggregation_min_partitions,
)

ctx.daft_execution_config = new_daft_execution_config
Expand Down
3 changes: 3 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ class PyDaftExecutionConfig:
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
aggregation_min_partitions: int | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
Expand All @@ -1220,6 +1221,8 @@ class PyDaftExecutionConfig:
def csv_target_filesize(self) -> int: ...
@property
def csv_inflation_factor(self) -> float: ...
@property
def aggregation_min_partitions(self) -> int: ...

class PyDaftPlanningConfig:
def with_config_values(
Expand Down
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct DaftExecutionConfig {
pub parquet_inflation_factor: f64,
pub csv_target_filesize: usize,
pub csv_inflation_factor: f64,
pub aggregation_min_partitions: usize,
}

impl Default for DaftExecutionConfig {
Expand All @@ -51,6 +52,7 @@ impl Default for DaftExecutionConfig {
parquet_inflation_factor: 3.0,
csv_target_filesize: 512 * 1024 * 1024, // 512MB
csv_inflation_factor: 0.5,
aggregation_min_partitions: 10,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this too small?

}
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl PyDaftExecutionConfig {
parquet_inflation_factor: Option<f64>,
csv_target_filesize: Option<usize>,
csv_inflation_factor: Option<f64>,
aggregation_min_partitions: Option<usize>,
) -> PyResult<PyDaftExecutionConfig> {
let mut config = self.config.as_ref().clone();

Expand Down Expand Up @@ -131,6 +132,9 @@ impl PyDaftExecutionConfig {
if let Some(csv_inflation_factor) = csv_inflation_factor {
config.csv_inflation_factor = csv_inflation_factor;
}
if let Some(aggregation_min_partitions) = aggregation_min_partitions {
config.aggregation_min_partitions = aggregation_min_partitions;
}

Ok(PyDaftExecutionConfig {
config: Arc::new(config),
Expand Down Expand Up @@ -192,6 +196,11 @@ impl PyDaftExecutionConfig {
Ok(self.config.csv_inflation_factor)
}

#[getter]
fn get_aggregation_min_partitions(&self) -> PyResult<usize> {
Ok(self.config.aggregation_min_partitions)
}

fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
let bin_data = bincode::serialize(self.config.as_ref())
.expect("DaftExecutionConfig should be serializable to bytes");
Expand Down
9 changes: 6 additions & 3 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::cmp::Ordering;
use std::sync::Arc;
use std::{cmp::max, collections::HashMap};
use std::{
cmp::{max, min},
collections::HashMap,
};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
Expand Down Expand Up @@ -317,7 +320,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
}) => {
use daft_dsl::AggExpr::{self, *};
use daft_dsl::Expr::Column;
let input_plan = plan(input, cfg)?;
let input_plan = plan(input, cfg.clone())?;

let num_input_partitions = input_plan.partition_spec().num_partitions;

Expand Down Expand Up @@ -497,7 +500,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
} else {
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
first_stage_agg.into(),
num_input_partitions,
min(num_input_partitions, cfg.aggregation_min_partitions),
groupby.clone(),
));
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
Expand Down
28 changes: 28 additions & 0 deletions tests/dataframe/test_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import daft
from daft import col
from daft.context import get_context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.utils import freeze
Expand Down Expand Up @@ -365,3 +366,30 @@ def test_groupby_agg_pyobjects():
assert res["groups"] == [1, 2]
assert res["count"] == [2, 1]
assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]]


@pytest.mark.parametrize("aggregation_min_partitions", [None, 20])
def test_groupby_result_partitions_smaller_than_input(aggregation_min_partitions):
if aggregation_min_partitions is None:
min_partitions = get_context().daft_execution_config.aggregation_min_partitions
else:
daft.set_execution_config(aggregation_min_partitions=aggregation_min_partitions)
min_partitions = aggregation_min_partitions

for partition_size in [1, min_partitions, min_partitions + 1]:
df = daft.from_pydict(
{"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]}
)
df = df.into_partitions(partition_size)

df = df.groupby(col("group")).agg(
[
(col("value").alias("sum"), "sum"),
(col("value").alias("mean"), "mean"),
(col("value").alias("min"), "min"),
]
)

df = df.collect()

assert df.num_partitions() == min(min_partitions, partition_size)
Loading