Skip to content

Commit

Permalink
refactor: add get_available_parallelism function (apache#13595)
Browse files Browse the repository at this point in the history
Co-authored-by: jonahgao <[email protected]>
  • Loading branch information
alan910127 and jonahgao authored Nov 29, 2024
1 parent 703b10d commit eca0e07
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 77 deletions.
11 changes: 4 additions & 7 deletions benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
//! external_aggr binary entrypoint
use std::collections::HashMap;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::thread::available_parallelism;
use structopt::StructOpt;

use arrow::record_batch::RecordBatch;
Expand All @@ -41,6 +39,7 @@ use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_datafusion_err, exec_err, DEFAULT_PARQUET_EXTENSION};

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -327,11 +326,9 @@ impl ExternalAggrConfig {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}

/// Parse memory limit from string to number of bytes
Expand Down
7 changes: 2 additions & 5 deletions benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::util::BenchmarkRun;
use std::num::NonZero;
use datafusion_common::utils::get_available_parallelism;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;
use structopt::StructOpt;
use tokio::time::Instant;

Expand Down Expand Up @@ -93,9 +92,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
.with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
.with_schema(Arc::new(schema));
let csv = ListingTable::try_new(listing_config)?;
let partition_size = available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get();
let partition_size = get_available_parallelism();
let memtable =
MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
ctx.register_table("x", Arc::new(memtable))?;
Expand Down
11 changes: 4 additions & 7 deletions benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES};
use crate::util::{BenchmarkRun, CommonOpt};
Expand All @@ -37,6 +35,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
Expand Down Expand Up @@ -470,11 +469,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

Expand Down
12 changes: 4 additions & 8 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

Expand All @@ -30,7 +28,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use datafusion_common::instant::Instant;

use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

/// Test performance of sorting large datasets
Expand Down Expand Up @@ -149,11 +147,9 @@ impl RunOpt {
rundata.start_new_case(title);
for i in 0..self.common.iterations {
let config = SessionConfig::new().with_target_partitions(
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
),
self.common
.partitions
.unwrap_or(get_available_parallelism()),
);
let ctx = SessionContext::new_with_config(config);
let (rows, elapsed) =
Expand Down
11 changes: 4 additions & 7 deletions benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
//! runs end-to-end sort queries and test the performance on multiple CPU cores.
use futures::StreamExt;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;
use structopt::StructOpt;

use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand All @@ -39,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;

use crate::util::{BenchmarkRun, CommonOpt};
Expand Down Expand Up @@ -317,10 +316,8 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}
11 changes: 4 additions & 7 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
Expand All @@ -39,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
Expand Down Expand Up @@ -298,11 +297,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

Expand Down
9 changes: 2 additions & 7 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::{num::NonZero, thread::available_parallelism};

use datafusion::prelude::SessionConfig;
use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

// Common benchmark options (don't use doc comments otherwise this doc
Expand Down Expand Up @@ -51,11 +50,7 @@ impl CommonOpt {
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
config
.with_target_partitions(
self.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
),
self.partitions.unwrap_or(get_available_parallelism()),
)
.with_batch_size(self.batch_size)
}
Expand Down
7 changes: 2 additions & 5 deletions benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
// under the License.

use datafusion::{error::Result, DATAFUSION_VERSION};
use datafusion_common::utils::get_available_parallelism;
use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
collections::HashMap,
num::NonZero,
path::Path,
thread::available_parallelism,
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -70,9 +69,7 @@ impl RunContext {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
num_cpus: available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
num_cpus: get_available_parallelism(),
start_time: SystemTime::now(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
}
Expand Down
7 changes: 3 additions & 4 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Display};
use std::num::NonZero;
use std::str::FromStr;
use std::thread::available_parallelism;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
Expand Down Expand Up @@ -252,7 +251,7 @@ config_namespace! {
/// concurrency.
///
/// Defaults to the number of CPU cores on the system
pub target_partitions: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get()
pub target_partitions: usize, default = get_available_parallelism()

/// The default time zone
///
Expand All @@ -268,7 +267,7 @@ config_namespace! {
/// This is mostly use to plan `UNION` children in parallel.
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get()
pub planning_concurrency: usize, default = get_available_parallelism()

/// When set to true, skips verifying that the schema produced by
/// planning the input of `LogicalPlan::Aggregate` exactly matches the
Expand Down
12 changes: 12 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ use sqlparser::parser::Parser;
use std::borrow::{Borrow, Cow};
use std::cmp::{min, Ordering};
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::Range;
use std::sync::Arc;
use std::thread::available_parallelism;

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
Expand Down Expand Up @@ -761,6 +763,16 @@ pub fn combine_limit(
(combined_skip, combined_fetch)
}

/// Returns the estimated number of threads available for parallel execution.
///
/// This is a wrapper around `std::thread::available_parallelism`, providing a default value
/// of `1` if the system's parallelism cannot be determined.
pub fn get_available_parallelism() -> usize {
available_parallelism()
.unwrap_or(NonZero::new(1).expect("literal value `1` shouldn't be zero"))
.get()
}

#[cfg(test)]
mod tests {
use crate::ScalarValue::Null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::{cmp, num::NonZero, sync::Arc, thread::available_parallelism};
use std::{cmp, sync::Arc};

use datafusion::{
datasource::MemTable,
prelude::{SessionConfig, SessionContext},
};
use datafusion_catalog::TableProvider;
use datafusion_common::error::Result;
use datafusion_common::ScalarValue;
use datafusion_common::{error::Result, utils::get_available_parallelism};
use datafusion_expr::col;
use rand::{thread_rng, Rng};

Expand Down Expand Up @@ -73,9 +73,7 @@ impl SessionContextGenerator {
];

let max_batch_size = cmp::max(1, dataset_ref.total_rows_num);
let max_target_partitions = available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get();
let max_target_partitions = get_available_parallelism();

Self {
dataset: dataset_ref,
Expand Down
10 changes: 2 additions & 8 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::sync::Arc;
use std::thread::available_parallelism;

use arrow::{
array::*, datatypes::*, record_batch::RecordBatch,
Expand All @@ -34,6 +32,7 @@ use datafusion::prelude::*;
use datafusion::test_util;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion::{execution::context::SessionContext, physical_plan::displayable};
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{assert_contains, assert_not_contains};
use object_store::path::Path;
use std::fs::File;
Expand Down Expand Up @@ -261,12 +260,7 @@ impl ExplainNormalizer {

// convert things like partitioning=RoundRobinBatch(16)
// to partitioning=RoundRobinBatch(NUM_CORES)
let needle = format!(
"RoundRobinBatch({})",
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get()
);
let needle = format!("RoundRobinBatch({})", get_available_parallelism());
replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string()));

Self { replacements }
Expand Down
9 changes: 2 additions & 7 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

use std::ffi::OsStr;
use std::fs;
use std::num::NonZero;
use std::path::{Path, PathBuf};
use std::thread::available_parallelism;

use clap::Parser;
use datafusion_common::utils::get_available_parallelism;
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -114,11 +113,7 @@ async fn run_tests() -> Result<()> {
.join()
})
// run up to num_cpus streams in parallel
.buffer_unordered(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
.buffer_unordered(get_available_parallelism())
.flat_map(|result| {
// Filter out any Ok() leaving only the DataFusionErrors
futures::stream::iter(match result {
Expand Down

0 comments on commit eca0e07

Please sign in to comment.