Skip to content

Commit

Permalink
[PERF] Remove upfront buffer allocations for local CSV reader (#3242)
Browse files Browse the repository at this point in the history
The local CSV reader currently makes upfront buffer allocations (80 MiB
for file slabs and 80 MiB for CSV buffers). This unnecessarily blows up
the read time for small CSV files which don't use so many buffers.

Since the local CSV reader allocates additional buffers as needed, we
can remove all upfront allocations without affecting anything else in
the implementation of the reader. This speeds up reads of small files.

At the same time, I benchmarked the performance of the reader against
the test case described in
#3055 and found no consistent
slowdown after removing the upfront allocations.
  • Loading branch information
desmondcheongzx authored Nov 7, 2024
1 parent 104fbc3 commit 2b71ffb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 23 deletions.
8 changes: 3 additions & 5 deletions src/daft-csv/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,21 @@ pub async fn stream_csv_local(
);

// Create CSV buffer pool.
let n_threads: usize = std::thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(2).unwrap())
.into();
let record_buffer_size = (estimated_mean_row_size + estimated_std_row_size).ceil() as usize;
let chunk_size = read_options
.as_ref()
.and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8)))
.unwrap_or(DEFAULT_CSV_BUFFER_SIZE);
let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize;

// TODO(desmond): We might consider creating per-process buffer pools and slab pools.
let buffer_pool = Arc::new(CsvBufferPool::new(
record_buffer_size,
num_fields,
chunk_size_rows,
n_threads * 2,
));
let n_threads: usize = std::thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(2).unwrap())
.into();
stream_csv_as_tables(
file,
buffer_pool,
Expand Down
23 changes: 5 additions & 18 deletions src/daft-csv/src/local/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use parking_lot::{Mutex, RwLock};

// The default size of a slab used for reading CSV files in chunks. Currently set to 4 MiB. This can be tuned.
pub const SLABSIZE: usize = 4 * 1024 * 1024;
// The default number of slabs in a slab pool. With 20 slabs, we reserve a total of 80 MiB of memory for reading file data.
const SLABPOOL_DEFAULT_SIZE: usize = 20;

#[derive(Clone, Debug, Default)]
pub struct CsvSlab(Vec<read::ByteRecord>);
Expand Down Expand Up @@ -53,16 +51,10 @@ pub struct CsvBuffer {
}

impl CsvBufferPool {
pub fn new(
record_size: usize,
num_fields: usize,
num_rows: usize,
initial_pool_size: usize,
) -> Self {
let chunk_buffers =
vec![CsvSlab::new(record_size, num_fields, num_rows); initial_pool_size];
pub fn new(record_size: usize, num_fields: usize, num_rows: usize) -> Self {
Self {
buffers: Mutex::new(chunk_buffers),
// We start off with an empty pool. Buffers will be allocated on demand.
buffers: Mutex::new(vec![]),
record_size,
num_fields,
num_rows,
Expand Down Expand Up @@ -108,14 +100,9 @@ pub struct FileSlabPool {

impl FileSlabPool {
pub fn new() -> Arc<Self> {
let slabs: Vec<RwLock<FileSlabState>> = (0..SLABPOOL_DEFAULT_SIZE)
// We get uninitialized buffers because we will always populate the buffers with a file read before use.
.map(|_| Box::new_uninit_slice(SLABSIZE))
.map(|x| unsafe { x.assume_init() })
.map(|buffer| RwLock::new(FileSlabState::new(buffer, 0)))
.collect();
Arc::new(Self {
slabs: Mutex::new(slabs),
// We start off with an empty pool. Slabs will be allocated on demand.
slabs: Mutex::new(vec![]),
})
}

Expand Down

0 comments on commit 2b71ffb

Please sign in to comment.