From 1a43fb599e5b6f66875fb1e97a4be812c247a32c Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Thu, 30 Nov 2023 12:28:23 -0800 Subject: [PATCH] Buffer/chunk size tweaks. --- daft/daft.pyi | 1 + daft/table/table.py | 2 ++ src/daft-json/src/python.rs | 4 +++- src/daft-json/src/read.rs | 28 ++++++++++++++++++++-------- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 708c9e0b36..0128dedafe 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -638,6 +638,7 @@ def read_json( read_options: JsonReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, + max_chunks_in_flight: int | None = None, ): ... def read_json_schema( uri: str, diff --git a/daft/table/table.py b/daft/table/table.py index 4fb7203052..6b87c2ee03 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -483,6 +483,7 @@ def read_json( read_options: JsonReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, + max_chunks_in_flight: int | None = None, ) -> Table: return Table._from_pytable( _read_json( @@ -492,6 +493,7 @@ def read_json( read_options=read_options, io_config=io_config, multithreaded_io=multithreaded_io, + max_chunks_in_flight=max_chunks_in_flight, ) ) diff --git a/src/daft-json/src/python.rs b/src/daft-json/src/python.rs index 5ec656b520..788cc01bf3 100644 --- a/src/daft-json/src/python.rs +++ b/src/daft-json/src/python.rs @@ -8,6 +8,7 @@ pub mod pylib { use crate::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; + #[allow(clippy::too_many_arguments)] #[pyfunction] pub fn read_json( py: Python, @@ -17,6 +18,7 @@ pub mod pylib { read_options: Option, io_config: Option, multithreaded_io: Option, + max_chunks_in_flight: Option, ) -> PyResult { py.allow_threads(|| { let io_stats = IOStatsContext::new(format!("read_json: for uri {uri}")); @@ -33,7 +35,7 @@ pub mod pylib { io_client, Some(io_stats), multithreaded_io.unwrap_or(true), - None, + max_chunks_in_flight, )? .into()) }) diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index d9f485a119..2e1642f28a 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -221,8 +221,11 @@ async fn read_json_single_into_stream( // Use user-provided buffer size, falling back to 8 * the user-provided chunk size if that exists, otherwise falling back to 512 KiB as the default. let buffer_size = read_options .as_ref() - .and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs))) - .unwrap_or(512 * 1024); + .and_then(|opt| { + opt.buffer_size + .or_else(|| opt.chunk_size.map(|cs| (64 * cs).min(256 * 1024 * 1024))) + }) + .unwrap_or(256 * 1024); ( Box::new(BufReader::with_capacity( buffer_size, @@ -231,8 +234,11 @@ async fn read_json_single_into_stream( buffer_size, read_options .as_ref() - .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) - .unwrap_or(64 * 1024), + .and_then(|opt| { + opt.chunk_size + .or_else(|| opt.buffer_size.map(|bs| (bs / 64).max(16))) + }) + .unwrap_or(64), ) } GetResult::Stream(stream, _, _) => ( @@ -240,12 +246,18 @@ async fn read_json_single_into_stream( // Use user-provided buffer size, falling back to 8 * the user-provided chunk size if that exists, otherwise falling back to 512 KiB as the default. read_options .as_ref() - .and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs))) - .unwrap_or(512 * 1024), + .and_then(|opt| { + opt.buffer_size + .or_else(|| opt.chunk_size.map(|cs| (256 * cs).min(256 * 1024 * 1024))) + }) + .unwrap_or(8 * 1024 * 1024), read_options .as_ref() - .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) - .unwrap_or(64 * 1024), + .and_then(|opt| { + opt.chunk_size + .or_else(|| opt.buffer_size.map(|bs| (bs / 256).max(16))) + }) + .unwrap_or(64), ), }; // If file is compressed, wrap stream in decoding stream.