From 8ad7fda98e1c182b200f7184531883e23c213359 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Wed, 11 Oct 2023 15:28:06 -0700 Subject: [PATCH] [PERF] Update default max_connections 64->8 because it is now per-io-thread (#1485) Updates default max_connections value from 64 to 8 Also renames `max_connections` in internal APIs to `max_connections_per_io_thread` to be more explicit, but keeps naming for external-facing APIs for backwards compatibility Note that the total number of connections being spawned for PyRunner is: `8.min(Num CPUs) * max_connections`, and these are shared throughout the multithreaded backend The total number of connections being spawned for RayRunner after #1484 is: `num_ray_workers * 1 (since we run single-threaded) * max_connections` --------- Co-authored-by: Jay Chia --- daft/io/_parquet.py | 6 +++++- src/common/io-config/src/python.rs | 5 +++-- src/common/io-config/src/s3.rs | 6 +++--- src/daft-io/src/s3_like.rs | 2 +- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index 0228cc1d0a..8a824867d0 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -26,6 +26,7 @@ def read_parquet( fs: Optional[fsspec.AbstractFileSystem] = None, io_config: Optional["IOConfig"] = None, use_native_downloader: bool = False, + _multithreaded_io: Optional[bool] = None, ) -> DataFrame: """Creates a DataFrame from Parquet file(s) @@ -44,6 +45,9 @@ def read_parquet( io_config (IOConfig): Config to be used with the native downloader use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. + _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing + the amount of system resources (number of connections and thread contention) when running in the Ray runner. 
+ Defaults to None, which will let Daft decide based on the runner it is currently using. returns: DataFrame: parsed DataFrame @@ -54,7 +58,7 @@ def read_parquet( # If running on Ray, we want to limit the amount of concurrency and requests being made. # This is because each Ray worker process receives its own pool of thread workers and connections - multithreaded_io = not context.get_context().is_ray_runner + multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io file_format_config = FileFormatConfig.from_parquet_config( ParquetSourceConfig( diff --git a/src/common/io-config/src/python.rs b/src/common/io-config/src/python.rs index 91aed62b28..ccea7f34b1 100644 --- a/src/common/io-config/src/python.rs +++ b/src/common/io-config/src/python.rs @@ -163,7 +163,8 @@ impl S3Config { key_id: key_id.or(def.key_id), session_token: session_token.or(def.session_token), access_key: access_key.or(def.access_key), - max_connections: max_connections.unwrap_or(def.max_connections), + max_connections_per_io_thread: max_connections + .unwrap_or(def.max_connections_per_io_thread), retry_initial_backoff_ms: retry_initial_backoff_ms .unwrap_or(def.retry_initial_backoff_ms), connect_timeout_ms: connect_timeout_ms.unwrap_or(def.connect_timeout_ms), @@ -214,7 +215,7 @@ impl S3Config { /// AWS max connections per IO thread #[getter] pub fn max_connections(&self) -> PyResult { - Ok(self.config.max_connections) + Ok(self.config.max_connections_per_io_thread) } /// AWS Retry Initial Backoff Time in Milliseconds diff --git a/src/common/io-config/src/s3.rs b/src/common/io-config/src/s3.rs index f7ab25095c..cd35c3f49c 100644 --- a/src/common/io-config/src/s3.rs +++ b/src/common/io-config/src/s3.rs @@ -11,7 +11,7 @@ pub struct S3Config { pub key_id: Option, pub session_token: Option, pub access_key: Option, - pub max_connections: u32, + pub max_connections_per_io_thread: u32, pub retry_initial_backoff_ms: u64, pub connect_timeout_ms: u64, pub 
read_timeout_ms: u64, @@ -30,7 +30,7 @@ impl Default for S3Config { key_id: None, session_token: None, access_key: None, - max_connections: 64, + max_connections_per_io_thread: 8, retry_initial_backoff_ms: 1000, connect_timeout_ms: 10_000, read_timeout_ms: 10_000, @@ -68,7 +68,7 @@ impl Display for S3Config { self.session_token, self.access_key, self.retry_initial_backoff_ms, - self.max_connections, + self.max_connections_per_io_thread, self.connect_timeout_ms, self.read_timeout_ms, self.num_tries, diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index 46b9831712..4638077eb4 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -311,7 +311,7 @@ async fn build_client(config: &S3Config) -> super::Result { Ok(S3LikeSource { region_to_client_map: tokio::sync::RwLock::new(client_map), connection_pool_sema: Arc::new(tokio::sync::Semaphore::new( - (config.max_connections as usize) + (config.max_connections_per_io_thread as usize) * get_io_pool_num_threads() .await .expect("Should be running in tokio pool"),