[PERF] Improve hash table probe side decisions for Swordfish (#3327)
This PR lifts statistics into optimized logical plans so that they're
available for local execution plans. It then uses these newly available
statistics to make better decisions on whether to build the probe table
of a hash join on the left or right side.
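
The decision described above can be sketched in isolation. This is a minimal sketch that mirrors the logic of the `pipeline.rs` hunk in this commit. The `StatsState` and `JoinType` enums here are simplified stand-ins, and `choose_build_on_left` and `broadcast_threshold_bytes` are hypothetical names, not Daft's actual API.

```rust
/// Simplified, hypothetical stand-in for a plan node's statistics state.
#[derive(Debug, Clone, Copy)]
enum StatsState {
    /// Stats were computed; the size upper bound may still be unknown.
    Materialized { upper_bound_bytes: Option<usize> },
    NotMaterialized,
}

#[derive(Debug, Clone, Copy)]
enum JoinType {
    Inner,
    Left,
    Right,
    Outer,
    Anti,
    Semi,
}

/// Returns true if the probe table should be built on the left side.
fn choose_build_on_left(
    left: StatsState,
    right: StatsState,
    join_type: JoinType,
    broadcast_threshold_bytes: usize,
) -> bool {
    let by_stats = match (left, right) {
        // Both sides have stats: build on the side with the smaller upper bound.
        // `Option` ordering places `None` before any `Some(_)`.
        (
            StatsState::Materialized { upper_bound_bytes: l },
            StatsState::Materialized { upper_bound_bytes: r },
        ) => l <= r,
        // Only the right side has stats: build on the right if it is known to be small.
        (StatsState::NotMaterialized, StatsState::Materialized { upper_bound_bytes: r }) => {
            r.map_or(true, |size| size > broadcast_threshold_bytes)
        }
        // No usable stats: fall back to building on the left.
        _ => true,
    };
    match join_type {
        // Only inner and full outer joins are free to follow the statistics.
        JoinType::Inner | JoinType::Outer => by_stats,
        JoinType::Left => false,
        JoinType::Right => true,
        JoinType::Anti | JoinType::Semi => false,
    }
}

fn main() {
    let small = StatsState::Materialized { upper_bound_bytes: Some(1_000) };
    let large = StatsState::Materialized { upper_bound_bytes: Some(1_000_000) };
    println!("inner, large left:  {}", choose_build_on_left(large, small, JoinType::Inner, 10_000));
    println!("inner, small left:  {}", choose_build_on_left(small, large, JoinType::Inner, 10_000));
    println!("left join:          {}", choose_build_on_left(small, large, JoinType::Left, 10_000));
}
```

Note that the statistics only influence inner and full outer joins in this sketch; the other join types keep a fixed build side.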

## Benchmark results

For TPC-H, this gives notable speedups on Q5, Q8, and Q19:
- Q5: ~2.2x (in memory), ~1.5x (parquet)
- Q8: ~5.9x (in memory), ~2.2x (parquet)
- Q19: ~7x (in memory), ~4.5x (parquet)

Crucially, with this change, our native runner is now faster than our
previous Python runner (or within a small deviation of it) for all 22
TPC-H queries.

For more detailed results, we have:

### Q5

#### Before
```
--------------------------------------------------------------------------------- benchmark 'q5-parts-1': 4 tests ---------------------------------------------------------------------------------
Name (time in ms)                       Min                Max               Mean            StdDev             Median               IQR            Outliers      OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_tpch[1-in-memory-python-5]     28.7213 (1.0)      30.3664 (1.0)      29.4645 (1.0)      0.3849 (1.0)      29.4894 (1.0)      0.5529 (1.0)          10;0  33.9391 (1.0)          33           1
test_tpch[1-in-memory-native-5]     30.9980 (1.08)     34.1489 (1.12)     32.2151 (1.09)     0.7150 (1.86)     32.2903 (1.09)     1.1586 (2.10)         10;0  31.0413 (0.91)         31           1
test_tpch[1-parquet-python-5]       48.8010 (1.70)     51.8535 (1.71)     50.0985 (1.70)     0.8342 (2.17)     50.0400 (1.70)     1.4193 (2.57)          9;0  19.9607 (0.59)         20           1
test_tpch[1-parquet-native-5]       51.1122 (1.78)     54.0755 (1.78)     52.3799 (1.78)     0.8317 (2.16)     52.4268 (1.78)     1.2526 (2.27)          8;0  19.0913 (0.56)         20           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```
#### After
```
--------------------------------------------------------------------------------- benchmark 'q5-parts-1': 4 tests ---------------------------------------------------------------------------------
Name (time in ms)                       Min                Max               Mean            StdDev             Median               IQR            Outliers      OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_tpch[1-in-memory-native-5]     13.8393 (1.0)      16.2712 (1.0)      14.4352 (1.0)      0.5322 (2.13)     14.3212 (1.0)      0.5084 (1.71)          9;5  69.2750 (1.0)          62           1
test_tpch[1-in-memory-python-5]     28.3915 (2.05)     29.5304 (1.81)     28.8477 (2.00)     0.2501 (1.0)      28.8299 (2.01)     0.2971 (1.0)          10;1  34.6649 (0.50)         35           1
test_tpch[1-parquet-native-5]       34.3952 (2.49)     36.4230 (2.24)     35.4314 (2.45)     0.4543 (1.82)     35.3678 (2.47)     0.5042 (1.70)          8;1  28.2235 (0.41)         28           1
test_tpch[1-parquet-python-5]       55.7339 (4.03)     57.3051 (3.52)     56.4564 (3.91)     0.5101 (2.04)     56.2507 (3.93)     0.7570 (2.55)          4;0  17.7128 (0.26)         18           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

### Q8

#### Before
```
--------------------------------------------------------------------------------- benchmark 'q8-parts-1': 4 tests ---------------------------------------------------------------------------------
Name (time in ms)                       Min                Max               Mean            StdDev             Median               IQR            Outliers      OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_tpch[1-in-memory-python-8]     14.9532 (1.0)      17.2992 (1.0)      16.1906 (1.0)      0.5962 (1.22)     16.1370 (1.0)      0.8323 (1.13)         23;0  61.7642 (1.0)          60           1
test_tpch[1-parquet-python-8]       34.7310 (2.32)     52.5183 (3.04)     37.1660 (2.30)     3.2354 (6.61)     36.5454 (2.26)     2.2469 (3.04)          1;1  26.9063 (0.44)         28           1
test_tpch[1-in-memory-native-8]     44.0259 (2.94)     46.0576 (2.66)     45.0905 (2.78)     0.4898 (1.0)      45.0528 (2.79)     0.7380 (1.0)           5;0  22.1776 (0.36)         22           1
test_tpch[1-parquet-native-8]       69.8245 (4.67)     73.1332 (4.23)     71.0333 (4.39)     0.8421 (1.72)     70.8515 (4.39)     0.9827 (1.33)          3;1  14.0779 (0.23)         14           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

#### After

```
--------------------------------------------------------------------------------- benchmark 'q8-parts-1': 4 tests ----------------------------------------------------------------------------------
Name (time in ms)                       Min                Max               Mean            StdDev             Median               IQR            Outliers       OPS            Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_tpch[1-in-memory-native-8]      7.2145 (1.0)       8.5953 (1.0)       7.7980 (1.0)      0.3170 (1.0)       7.8074 (1.0)      0.4959 (1.13)         44;0  128.2373 (1.0)         118           1
test_tpch[1-in-memory-python-8]     15.3458 (2.13)     17.6777 (2.06)     16.3485 (2.10)     0.5217 (1.65)     16.5234 (2.12)     0.7077 (1.62)         17;0   61.1678 (0.48)         57           1
test_tpch[1-parquet-native-8]       31.8017 (4.41)     33.5568 (3.90)     32.4955 (4.17)     0.4402 (1.39)     32.4145 (4.15)     0.4376 (1.0)           9;2   30.7735 (0.24)         30           1
test_tpch[1-parquet-python-8]       46.2703 (6.41)     48.3345 (5.62)     47.6753 (6.11)     0.5142 (1.62)     47.7092 (6.11)     0.4509 (1.03)          6;2   20.9752 (0.16)         21           1
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

### Q19

#### Before
```
------------------------------------------------------------------------------------ benchmark 'q19-parts-1': 4 tests -----------------------------------------------------------------------------------
Name (time in ms)                         Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_tpch[1-in-memory-python-19]     277.9533 (1.0)      281.5647 (1.0)      279.8556 (1.0)       1.4607 (1.0)      279.4186 (1.0)       2.2466 (1.0)           2;0  3.5733 (1.0)           5           1
test_tpch[1-parquet-python-19]       311.1196 (1.12)     317.8069 (1.13)     315.6200 (1.13)      2.6611 (1.82)     316.7545 (1.13)      2.8849 (1.28)          1;0  3.1684 (0.89)          5           1
test_tpch[1-in-memory-native-19]     431.2738 (1.55)     464.2194 (1.65)     442.1488 (1.58)     13.3136 (9.11)     436.8320 (1.56)     16.3197 (7.26)          1;0  2.2617 (0.63)          5           1
test_tpch[1-parquet-native-19]       455.3492 (1.64)     460.8460 (1.64)     458.0333 (1.64)      2.1169 (1.45)     457.4410 (1.64)      3.0005 (1.34)          2;0  2.1832 (0.61)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

#### After
```
------------------------------------------------------------------------------------ benchmark 'q19-parts-1': 4 tests -----------------------------------------------------------------------------------
Name (time in ms)                         Min                 Max                Mean             StdDev              Median               IQR            Outliers      OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_tpch[1-in-memory-native-19]      62.6100 (1.0)       71.2192 (1.0)       66.0757 (1.0)       2.5634 (1.0)       65.6719 (1.0)      1.5398 (1.0)           6;4  15.1342 (1.0)          15           1
test_tpch[1-parquet-native-19]        94.8984 (1.52)     134.7584 (1.89)     103.1099 (1.56)     12.5712 (4.90)      97.7583 (1.49)     7.3370 (4.76)          1;1   9.6984 (0.64)          9           1
test_tpch[1-in-memory-python-19]     284.6653 (4.55)     295.5558 (4.15)     289.7268 (4.38)      3.9986 (1.56)     288.6982 (4.40)     4.6399 (3.01)          2;0   3.4515 (0.23)          5           1
test_tpch[1-parquet-python-19]       308.9599 (4.93)     319.0440 (4.48)     314.7801 (4.76)      4.2088 (1.64)     315.3274 (4.80)     7.0198 (4.56)          2;0   3.1768 (0.21)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```
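
For reference, the native runner's speedups can be recomputed from the `Mean` columns of the tables above. This is a small sketch, assuming the speedup is defined as the mean time before divided by the mean time after; `speedup` is a hypothetical helper, and the summary figures may have been derived from a different column or run.

```rust
/// Speedup factor: how many times faster the "after" run is than the "before" run.
fn speedup(before_ms: f64, after_ms: f64) -> f64 {
    before_ms / after_ms
}

fn main() {
    // (label, native mean before in ms, native mean after in ms), copied from the tables.
    let rows = [
        ("Q5 in-memory", 32.2151, 14.4352),
        ("Q5 parquet", 52.3799, 35.4314),
        ("Q8 in-memory", 45.0905, 7.7980),
        ("Q8 parquet", 71.0333, 32.4955),
        ("Q19 in-memory", 442.1488, 66.0757),
        ("Q19 parquet", 458.0333, 103.1099),
    ];
    for &(label, before, after) in rows.iter() {
        println!("{}: {:.2}x", label, speedup(before, after));
    }
}
```

The resulting ratios land close to the rounded figures quoted in the summary list.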

---------

Co-authored-by: Sammy Sidhu <[email protected]>
desmondcheongzx and samster25 authored Nov 28, 2024
1 parent 4031637 commit f2d4f73
Showing 61 changed files with 1,556 additions and 416 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

5 changes: 3 additions & 2 deletions daft/dataframe/dataframe.py
@@ -184,9 +184,10 @@ def explain(
return None

def num_partitions(self) -> int:
- daft_execution_config = get_context().daft_execution_config
# We need to run the optimizer since that could change the number of partitions
- return self.__builder.optimize().to_physical_plan_scheduler(daft_execution_config).num_partitions()
+ return (
+ self.__builder.optimize().to_physical_plan_scheduler(get_context().daft_execution_config).num_partitions()
+ )

@DataframePublicAPI
def schema(self) -> Schema:
2 changes: 1 addition & 1 deletion src/common/daft-config/src/lib.rs
@@ -37,7 +37,7 @@ impl DaftPlanningConfig {
/// 3. Task generation from physical plan
/// 4. Task scheduling
/// 5. Task local execution
- #[derive(Clone, Serialize, Deserialize)]
+ #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaftExecutionConfig {
pub scan_tasks_min_size_bytes: usize,
pub scan_tasks_max_size_bytes: usize,
34 changes: 29 additions & 5 deletions src/common/scan-info/src/lib.rs
@@ -10,7 +10,7 @@ mod scan_operator;
mod scan_task;
pub mod test;

- use std::{fmt::Debug, hash::Hash};
+ use std::{fmt::Debug, hash::Hash, sync::Arc};

use daft_schema::schema::SchemaRef;
pub use expr_rewriter::{rewrite_predicate_for_partitioning, PredicateGroups};
@@ -19,11 +19,35 @@ pub use pushdowns::Pushdowns;
#[cfg(feature = "python")]
pub use python::register_modules;
pub use scan_operator::{ScanOperator, ScanOperatorRef};
- pub use scan_task::{BoxScanTaskLikeIter, ScanTaskLike, ScanTaskLikeRef};
+ pub use scan_task::{ScanTaskLike, ScanTaskLikeRef, SPLIT_AND_MERGE_PASS};

+ #[derive(Debug, Clone, PartialEq, Eq, Hash)]
+ pub enum ScanState {
+ Operator(ScanOperatorRef),
+ Tasks(Arc<Vec<ScanTaskLikeRef>>),
+ }
+
+ impl ScanState {
+ pub fn multiline_display(&self) -> Vec<String> {
+ match self {
+ Self::Operator(scan_op) => scan_op.0.multiline_display(),
+ Self::Tasks(scan_tasks) => {
+ vec![format!("Num Scan Tasks = {}", scan_tasks.len())]
+ }
+ }
+ }
+
+ pub fn get_scan_op(&self) -> &ScanOperatorRef {
+ match self {
+ Self::Operator(scan_op) => scan_op,
+ Self::Tasks(_) => panic!("Tried to get scan op from materialized physical scan info"),
+ }
+ }
+ }

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalScanInfo {
- pub scan_op: ScanOperatorRef,
+ pub scan_state: ScanState,
pub source_schema: SchemaRef,
pub partitioning_keys: Vec<PartitionField>,
pub pushdowns: Pushdowns,
@@ -38,7 +62,7 @@ impl PhysicalScanInfo {
pushdowns: Pushdowns,
) -> Self {
Self {
- scan_op,
+ scan_state: ScanState::Operator(scan_op),
source_schema,
partitioning_keys,
pushdowns,
@@ -48,7 +72,7 @@ impl PhysicalScanInfo {
#[must_use]
pub fn with_pushdowns(&self, pushdowns: Pushdowns) -> Self {
Self {
- scan_op: self.scan_op.clone(),
+ scan_state: self.scan_state.clone(),
source_schema: self.source_schema.clone(),
partitioning_keys: self.partitioning_keys.clone(),
pushdowns,
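
The `ScanState` enum added in this file lets a scan node hold either an unexpanded operator or an already materialized task list. Below is a minimal sketch of that shape. `DemoOperator` and `DemoTask` are hypothetical stand-ins for `ScanOperatorRef` and `ScanTaskLikeRef`, and `try_get_scan_op` and `materialize` are illustrative helpers that do not appear in this commit.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for `ScanOperatorRef` and `ScanTaskLikeRef`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct DemoOperator {
    name: String,
    paths: Vec<String>,
}

#[derive(Debug, PartialEq, Eq, Hash)]
struct DemoTask {
    path: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ScanState {
    Operator(Arc<DemoOperator>),
    Tasks(Arc<Vec<Arc<DemoTask>>>),
}

impl ScanState {
    fn multiline_display(&self) -> Vec<String> {
        match self {
            Self::Operator(op) => vec![format!("Operator = {}", op.name)],
            Self::Tasks(tasks) => vec![format!("Num Scan Tasks = {}", tasks.len())],
        }
    }

    /// Hypothetical non-panicking counterpart to `get_scan_op`.
    fn try_get_scan_op(&self) -> Option<&Arc<DemoOperator>> {
        match self {
            Self::Operator(op) => Some(op),
            Self::Tasks(_) => None,
        }
    }

    /// Hypothetical materialization step: one task per path of the operator.
    fn materialize(&self) -> Self {
        match self {
            Self::Operator(op) => {
                let tasks: Vec<Arc<DemoTask>> = op
                    .paths
                    .iter()
                    .map(|p| Arc::new(DemoTask { path: p.clone() }))
                    .collect();
                Self::Tasks(Arc::new(tasks))
            }
            // Already materialized: cloning only bumps the `Arc` reference count.
            Self::Tasks(_) => self.clone(),
        }
    }
}

fn main() {
    let state = ScanState::Operator(Arc::new(DemoOperator {
        name: "demo".to_string(),
        paths: vec!["a.parquet".to_string(), "b.parquet".to_string()],
    }));
    println!("{:?}", state.multiline_display());
    println!("{:?}", state.materialize().multiline_display());
}
```

Returning an `Option` instead of panicking is one possible design; the commit itself panics when the operator is requested from a materialized state.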
7 changes: 1 addition & 6 deletions src/common/scan-info/src/scan_operator.rs
@@ -4,7 +4,6 @@ use std::{
sync::Arc,
};

- use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use daft_schema::schema::SchemaRef;

@@ -33,11 +32,7 @@ pub trait ScanOperator: Send + Sync + Debug {

/// If cfg provided, `to_scan_tasks` should apply the appropriate transformations
/// (merging, splitting) to the outputted scan tasks
- fn to_scan_tasks(
- &self,
- pushdowns: Pushdowns,
- config: Option<&DaftExecutionConfig>,
- ) -> DaftResult<Vec<ScanTaskLikeRef>>;
+ fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult<Vec<ScanTaskLikeRef>>;
}

impl Display for dyn ScanOperator {
27 changes: 25 additions & 2 deletions src/common/scan-info/src/scan_task.rs
@@ -1,4 +1,9 @@
- use std::{any::Any, fmt::Debug, sync::Arc};
+ use std::{
+ any::Any,
+ fmt::Debug,
+ hash::{Hash, Hasher},
+ sync::{Arc, OnceLock},
+ };

use common_daft_config::DaftExecutionConfig;
use common_display::DisplayAs;
@@ -13,6 +18,7 @@ pub trait ScanTaskLike: Debug + DisplayAs + Send + Sync {
fn as_any(&self) -> &dyn Any;
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
fn dyn_eq(&self, other: &dyn ScanTaskLike) -> bool;
+ fn dyn_hash(&self, state: &mut dyn Hasher);
#[must_use]
fn materialized_schema(&self) -> SchemaRef;
#[must_use]
@@ -35,10 +41,27 @@ pub trait ScanTaskLike: Debug + DisplayAs + Send + Sync {

pub type ScanTaskLikeRef = Arc<dyn ScanTaskLike>;

+ impl Eq for dyn ScanTaskLike + '_ {}

impl PartialEq for dyn ScanTaskLike + '_ {
fn eq(&self, other: &Self) -> bool {
self.dyn_eq(other)
}
}

- pub type BoxScanTaskLikeIter = Box<dyn Iterator<Item = DaftResult<Arc<dyn ScanTaskLike>>>>;
+ impl Hash for dyn ScanTaskLike + '_ {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.dyn_hash(state);
+ }
+ }
+
+ // Forward declare splitting and merging pass so that scan tasks can be split and merged
+ // with common/scan-info without importing daft-scan.
+ pub type SplitAndMergePass = dyn Fn(
+ Arc<Vec<ScanTaskLikeRef>>,
+ &Pushdowns,
+ &DaftExecutionConfig,
+ ) -> DaftResult<Arc<Vec<ScanTaskLikeRef>>>
+ + Sync
+ + Send;
+ pub static SPLIT_AND_MERGE_PASS: OnceLock<&SplitAndMergePass> = OnceLock::new();
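
The forward-declared `SPLIT_AND_MERGE_PASS` static relies on `std::sync::OnceLock` to let a lower-level crate call a function that a higher-level crate registers at startup. The following is a minimal single-file sketch of that pattern, not Daft's implementation. `MergePass`, `merge_small`, `register_pass`, and `run_pass` are hypothetical, the pass signature is simplified to plain integers, and falling back to the unchanged input when nothing is registered is an assumption.

```rust
use std::sync::OnceLock;

// Simplified signature for the late-bound pass. The alias in this commit takes
// scan tasks, pushdowns, and an execution config instead of plain integers.
type MergePass = dyn Fn(Vec<u64>, u64) -> Result<Vec<u64>, String> + Sync + Send;

// Declared by the "lower" layer, filled in by the "higher" layer.
static MERGE_PASS: OnceLock<&'static MergePass> = OnceLock::new();

/// Example pass: merges adjacent sizes until each merged chunk reaches `min_size`.
fn merge_small(sizes: Vec<u64>, min_size: u64) -> Result<Vec<u64>, String> {
    let mut merged = Vec::new();
    let mut acc = 0u64;
    for size in sizes {
        acc += size;
        if acc >= min_size {
            merged.push(acc);
            acc = 0;
        }
    }
    if acc > 0 {
        merged.push(acc);
    }
    Ok(merged)
}

/// Called once by the layer that owns the implementation.
fn register_pass() {
    MERGE_PASS.get_or_init(|| {
        let pass: &'static MergePass = &merge_small;
        pass
    });
}

/// Called by the layer that only knows the signature.
fn run_pass(sizes: Vec<u64>, min_size: u64) -> Result<Vec<u64>, String> {
    match MERGE_PASS.get() {
        Some(pass) => pass(sizes, min_size),
        // Assumption for this sketch: without a registered pass, leave the input unchanged.
        None => Ok(sizes),
    }
}

fn main() {
    register_pass();
    println!("{:?}", run_pass(vec![1, 2, 3, 10], 5));
}
```

Because `get_or_init` runs its closure at most once, repeated registration calls are harmless in this sketch.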
18 changes: 11 additions & 7 deletions src/common/scan-info/src/test/mod.rs
@@ -1,4 +1,8 @@
- use std::{any::Any, sync::Arc};
+ use std::{
+ any::Any,
+ hash::{Hash, Hasher},
+ sync::Arc,
+ };

use common_daft_config::DaftExecutionConfig;
use common_display::DisplayAs;
@@ -9,7 +13,7 @@ use serde::{Deserialize, Serialize};

use crate::{PartitionField, Pushdowns, ScanOperator, ScanTaskLike, ScanTaskLikeRef};

- #[derive(Debug, Serialize, Deserialize, PartialEq)]
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Hash)]
struct DummyScanTask {
pub schema: SchemaRef,
pub pushdowns: Pushdowns,
@@ -38,6 +42,10 @@ impl ScanTaskLike for DummyScanTask {
.map_or(false, |a| a == self)
}

+ fn dyn_hash(&self, mut state: &mut dyn Hasher) {
+ self.hash(&mut state);
+ }

fn materialized_schema(&self) -> SchemaRef {
self.schema.clone()
}
@@ -121,11 +129,7 @@ impl ScanOperator for DummyScanOperator {
vec!["DummyScanOperator".to_string()]
}

- fn to_scan_tasks(
- &self,
- pushdowns: Pushdowns,
- _: Option<&DaftExecutionConfig>,
- ) -> DaftResult<Vec<ScanTaskLikeRef>> {
+ fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult<Vec<ScanTaskLikeRef>> {
let scan_task = Arc::new(DummyScanTask {
schema: self.schema.clone(),
pushdowns,
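
The `dyn_eq` / `dyn_hash` pair used above is a common way to give trait objects `PartialEq`, `Eq`, and `Hash`, since those traits are not object-safe on their own. Here is a self-contained sketch of the pattern. `Task`, `FileTask`, and `hash_of` are hypothetical names, and the impls target plain `dyn Task` for simplicity, whereas the commit writes them for `dyn ScanTaskLike + '_`.

```rust
use std::any::Any;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

// Object-safe trait exposing type-erased equality and hashing hooks.
trait Task {
    fn as_any(&self) -> &dyn Any;
    fn dyn_eq(&self, other: &dyn Task) -> bool;
    fn dyn_hash(&self, state: &mut dyn Hasher);
}

impl PartialEq for dyn Task {
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other)
    }
}

impl Eq for dyn Task {}

impl Hash for dyn Task {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `&mut H` coerces to `&mut dyn Hasher` here.
        self.dyn_hash(state);
    }
}

#[derive(Debug, PartialEq, Eq, Hash)]
struct FileTask {
    path: String,
    size_bytes: u64,
}

impl Task for FileTask {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn dyn_eq(&self, other: &dyn Task) -> bool {
        // Equal only if `other` is the same concrete type with equal fields.
        other
            .as_any()
            .downcast_ref::<Self>()
            .map_or(false, |o| o == self)
    }

    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // `&mut dyn Hasher` itself implements `Hasher`, so the derived `Hash` can use it.
        self.hash(&mut state);
    }
}

/// Hashes any hashable value, sized or not, with the default hasher.
fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a: Arc<dyn Task> = Arc::new(FileTask { path: "a.parquet".to_string(), size_bytes: 10 });
    let b: Arc<dyn Task> = Arc::new(FileTask { path: "a.parquet".to_string(), size_bytes: 10 });
    let c: Arc<dyn Task> = Arc::new(FileTask { path: "b.parquet".to_string(), size_bytes: 10 });
    println!("a == b: {}", *a == *b);
    println!("same hash: {}", hash_of(&*a) == hash_of(&*b));
    let unique: HashSet<Arc<dyn Task>> = vec![a, b, c].into_iter().collect();
    println!("unique tasks: {}", unique.len());
}
```

With these impls in place, containers of trait objects can be compared, hashed, and deduplicated like containers of concrete types.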
12 changes: 6 additions & 6 deletions src/daft-catalog/src/lib.rs
@@ -168,21 +168,21 @@ mod tests {
])
.unwrap(),
);
- LogicalPlan::Source(Source {
- output_schema: schema.clone(),
- source_info: Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
+ LogicalPlan::Source(Source::new(
+ schema.clone(),
+ Arc::new(SourceInfo::PlaceHolder(PlaceHolderInfo {
source_schema: schema,
clustering_spec: Arc::new(ClusteringSpec::unknown()),
source_id: 0,
})),
- })
+ ))
.arced()
}

#[test]
fn test_register_and_unregister_named_table() {
let mut catalog = DaftMetaCatalog::new_from_env();
- let plan = LogicalPlanBuilder::new(mock_plan(), None);
+ let plan = LogicalPlanBuilder::from(mock_plan());

// Register a table
assert!(catalog
@@ -198,7 +198,7 @@ mod tests {
#[test]
fn test_read_registered_table() {
let mut catalog = DaftMetaCatalog::new_from_env();
- let plan = LogicalPlanBuilder::new(mock_plan(), None);
+ let plan = LogicalPlanBuilder::from(mock_plan());

catalog.register_named_table("test_table", plan).unwrap();

6 changes: 3 additions & 3 deletions src/daft-connect/src/op/execute/root.rs
@@ -1,4 +1,4 @@
- use std::{collections::HashMap, future::ready};
+ use std::{collections::HashMap, future::ready, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use daft_local_execution::NativeExecutor;
@@ -33,10 +33,10 @@ impl Session {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let optimized_plan = plan.optimize()?;
- let cfg = DaftExecutionConfig::default();
+ let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
- .run(HashMap::new(), cfg.into(), None)?
+ .run(HashMap::new(), cfg, None)?
.into_stream();

while let Some(result) = result_stream.next().await {
51 changes: 44 additions & 7 deletions src/daft-local-execution/src/pipeline.rs
@@ -15,7 +15,7 @@ use daft_local_plan::{
Limit, LocalPhysicalPlan, MonotonicallyIncreasingId, PhysicalWrite, Pivot, Project, Sample,
Sort, UnGroupedAggregate, Unpivot,
};
- use daft_logical_plan::JoinType;
+ use daft_logical_plan::{stats::StatsState, JoinType};
use daft_micropartition::MicroPartition;
use daft_physical_plan::{extract_agg_expr, populate_aggregation_stages};
use daft_scan::ScanTaskRef;
@@ -319,18 +319,54 @@ pub fn physical_plan_to_pipeline(
null_equals_null,
join_type,
schema,
+ ..
}) => {
let left_schema = left.schema();
let right_schema = right.schema();

- // Determine the build and probe sides based on the join type
- // Currently it is a naive determination, in the future we should leverage the cardinality of the tables
- // to determine the build and probe sides
+ // To determine whether to use the left or right side of a join for building a probe table, we consider:
+ // 1. Cardinality of the sides. Probe tables should be built on the smaller side.
+ // 2. Join type. Different join types have different requirements for which side can build the probe table.
+ let left_stats_state = left.get_stats_state();
+ let right_stats_state = right.get_stats_state();
+ let build_on_left = match (left_stats_state, right_stats_state) {
+ (StatsState::Materialized(left_stats), StatsState::Materialized(right_stats)) => {
+ left_stats.approx_stats.upper_bound_bytes
+ <= right_stats.approx_stats.upper_bound_bytes
+ }
+ // If stats are only available on the right side of the join, and the upper bound bytes on the
+ // right are under the broadcast join size threshold, we build on the right instead of the left.
+ (StatsState::NotMaterialized, StatsState::Materialized(right_stats)) => right_stats
+ .approx_stats
+ .upper_bound_bytes
+ .map_or(true, |size| size > cfg.broadcast_join_size_bytes_threshold),
+ // If stats are not available, we fall back and build on the left by default.
+ _ => true,
+ };
+
+ // TODO(desmond): We might potentially want to flip the probe table side for
+ // left/right outer joins if one side is significantly larger. Needs to be tuned.
+ //
+ // In greater detail, consider a right outer join where the left side is several orders
+ // of magnitude larger than the right. An extreme example might have 1B rows on the left,
+ // and 10 rows on the right.
+ //
+ // Typically we would build the probe table on the left, then stream rows from the right
+ // to match against the probe table. But in this case we would have a giant intermediate
+ // probe table.
+ //
+ // An alternative 2-pass algorithm would be to:
+ // 1. Build the probe table on the right, but add a second data structure to keep track of
+ // which rows on the right have been matched.
+ // 2. Stream rows on the left until all rows have been seen.
+ // 3. Finally, emit all unmatched rows from the right.
let build_on_left = match join_type {
- JoinType::Inner => true,
- JoinType::Right => true,
- JoinType::Outer => true,
+ JoinType::Inner => build_on_left,
+ JoinType::Outer => build_on_left,
+ // For left outer joins, we build on right so we can stream the left side.
JoinType::Left => false,
+ // For right outer joins, we build on left so we can stream the right side.
+ JoinType::Right => true,
JoinType::Anti | JoinType::Semi => false,
};
let (build_on, probe_on, build_child, probe_child) = match build_on_left {
@@ -421,6 +457,7 @@ pub fn physical_plan_to_pipeline(
left_schema,
right_schema,
*join_type,
+ build_on_left,
common_join_keys,
schema,
probe_state_bridge,
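
The two-pass alternative described in the TODO comment of this file can be sketched on in-memory rows. This illustrates the idea only and is not what the commit implements: the commit still builds on the left for right outer joins. `Row` and `right_outer_join_build_right` are hypothetical names, and rows are reduced to a key plus a string payload.

```rust
use std::collections::HashMap;

/// A row is a join key plus a payload (hypothetical simplification).
type Row = (u32, &'static str);

/// Right outer join that builds the hash table on the (small) right side and
/// streams the (large) left side, tracking which right rows found a match.
fn right_outer_join_build_right(
    left: impl IntoIterator<Item = Row>,
    right: &[Row],
) -> Vec<(u32, Option<&'static str>, &'static str)> {
    // Step 1: build the table on the right, plus a matched-row bitmap.
    let mut table: HashMap<u32, Vec<usize>> = HashMap::new();
    for (idx, (key, _)) in right.iter().enumerate() {
        table.entry(*key).or_default().push(idx);
    }
    let mut matched = vec![false; right.len()];
    let mut out = Vec::new();

    // Step 2: stream the left side, emitting matches and marking right rows.
    for (key, left_val) in left {
        if let Some(indices) = table.get(&key) {
            for &idx in indices {
                matched[idx] = true;
                out.push((key, Some(left_val), right[idx].1));
            }
        }
    }

    // Step 3: emit right rows that never matched, with a null left side.
    for (idx, (key, right_val)) in right.iter().enumerate() {
        if !matched[idx] {
            out.push((*key, None, *right_val));
        }
    }
    out
}

fn main() {
    let left = vec![(1, "l1"), (2, "l2"), (1, "l1b")];
    let right = vec![(1, "r1"), (3, "r3")];
    for row in right_outer_join_build_right(left, &right) {
        println!("{:?}", row);
    }
}
```

The extra state is one boolean per right-side row, which stays small exactly in the case the TODO describes: a tiny right side joined against a very large left side.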