Skip to content

Commit

Permalink
[PERF] Update default max_connections 64->8 because it is now per-io-…
Browse files Browse the repository at this point in the history
…thread (#1485)

Updates default max_connections value from 64 to 8

Also renames `max_connections` in internal APIs to
`max_connections_per_io_thread` to be more explicit, but keeps naming
for external-facing APIs for backwards compatibility

Note that the total number of connections being spawned for PyRunner is:
`8.min(Num CPUs) * max_connections`, and these are shared throughout
the multithreaded backend

The total number of connections being spawned for RayRunner after #1484
is: `num_ray_workers * 1 (since we run single-threaded) *
max_connections`

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Oct 11, 2023
1 parent a24e918 commit 8ad7fda
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 7 deletions.
6 changes: 5 additions & 1 deletion daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def read_parquet(
fs: Optional[fsspec.AbstractFileSystem] = None,
io_config: Optional["IOConfig"] = None,
use_native_downloader: bool = False,
_multithreaded_io: Optional[bool] = None,
) -> DataFrame:
"""Creates a DataFrame from Parquet file(s)
Expand All @@ -44,6 +45,9 @@ def read_parquet(
io_config (IOConfig): Config to be used with the native downloader
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
the amount of system resources (number of connections and thread contention) when running in the Ray runner.
Defaults to None, which will let Daft decide based on the runner it is currently using.
returns:
DataFrame: parsed DataFrame
Expand All @@ -54,7 +58,7 @@ def read_parquet(

# If running on Ray, we want to limit the amount of concurrency and requests being made.
# This is because each Ray worker process receives its own pool of thread workers and connections
multithreaded_io = not context.get_context().is_ray_runner
multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io

file_format_config = FileFormatConfig.from_parquet_config(
ParquetSourceConfig(
Expand Down
5 changes: 3 additions & 2 deletions src/common/io-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ impl S3Config {
key_id: key_id.or(def.key_id),
session_token: session_token.or(def.session_token),
access_key: access_key.or(def.access_key),
max_connections: max_connections.unwrap_or(def.max_connections),
max_connections_per_io_thread: max_connections
.unwrap_or(def.max_connections_per_io_thread),
retry_initial_backoff_ms: retry_initial_backoff_ms
.unwrap_or(def.retry_initial_backoff_ms),
connect_timeout_ms: connect_timeout_ms.unwrap_or(def.connect_timeout_ms),
Expand Down Expand Up @@ -214,7 +215,7 @@ impl S3Config {
/// AWS max connections per IO thread
#[getter]
pub fn max_connections(&self) -> PyResult<u32> {
Ok(self.config.max_connections)
Ok(self.config.max_connections_per_io_thread)
}

/// AWS Retry Initial Backoff Time in Milliseconds
Expand Down
6 changes: 3 additions & 3 deletions src/common/io-config/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct S3Config {
pub key_id: Option<String>,
pub session_token: Option<String>,
pub access_key: Option<String>,
pub max_connections: u32,
pub max_connections_per_io_thread: u32,
pub retry_initial_backoff_ms: u64,
pub connect_timeout_ms: u64,
pub read_timeout_ms: u64,
Expand All @@ -30,7 +30,7 @@ impl Default for S3Config {
key_id: None,
session_token: None,
access_key: None,
max_connections: 64,
max_connections_per_io_thread: 8,
retry_initial_backoff_ms: 1000,
connect_timeout_ms: 10_000,
read_timeout_ms: 10_000,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl Display for S3Config {
self.session_token,
self.access_key,
self.retry_initial_backoff_ms,
self.max_connections,
self.max_connections_per_io_thread,
self.connect_timeout_ms,
self.read_timeout_ms,
self.num_tries,
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ async fn build_client(config: &S3Config) -> super::Result<S3LikeSource> {
Ok(S3LikeSource {
region_to_client_map: tokio::sync::RwLock::new(client_map),
connection_pool_sema: Arc::new(tokio::sync::Semaphore::new(
(config.max_connections as usize)
(config.max_connections_per_io_thread as usize)
* get_io_pool_num_threads()
.await
.expect("Should be running in tokio pool"),
Expand Down

0 comments on commit 8ad7fda

Please sign in to comment.