From 4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 16 Apr 2024 15:31:17 -0700 Subject: [PATCH] Adding TPCH benchmarks for Sort Merge Join (#10092) * Adding TPCH benchmarks for Sort Merge Join * Update benchmarks/bench.sh Co-authored-by: Andy Grove * fix benches * fmt * comments --------- Co-authored-by: Andy Grove --- benchmarks/bench.sh | 21 +++++++++++++++++++++ benchmarks/src/tpch/run.rs | 15 +++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index a72400892752..088edc56dfb0 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -213,6 +213,12 @@ main() { tpch_mem10) run_tpch_mem "10" ;; + tpch_smj) + run_tpch_smj "1" + ;; + tpch_smj10) + run_tpch_smj "10" + ;; parquet) run_parquet ;; @@ -320,6 +326,21 @@ run_tpch() { $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE} } +# Runs the tpch benchmark with sort merge join +run_tpch_smj() { + SCALE_FACTOR=$1 + if [ -z "$SCALE_FACTOR" ] ; then + echo "Internal error: Scale factor not specified" + exit 1 + fi + TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}" + + RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running tpch SMJ benchmark..." 
+ $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE} +} + # Runs the tpch in memory run_tpch_mem() { SCALE_FACTOR=$1 diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 564a2f05b6fe..f2a93d2ea549 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -42,6 +42,9 @@ use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION}; use log::info; use structopt::StructOpt; +// hack to avoid `default_value is meaningless for bool` errors +type BoolDefaultTrue = bool; + /// Run the tpch benchmark. /// /// This benchmarks is derived from the [TPC-H][1] version @@ -81,6 +84,11 @@ pub struct RunOpt { /// Whether to disable collection of statistics (and cost based optimizations) or not. #[structopt(short = "S", long = "disable-statistics")] disable_statistics: bool, + + /// If true then hash join used, if false then sort merge join + /// True by default. 
+ #[structopt(short = "j", long = "prefer_hash_join", default_value = "true")] + prefer_hash_join: BoolDefaultTrue, } const TPCH_QUERY_START_ID: usize = 1; @@ -107,10 +115,11 @@ impl RunOpt { } async fn benchmark_query(&self, query_id: usize) -> Result> { - let config = self + let mut config = self .common .config() .with_collect_statistics(!self.disable_statistics); + config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; let ctx = SessionContext::new_with_config(config); // register tables @@ -304,7 +313,7 @@ mod tests { use super::*; use datafusion::common::exec_err; - use datafusion::error::{DataFusionError, Result}; + use datafusion::error::Result; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes, physical_plan_to_bytes, @@ -339,6 +348,7 @@ mod tests { mem_table: false, output_path: None, disable_statistics: false, + prefer_hash_join: true, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?; @@ -371,6 +381,7 @@ mod tests { mem_table: false, output_path: None, disable_statistics: false, + prefer_hash_join: true, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?;