From 1e0022c48293d47ef6992760a1457d0f1a19b047 Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Mon, 7 Oct 2024 17:31:25 -0700 Subject: [PATCH] both are slabs yay (andrew stash) --- src/daft-csv/src/local.rs | 266 ++++++++++++++++++--------------- src/daft-csv/src/local/pool.rs | 23 +-- src/daft-csv/src/read.rs | 64 ++++---- 3 files changed, 186 insertions(+), 167 deletions(-) diff --git a/src/daft-csv/src/local.rs b/src/daft-csv/src/local.rs index 2cdf655d68..87ef7afc9a 100644 --- a/src/daft-csv/src/local.rs +++ b/src/daft-csv/src/local.rs @@ -3,7 +3,7 @@ use std::io::{Chain, Cursor, Read}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{num::NonZeroUsize, sync::Arc, sync::Condvar, sync::Mutex}; -use crate::local::pool::{read_slabs_windowed, FileSlab, SlabPool}; +use crate::local::pool::{read_slabs_windowed, CsvSlabData, FileSlabData, Slab, SlabPool}; use crate::ArrowSnafu; use crate::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use arrow2::{ @@ -59,68 +59,68 @@ use crate::read::{fields_to_projection_indices, tables_concat}; /// │ │ /// └──────────┘ -/// A pool of ByteRecord slabs. Used for deserializing CSV. -#[derive(Debug)] -struct CsvBufferPool { - buffers: Mutex>>, - buffer_size: usize, - record_buffer_size: usize, - num_fields: usize, -} - -/// A slab of ByteRecords. Used for deserializing CSV. -struct CsvBuffer { - pool: Arc, - buffer: Vec, -} - -impl CsvBufferPool { - pub fn new( - record_buffer_size: usize, - num_fields: usize, - chunk_size_rows: usize, - initial_pool_size: usize, - ) -> Self { - let chunk_buffers = vec![ - vec![ - read::ByteRecord::with_capacity(record_buffer_size, num_fields); - chunk_size_rows - ]; - initial_pool_size - ]; - CsvBufferPool { - buffers: Mutex::new(chunk_buffers), - buffer_size: chunk_size_rows, - record_buffer_size, - num_fields, - } - } - - pub fn get_buffer(self: &Arc) -> CsvBuffer { - let mut buffers = self.buffers.lock().unwrap(); - let buffer = buffers.pop(); - let buffer = match buffer { - Some(buffer) => buffer, - None => { - println!("csv buf empty"); - vec![ - read::ByteRecord::with_capacity(self.record_buffer_size, self.num_fields); - self.buffer_size - ] - } - }; - - CsvBuffer { - pool: Arc::clone(self), - buffer, - } - } - - fn return_buffer(&self, buffer: Vec) { - let mut buffers = self.buffers.lock().unwrap(); - buffers.push(buffer); - } -} +// /// A pool of ByteRecord slabs. Used for deserializing CSV. +// #[derive(Debug)] +// struct CsvBufferPool { +// buffers: Mutex>>, +// buffer_size: usize, +// record_buffer_size: usize, +// num_fields: usize, +// } + +// /// A slab of ByteRecords. Used for deserializing CSV. 
+// struct CsvBuffer { +// pool: Arc, +// buffer: Vec, +// } +// +// impl CsvBufferPool { +// pub fn new( +// record_buffer_size: usize, +// num_fields: usize, +// chunk_size_rows: usize, +// initial_pool_size: usize, +// ) -> Self { +// let chunk_buffers = vec![ +// vec![ +// read::ByteRecord::with_capacity(record_buffer_size, num_fields); +// chunk_size_rows +// ]; +// initial_pool_size +// ]; +// CsvBufferPool { +// buffers: Mutex::new(chunk_buffers), +// buffer_size: chunk_size_rows, +// record_buffer_size, +// num_fields, +// } +// } +// +// pub fn get_buffer(self: &Arc) -> CsvBuffer { +// let mut buffers = self.buffers.lock().unwrap(); +// let buffer = buffers.pop(); +// let buffer = match buffer { +// Some(buffer) => buffer, +// None => { +// println!("csv buf empty"); +// vec![ +// read::ByteRecord::with_capacity(self.record_buffer_size, self.num_fields); +// self.buffer_size +// ] +// } +// }; +// +// CsvBuffer { +// pool: Arc::clone(self), +// buffer, +// } +// } +// +// fn return_buffer(&self, buffer: Vec) { +// let mut buffers = self.buffers.lock().unwrap(); +// buffers.push(buffer); +// } +// } // The default size of a slab used for reading CSV files in chunks. Currently set to 4MB. const SLAB_SIZE: usize = 4 * 1024 * 1024; @@ -141,7 +141,7 @@ pub async fn read_csv_local( read_options, max_chunks_in_flight, ) - .await?; + .await?; tables_concat(tables_stream_collect(Box::pin(stream)).await) } @@ -163,10 +163,13 @@ pub async fn stream_csv_local( parse_options: CsvParseOptions, read_options: Option, max_chunks_in_flight: Option, -) -> DaftResult> + Send> { +) -> DaftResult> + Send> { let uri = uri.trim_start_matches("file://"); let file = tokio::fs::File::open(uri).await?; + + println!("convert_options is NOne? ... {}", convert_options.is_none()); + // TODO(desmond): This logic is repeated multiple times in the csv reader files. Should dedup. let predicate = convert_options .as_ref() @@ -195,7 +198,8 @@ pub async fn stream_csv_local( Some(co) } } - .unwrap_or_default(); + .unwrap_or_default(); + // End of `should dedup`. // TODO(desmond): We should do better schema inference here. @@ -231,22 +235,32 @@ pub async fn stream_csv_local( let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize; let num_fields = schema.fields.len(); // TODO(desmond): We might consider creating per-process buffer pools and slab pools. - let buffer_pool = Arc::new(CsvBufferPool::new( - record_buffer_size, - num_fields, - chunk_size_rows, - n_threads * 2, - )); + + + let initial_pool_size = n_threads * 2; + let csv_slabs = vec![ + vec![ + read::ByteRecord::with_capacity(record_buffer_size, num_fields); + chunk_size_rows + ]; + initial_pool_size + ]; + + + let buffer_pool = SlabPool::new( + csv_slabs, + ); // We suppose that each slab of CSV data produces (chunk size / slab size) number of Daft tables. We // then double this capacity to ensure that our channel is never full and our threads won't deadlock. let (sender, receiver) = crossbeam_channel::bounded(max_chunks_in_flight.unwrap_or(2 * chunk_size / SLAB_SIZE)); - let total_len = file.metadata().unwrap().len() as usize; + let total_len = file.metadata().await.unwrap().len() as usize; let windowed_buffers = read_slabs_windowed(file, vec![vec![0; SLAB_SIZE]; SLAB_POOL_DEFAULT_SIZE]); rayon::spawn(move || { + // todo: await consume_csv_file( buffer_pool, windowed_buffers, @@ -270,8 +284,8 @@ pub async fn stream_csv_local( /// Consumes the CSV file and sends the results to `sender`. 
#[allow(clippy::too_many_arguments)] async fn consume_csv_file( - buffer_pool: Arc, - window_stream: impl Stream, + mut buffer_pool: SlabPool, + mut window_stream: impl Stream + Unpin, total_len: usize, parse_options: CsvParseOptions, projection_indices: Arc>, @@ -291,8 +305,6 @@ async fn consume_csv_file( let quote_char = parse_options.quote; let double_quote_escape_allowed = parse_options.double_quote; let mut total_bytes_read = 0; - let mut next_slab = None; - let mut next_buffer_len = 0; let mut is_first_buffer = true; loop { let limit_reached = limit.map_or(false, |limit| { @@ -302,20 +314,31 @@ async fn consume_csv_file( if limit_reached { break; } - let window: WindowedSlab = window_stream.next().await?; - let first_buffer = &window[0]; - let second_buffer = window.get(1).map(|slab| &****slab); - - let file_chunk = get_file_chunk( - first_buffer, - second_buffer, - is_first_buffer, - num_fields, - quote_char, - field_delimiter, - escape_char, - double_quote_escape_allowed, - ); + + let Some(window) = window_stream.next().await else { + // todo: probably right (we really think so) + break; + }; + + let first_buffer = window.get(0).unwrap().clone(); + let second_buffer = window.get(1); + + let file_chunk = { + let second_buffer = second_buffer.map(|slab| &****slab); + get_file_chunk( + &first_buffer, + second_buffer, + is_first_buffer, + num_fields, + quote_char, + field_delimiter, + escape_char, + double_quote_escape_allowed, + ) + }; + + let second_buffer = second_buffer.cloned(); + is_first_buffer = false; if let (None, _) = file_chunk { // Exit early before spawning a new thread. @@ -323,7 +346,7 @@ async fn consume_csv_file( break; } let parse_options = parse_options.clone(); - let csv_buffer = buffer_pool.get_buffer(); + let csv_slab = buffer_pool.get_next_data().await; let projection_indices = projection_indices.clone(); let fields = fields.clone(); let read_daft_fields = read_daft_fields.clone(); @@ -349,7 +372,7 @@ async fn consume_csv_file( fields, read_daft_fields, read_schema, - csv_buffer, + csv_slab, &include_columns, predicate, sender, @@ -357,31 +380,27 @@ async fn consume_csv_file( ); } (Some(start), Some(end)) => { - if let Some(next_slab_clone) = next_slab_clone - && let Some(current_buffer) = ¤t_slab_clone.buffer - && let Some(next_buffer) = &next_slab_clone.buffer - { - let buffer_source = BufferSource::Chain(std::io::Read::chain( - Cursor::new(¤t_buffer[start..current_buffer_len]), - Cursor::new(&next_buffer[..end]), - )); - dispatch_to_parse_csv( - has_header, - parse_options, - buffer_source, - projection_indices, - fields, - read_daft_fields, - read_schema, - csv_buffer, - &include_columns, - predicate, - sender, - rows_read, - ); - } else { - panic!("Trying to read from an overflow CSV buffer that doesn't exist. Please report this issue.") - } + let first_buffer = Cursor::new(&first_buffer[start..]); + + let second_buffer = second_buffer.unwrap(); + let second_buffer = Cursor::new(&second_buffer[..end]); + + let reader = std::io::Read::chain(first_buffer, second_buffer); + + dispatch_to_parse_csv( + has_header, + parse_options, + reader, + projection_indices, + fields, + read_daft_fields, + read_schema, + csv_slab, + &include_columns, + predicate, + sender, + rows_read, + ); } _ => panic!( "Something went wrong when parsing the CSV file. Please report this issue." 
@@ -671,12 +690,12 @@ fn validate_csv_record( fn dispatch_to_parse_csv( has_header: bool, parse_options: CsvParseOptions, - buffer_source: BufferSource, + buffer_source: impl Read, projection_indices: Arc>, fields: Vec, read_daft_fields: Arc>>, read_schema: Arc, - csv_buffer: CsvBuffer, + csv_buffer: Slab, include_columns: &Option>, predicate: Option>, sender: Sender>, @@ -727,19 +746,19 @@ fn parse_csv_chunk( fields: Vec, read_daft_fields: Arc>>, read_schema: Arc, - csv_buffer: CsvBuffer, + mut csv_buffer: Slab, include_columns: &Option>, predicate: Option>, ) -> DaftResult> where R: std::io::Read, { - let mut chunk_buffer = csv_buffer.buffer; + let mut chunk_buffer = &mut *csv_buffer; let mut tables = vec![]; loop { //let time = Instant::now(); let (rows_read, has_more) = - local_read_rows(&mut reader, chunk_buffer.as_mut_slice()).context(ArrowSnafu {})?; + local_read_rows(&mut reader, chunk_buffer).context(ArrowSnafu {})?; //let time = Instant::now(); let chunk = projection_indices .par_iter() @@ -777,6 +796,5 @@ where break; } } - csv_buffer.pool.return_buffer(chunk_buffer); Ok(tables) } diff --git a/src/daft-csv/src/local/pool.rs b/src/daft-csv/src/local/pool.rs index ab13894462..7cac7cd672 100644 --- a/src/daft-csv/src/local/pool.rs +++ b/src/daft-csv/src/local/pool.rs @@ -7,7 +7,9 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; mod fixed_capacity_vec; -pub type FileSlab = Vec; +pub type FileSlabData = Vec; + +pub type CsvSlabData = Vec; /// A pool of reusable memory slabs for efficient I/O operations. pub struct SlabPool { @@ -51,7 +53,7 @@ impl SlabPool { impl SlabPool { /// Asynchronously retrieves the next available slab from the pool. - async fn get_next_data(&mut self) -> Slab { + pub async fn get_next_data(&mut self) -> Slab { let mut data = self .available_slabs .recv() @@ -98,16 +100,17 @@ impl Drop for Slab { } use tokio_stream::wrappers::ReceiverStream; +use arrow2::io::csv::read; /// Asynchronously reads slabs from a file and returns a stream of SharedSlabs. pub fn read_slabs( mut file: R, - iterator: impl ExactSizeIterator, -) -> impl Stream> + iterator: impl ExactSizeIterator, +) -> impl Stream> where R: AsyncRead + Unpin + Send + 'static, { - let (tx, rx) = mpsc::channel::>(iterator.len()); + let (tx, rx) = mpsc::channel::>(iterator.len()); let pool = SlabPool::new(iterator); tokio::spawn(async move { @@ -154,7 +157,7 @@ where ReceiverStream::new(rx) } -pub type WindowedSlab = heapless::Vec, 2>; +pub type WindowedSlab = heapless::Vec, 2>; /// Asynchronously reads slabs from a file and returns a stream of WindowedSlabs. /// @@ -171,11 +174,11 @@ pub type WindowedSlab = heapless::Vec, 2>; /// # Returns /// /// A `Stream` of `WindowedSlab`s. 
-pub fn read_slabs_windowed(file: R, iterator: I) -> impl Stream +pub fn read_slabs_windowed(file: R, iterator: I) -> impl Stream + Unpin where R: AsyncRead + Unpin + Send + 'static, - I: IntoIterator + 'static, - I::IntoIter: ExactSizeIterator + 'static, + I: IntoIterator + 'static, + I::IntoIter: ExactSizeIterator + 'static, { let iterator = iterator.into_iter(); let (tx, rx) = mpsc::channel(iterator.len()); @@ -187,7 +190,7 @@ where tokio::spawn(async move { let mut slab_stream = pin!(slab_stream); - let mut windowed_slab = heapless::Vec::, 2>::new(); + let mut windowed_slab = heapless::Vec::, 2>::new(); let mut slab_stream = slab_stream.as_mut(); while let Some(slab) = StreamExt::next(&mut slab_stream).await { diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 89a3c87fa3..13ddd7766d 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -34,18 +34,18 @@ use crate::{metadata::read_csv_schema_single, CsvConvertOptions, CsvParseOptions use daft_compression::CompressionCodec; use daft_decoding::deserialize::deserialize_column; -trait ByteRecordChunkStream: Stream>> {} -impl ByteRecordChunkStream for S where - S: Stream>> -{ -} +trait ByteRecordChunkStream: Stream>> {} +impl ByteRecordChunkStream for S +where + S: Stream>>, +{} use crate::{local::read_csv_local, local::stream_csv_local}; type TableChunkResult = - super::Result>, super::JoinSnafu, super::Error>>; -trait TableStream: Stream {} -impl TableStream for S where S: Stream {} +super::Result>, super::JoinSnafu, super::Error>>; +trait TableStream: Stream {} +impl TableStream for S where S: Stream {} #[allow(clippy::too_many_arguments)] pub fn read_csv( @@ -70,7 +70,7 @@ pub fn read_csv( io_stats, max_chunks_in_flight, ) - .await + .await }) } @@ -109,9 +109,9 @@ pub fn read_csv_bulk( io_stats, max_chunks_in_flight, ) - .await + .await }) - .context(super::JoinSnafu {}) + .context(super::JoinSnafu {}) })); let mut remaining_rows = convert_options .as_ref() @@ -155,15 +155,14 @@ pub async fn stream_csv( let uri = uri.as_str(); let (source_type, _) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); - let use_local = false; - if matches!(source_type, SourceType::File) && !is_compressed && use_local { + if matches!(source_type, SourceType::File) && !is_compressed { let stream = stream_csv_local( uri, convert_options, parse_options.unwrap_or_default(), read_options, max_chunks_in_flight, - )?; + ).await?; Ok(Box::pin(stream)) } else { let stream = stream_csv_single( @@ -175,7 +174,7 @@ pub async fn stream_csv( io_stats, max_chunks_in_flight, ) - .await?; + .await?; Ok(Box::pin(stream)) } } @@ -229,8 +228,7 @@ async fn read_csv_single_into_table( ) -> DaftResult { let (source_type, _) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); - let use_local = false; - if matches!(source_type, SourceType::File) && !is_compressed && use_local { + if matches!(source_type, SourceType::File) && !is_compressed { return read_csv_local( uri, convert_options, @@ -238,7 +236,7 @@ async fn read_csv_single_into_table( read_options, max_chunks_in_flight, ) - .await; + .await; } let predicate = convert_options @@ -277,7 +275,7 @@ async fn read_csv_single_into_table( io_client, io_stats, ) - .await?; + .await?; // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks // with the parsing of chunks on the rayon threadpool. 
let max_chunks_in_flight = max_chunks_in_flight.unwrap_or_else(|| { @@ -366,7 +364,7 @@ async fn stream_csv_single( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, -) -> DaftResult> + Send> { +) -> DaftResult> + Send> { let predicate = convert_options .as_ref() .and_then(|opts| opts.predicate.clone()); @@ -403,7 +401,7 @@ async fn stream_csv_single( io_client, io_stats, ) - .await?; + .await?; // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks // with the parsing of chunks on the rayon threadpool. let max_chunks_in_flight = max_chunks_in_flight.unwrap_or_else(|| { @@ -469,7 +467,7 @@ async fn read_csv_single_into_stream( io_client.clone(), io_stats.clone(), ) - .await?; + .await?; ( schema.to_arrow()?, Some(read_stats.mean_record_size_bytes), @@ -667,7 +665,7 @@ fn parse_into_column_array_chunk_stream( }); recv.await.context(super::OneShotRecvSnafu {})? }) - .context(super::JoinSnafu {}) + .context(super::JoinSnafu {}) })) } @@ -946,7 +944,7 @@ mod tests { None, None, ) - .await + .await }); assert!( @@ -1188,7 +1186,7 @@ mod tests { #[test] fn test_csv_read_local_escape() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny_escape.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny_escape.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1236,7 +1234,7 @@ mod tests { #[test] fn test_csv_read_local_comment() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny_comment.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny_comment.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1283,7 +1281,7 @@ mod tests { } #[test] fn test_csv_read_local_limit() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1331,7 +1329,7 @@ mod tests { #[test] fn test_csv_read_local_projection() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1441,7 +1439,7 @@ mod tests { #[test] fn test_csv_read_local_larger_than_buffer_size() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1489,7 +1487,7 @@ mod tests { #[test] fn test_csv_read_local_larger_than_chunk_size() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1537,7 +1535,7 @@ mod tests { #[test] fn test_csv_read_local_throttled_streaming() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1585,7 +1583,7 @@ mod tests { #[test] fn test_csv_read_local_nulls() -> DaftResult<()> { - let file = 
format!("{}/test/iris_tiny_nulls.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny_nulls.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1762,7 +1760,7 @@ mod tests { #[test] fn test_csv_read_local_wrong_type_yields_nulls() -> DaftResult<()> { - let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"),); + let file = format!("{}/test/iris_tiny.csv", env!("CARGO_MANIFEST_DIR"), ); let mut io_config = IOConfig::default(); io_config.s3.anonymous = true;
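
The core idea of this change is that the raw file slabs (`Vec<u8>`) and the CSV record slabs (`Vec<ByteRecord>`) are now handed out by the same pooling mechanism instead of the deleted `CsvBufferPool`. Below is a minimal sketch of that pattern, assuming a tokio mpsc channel holds the free buffers: `get_next_data` waits until a buffer is available, and the `Slab` guard sends its buffer back to the pool when dropped. The generic `SlabPool<T>`/`Slab<T>` shown here is illustrative only, not the exact code in `src/daft-csv/src/local/pool.rs`.

```rust
use std::ops::{Deref, DerefMut};

use tokio::sync::mpsc::{channel, Receiver, Sender};

/// A pool of reusable buffers; in the patch `T` is `Vec<u8>` for file slabs
/// and `Vec<ByteRecord>` for CSV slabs ("both are slabs").
pub struct SlabPool<T> {
    available: Receiver<T>,
    return_tx: Sender<T>,
}

/// RAII guard: dereferences to the buffer and returns it to the pool on drop.
pub struct Slab<T> {
    data: Option<T>,
    return_tx: Sender<T>,
}

impl<T> SlabPool<T> {
    /// Build a pool that starts out full. Panics if `initial` is empty,
    /// because a zero-capacity bounded channel is not allowed.
    pub fn new<I: ExactSizeIterator<Item = T>>(initial: I) -> Self {
        let (return_tx, available) = channel(initial.len());
        for slab in initial {
            // Cannot fail: the channel capacity equals the number of slabs.
            let _ = return_tx.try_send(slab);
        }
        Self { available, return_tx }
    }

    /// Wait until a buffer is free (either still unused or returned by a
    /// dropped `Slab`) and take ownership of it.
    pub async fn get_next_data(&mut self) -> Slab<T> {
        let data = self
            .available
            .recv()
            .await
            .expect("the pool itself keeps a sender alive");
        Slab {
            data: Some(data),
            return_tx: self.return_tx.clone(),
        }
    }
}

impl<T> Deref for Slab<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.data.as_ref().expect("present until drop")
    }
}

impl<T> DerefMut for Slab<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.data.as_mut().expect("present until drop")
    }
}

impl<T> Drop for Slab<T> {
    fn drop(&mut self) {
        if let Some(data) = self.data.take() {
            // Best effort: if the pool has already been dropped, the buffer
            // is simply freed instead of being recycled.
            let _ = self.return_tx.try_send(data);
        }
    }
}
```

The RAII return on `Drop` is what lets `parse_csv_chunk` borrow the records via `&mut *csv_buffer` and stop calling an explicit `return_buffer`, which is exactly the call the diff removes.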
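
`read_slabs_windowed` feeds the consumer overlapping pairs of slabs (`WindowedSlab` holds up to two `Arc`s), so when a record runs past the end of one 4 MB slab the following slab is already in hand. The sketch below shows the windowing idea over a plain iterator rather than an async stream; emitting the final slab in a window of length one is an assumption here, chosen because the consumer checks `window.get(1)` for an optional second buffer.

```rust
use std::sync::Arc;

/// Produce overlapping windows of two consecutive slabs, sharing each slab
/// between neighbouring windows via `Arc` instead of copying it.
fn windowed<T>(slabs: impl IntoIterator<Item = T>) -> Vec<Vec<Arc<T>>> {
    let mut windows = Vec::new();
    let mut prev: Option<Arc<T>> = None;
    for slab in slabs {
        let slab = Arc::new(slab);
        if let Some(prev) = prev.take() {
            // The previous slab gets paired with the slab that follows it.
            windows.push(vec![prev, Arc::clone(&slab)]);
        }
        prev = Some(slab);
    }
    if let Some(last) = prev {
        // Assumed behaviour: the final slab has no successor, so its window
        // carries a single element.
        windows.push(vec![last]);
    }
    windows
}

fn main() {
    let windows = windowed(["slab0", "slab1", "slab2"]);
    let rendered: Vec<Vec<&str>> = windows
        .iter()
        .map(|window| window.iter().map(|slab| **slab).collect())
        .collect();
    assert_eq!(
        rendered,
        vec![vec!["slab0", "slab1"], vec!["slab1", "slab2"], vec!["slab2"]]
    );
}
```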
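
When `get_file_chunk` reports a chunk that starts in one slab and ends in the next (the `(Some(start), Some(end))` arm), the patch stitches the two byte ranges together with `Cursor` and `std::io::Read::chain` instead of copying them into one contiguous buffer. A small self-contained illustration of that read path; the sample bytes and the `start`/`end` offsets are made up:

```rust
use std::io::{Cursor, Read};

/// Chain the tail of one slab to the head of the next so the CSV reader sees
/// a single contiguous byte stream across the slab boundary.
fn chunk_reader<'a>(
    first_slab: &'a [u8],
    second_slab: &'a [u8],
    start: usize,
    end: usize,
) -> impl Read + 'a {
    Cursor::new(&first_slab[start..]).chain(Cursor::new(&second_slab[..end]))
}

fn main() -> std::io::Result<()> {
    let first = b"a,b,c\n1,2,3\n4,5";
    let second = b",6\n7,8,9\n";
    // The last record of `first` ("4,5") continues into `second` (",6\n").
    let mut reader = chunk_reader(first, second, 6, second.len());
    let mut text = String::new();
    reader.read_to_string(&mut text)?;
    assert_eq!(text, "1,2,3\n4,5,6\n7,8,9\n");
    Ok(())
}
```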