diff --git a/src/daft-csv/src/lib.rs b/src/daft-csv/src/lib.rs index 35c4d72006..a5a93ca56b 100644 --- a/src/daft-csv/src/lib.rs +++ b/src/daft-csv/src/lib.rs @@ -7,6 +7,7 @@ extern crate test; use common_error::DaftError; use snafu::Snafu; +pub mod local; pub mod metadata; pub mod options; #[cfg(feature = "python")] diff --git a/src/daft-csv/src/local.rs b/src/daft-csv/src/local.rs new file mode 100644 index 0000000000..3bd73f2ef0 --- /dev/null +++ b/src/daft-csv/src/local.rs @@ -0,0 +1,912 @@ +use core::str; +use std::io::{Chain, Cursor, Read}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{num::NonZeroUsize, sync::Arc, sync::Condvar, sync::Mutex}; + +use crate::ArrowSnafu; +use crate::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; +use arrow2::{ + datatypes::Field, + io::csv::read, + io::csv::read::{Reader, ReaderBuilder}, + io::csv::read_async::local_read_rows, +}; +use common_error::{DaftError, DaftResult}; +use crossbeam_channel::Sender; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_decoding::deserialize::deserialize_column; +use daft_dsl::{optimization::get_required_columns, Expr}; +use daft_table::Table; +use futures::{stream::BoxStream, Stream, StreamExt}; +use rayon::{ + iter::IndexedParallelIterator, + prelude::{IntoParallelRefIterator, ParallelIterator}, +}; +use snafu::ResultExt; + +use crate::read::{fields_to_projection_indices, tables_concat}; + +#[allow(clippy::doc_lazy_continuation)] +/// Our local CSV reader has the following approach to reading CSV files: +/// 1. Read the CSV file in 4MB chunks from a slab pool. +/// 2. Adjust the chunks so that chunks are contiguous and contain complete +/// CSV records. See `get_file_chunk` for more details. +/// 3. In parallel with the above, convert the adjusted chunks into byte records, +/// which are stored within pre-allocated CSV buffers. +/// 4. In parallel with the above, deserialize each CSV buffer into a Daft table +/// and stream the results. +/// +/// Slab Pool CSV Buffer Pool +/// ┌────────────────────┐ ┌────────────────────┐ +/// │ 4MB Chunks │ │ CSV Buffers │ +/// │┌───┐┌───┐┌───┐ │ │┌───┐┌───┐┌───┐ │ +/// ││ ││ ││ │ ... │ ││ ││ ││ │ ... │ +/// │└─┬─┘└─┬─┘└───┘ │ │└─┬─┘└─┬─┘└───┘ │ +/// └──┼────┼────────────┘ └──┼────┼────────────┘ +/// │ │ │ │ +/// ───────┐ │ │ │ │ +/// /│ │ │ │ │ │ +/// /─┘ │ │ │ │ │ +/// │ │ ▼ ▼ ▼ ▼ +/// │ ─┼───►┌───┐ ┌───┐ ┌────┐ ┬--─┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ +/// │ │ │ │ │ │ ──────► │ ┬┘┌─┘ ┬─┘ ───────► │ │ │ │ ──────────► │ │ │ │ +/// │ CSV File │ └───┘ └───┘ └───┴ └───┘ └───┘ └───┘ └───┘ └───┘ +/// │ │ Chain of buffers Adjusted chunks Vectors of ByteRecords Stream of Daft tables +/// │ │ +/// └──────────┘ + +/// A pool of ByteRecord slabs. Used for deserializing CSV. +#[derive(Debug)] +struct CsvBufferPool { + buffers: Mutex>>, + buffer_size: usize, + record_buffer_size: usize, + num_fields: usize, +} + +/// A slab of ByteRecords. Used for deserializing CSV. 
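+/// Buffers are handed out by `CsvBufferPool::get_buffer`, and their `ByteRecord`s are
+/// returned to the pool by `parse_csv_chunk` once a chunk has been deserialized.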
+struct CsvBuffer { + pool: Arc, + buffer: Vec, +} + +impl CsvBufferPool { + pub fn new( + record_buffer_size: usize, + num_fields: usize, + chunk_size_rows: usize, + initial_pool_size: usize, + ) -> Self { + let chunk_buffers = vec![ + vec![ + read::ByteRecord::with_capacity(record_buffer_size, num_fields); + chunk_size_rows + ]; + initial_pool_size + ]; + CsvBufferPool { + buffers: Mutex::new(chunk_buffers), + buffer_size: chunk_size_rows, + record_buffer_size, + num_fields, + } + } + + pub fn get_buffer(self: &Arc) -> CsvBuffer { + let mut buffers = self.buffers.lock().unwrap(); + let buffer = buffers.pop(); + let buffer = match buffer { + Some(buffer) => buffer, + None => { + println!("csv buf empty"); + vec![ + read::ByteRecord::with_capacity(self.record_buffer_size, self.num_fields); + self.buffer_size + ] + } + }; + + CsvBuffer { + pool: Arc::clone(self), + buffer, + } + } + + fn return_buffer(&self, buffer: Vec) { + let mut buffers = self.buffers.lock().unwrap(); + buffers.push(buffer); + } +} + +// The default size of a slab used for reading CSV files in chunks. Currently set to 4MB. +const SLABSIZE: usize = 4 * 1024 * 1024; +// The default number of slabs in a slab pool. +const SLABPOOL_DEFAULT_SIZE: usize = 20; + +/// A pool of 4MB slabs. Used for reading CSV files in 4MB chunks. +#[derive(Debug)] +struct SlabPool { + buffers: Mutex>>, + condvar: Condvar, +} + +/// A 4MB slab of bytes. Used for reading CSV files in 4MB chunks. +#[derive(Clone)] +struct Slab { + pool: Arc, + // We wrap the Arc in an Option so that when a Slab is being dropped, we can move the Slab's reference + // to the Arc back to the slab pool. + buffer: Option>, +} + +impl Drop for Slab { + fn drop(&mut self) { + // Move the buffer back to the slab pool. + if let Some(buffer) = self.buffer.take() { + self.pool.return_buffer(buffer); + } + } +} + +impl SlabPool { + pub fn new() -> Self { + let chunk_buffers: Vec> = (0..SLABPOOL_DEFAULT_SIZE) + .map(|_| Arc::new([0; SLABSIZE])) + .collect(); + SlabPool { + buffers: Mutex::new(chunk_buffers), + condvar: Condvar::new(), + } + } + + pub fn get_buffer(self: &Arc) -> Arc<[u8; SLABSIZE]> { + let mut buffers = self.buffers.lock().unwrap(); + while buffers.is_empty() { + buffers = self.condvar.wait(buffers).unwrap(); + } + buffers.pop().unwrap() + } + + fn return_buffer(&self, buffer: Arc<[u8; SLABSIZE]>) { + let mut buffers = self.buffers.lock().unwrap(); + buffers.push(buffer); + self.condvar.notify_one(); + } +} + +/// A data structure that holds either a single slice of bytes, or a chain of two slices of bytes. +/// See the description to `parse_json` for more details. +#[derive(Debug)] +enum BufferSource<'a> { + Single(Cursor<&'a [u8]>), + Chain(Chain, Cursor<&'a [u8]>>), +} + +/// Read implementation that allows BufferSource to be used by csv::read::Reader. +impl<'a> Read for BufferSource<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self { + BufferSource::Single(cursor) => std::io::Read::read(cursor, buf), + BufferSource::Chain(chain) => chain.read(buf), + } + } +} + +pub async fn read_csv_local( + uri: &str, + convert_options: Option, + parse_options: CsvParseOptions, + read_options: Option, + max_chunks_in_flight: Option, +) -> DaftResult { + let stream = stream_csv_local( + uri, + convert_options, + parse_options, + read_options, + max_chunks_in_flight, + )?; + tables_concat(tables_stream_collect(Box::pin(stream)).await) +} + +async fn tables_stream_collect(stream: BoxStream<'static, DaftResult
<Table>>) -> Vec<Table>
{ + stream + .filter_map(|result| async { + match result { + Ok(table) => Some(table), + Err(_) => None, // Skips errors; you could log them or handle differently + } + }) + .collect() + .await +} + +pub fn stream_csv_local( + uri: &str, + convert_options: Option, + parse_options: CsvParseOptions, + read_options: Option, + max_chunks_in_flight: Option, +) -> DaftResult> + Send> { + let uri = uri.trim_start_matches("file://"); + let file = std::fs::File::open(uri)?; + + // TODO(desmond): This logic is repeated multiple times in the csv reader files. Should dedup. + let predicate = convert_options + .as_ref() + .and_then(|opts| opts.predicate.clone()); + + let limit = convert_options.as_ref().and_then(|opts| opts.limit); + + let include_columns = convert_options + .as_ref() + .and_then(|opts| opts.include_columns.clone()); + + let convert_options = match (convert_options, &predicate) { + (None, _) => None, + (co, None) => co, + (Some(mut co), Some(predicate)) => { + if let Some(ref mut include_columns) = co.include_columns { + let required_columns_for_predicate = get_required_columns(predicate); + for rc in required_columns_for_predicate { + if include_columns.iter().all(|c| c.as_str() != rc.as_str()) { + include_columns.push(rc) + } + } + } + // If we have a limit and a predicate, remove limit for stream. + co.limit = None; + Some(co) + } + } + .unwrap_or_default(); + // End of `should dedup`. + + // TODO(desmond): We should do better schema inference here. + let schema = convert_options.clone().schema.unwrap().to_arrow()?; + let n_threads: usize = std::thread::available_parallelism() + .unwrap_or(NonZeroUsize::new(2).unwrap()) + .into(); + let chunk_size = read_options + .as_ref() + .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) + .unwrap_or(DEFAULT_CHUNK_SIZE); + let projection_indices = fields_to_projection_indices( + &schema.clone().fields, + &convert_options.clone().include_columns, + ); + let fields = schema.clone().fields; + let fields_subset = projection_indices + .iter() + .map(|i| fields.get(*i).unwrap().into()) + .collect::>(); + let read_schema = Arc::new(daft_core::schema::Schema::new(fields_subset)?); + let read_daft_fields = Arc::new( + read_schema + .fields + .values() + .map(|f| Arc::new(f.clone())) + .collect::>(), + ); + // TODO(desmond): Need better upfront estimators. Cory did something like what we need here: https://github.com/universalmind303/Daft/blob/7b40f23a5ff83aba4ab059b62ac781d7766be0b1/src/daft-json/src/local.rs#L338 + let estimated_mean_row_size = 100f64; + let estimated_std_row_size = 20f64; + let record_buffer_size = (estimated_mean_row_size + estimated_std_row_size).ceil() as usize; + let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize; + let num_fields = schema.fields.len(); + // TODO(desmond): We might consider creating per-process buffer pools and slab pools. + let buffer_pool = Arc::new(CsvBufferPool::new( + record_buffer_size, + num_fields, + chunk_size_rows, + n_threads * 2, + )); + let slabpool = Arc::new(SlabPool::new()); + // We suppose that each slab of CSV data produces (chunk size / slab size) number of Daft tables. We + // then double this capacity to ensure that our channel is never full and our threads won't deadlock. 
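+    // With the default 4 MiB chunk size and 4 MiB slabs, this bound works out to
+    // 2 * 4 MiB / 4 MiB = 2 chunks in flight, unless `max_chunks_in_flight` overrides it.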
+ let (sender, receiver) = + crossbeam_channel::bounded(max_chunks_in_flight.unwrap_or(2 * chunk_size / SLABSIZE)); + rayon::spawn(move || { + consume_csv_file( + file, + buffer_pool, + slabpool, + parse_options, + projection_indices, + read_daft_fields, + read_schema, + fields, + num_fields, + &include_columns, + predicate, + limit, + sender, + ); + }); + let result_stream = futures::stream::iter(receiver); + Ok(result_stream) +} + +/// Consumes the CSV file and sends the results to `sender`. +#[allow(clippy::too_many_arguments)] +fn consume_csv_file( + mut file: std::fs::File, + buffer_pool: Arc, + slabpool: Arc, + parse_options: CsvParseOptions, + projection_indices: Arc>, + read_daft_fields: Arc>>, + read_schema: Arc, + fields: Vec, + num_fields: usize, + include_columns: &Option>, + predicate: Option>, + limit: Option, + sender: Sender>, +) { + let rows_read = Arc::new(AtomicUsize::new(0)); + let mut has_header = parse_options.has_header; + let total_len = file.metadata().unwrap().len() as usize; + let field_delimiter = parse_options.delimiter; + let escape_char = parse_options.escape_char; + let quote_char = parse_options.quote; + let double_quote_escape_allowed = parse_options.double_quote; + let mut total_bytes_read = 0; + let mut next_slab = None; + let mut next_buffer_len = 0; + let mut first_buffer = true; + loop { + let limit_reached = limit.map_or(false, |limit| { + let current_rows_read = rows_read.load(Ordering::Relaxed); + current_rows_read >= limit + }); + if limit_reached { + break; + } + let (current_slab, current_buffer_len) = match next_slab.take() { + Some(next_slab) => { + total_bytes_read += next_buffer_len; + (next_slab, next_buffer_len) + } + None => { + let mut buffer = slabpool.get_buffer(); + match Arc::get_mut(&mut buffer) { + Some(inner_buffer) => { + let bytes_read = file.read(inner_buffer).unwrap(); + if bytes_read == 0 { + slabpool.return_buffer(buffer); + break; + } + total_bytes_read += bytes_read; + ( + Arc::new(Slab { + pool: Arc::clone(&slabpool), + buffer: Some(buffer), + }), + bytes_read, + ) + } + None => { + slabpool.return_buffer(buffer); + break; + } + } + } + }; + (next_slab, next_buffer_len) = if total_bytes_read < total_len { + let mut next_buffer = slabpool.get_buffer(); + match Arc::get_mut(&mut next_buffer) { + Some(inner_buffer) => { + let bytes_read = file.read(inner_buffer).unwrap(); + if bytes_read == 0 { + slabpool.return_buffer(next_buffer); + (None, 0) + } else { + ( + Some(Arc::new(Slab { + pool: Arc::clone(&slabpool), + buffer: Some(next_buffer), + })), + bytes_read, + ) + } + } + None => { + slabpool.return_buffer(next_buffer); + break; + } + } + } else { + (None, 0) + }; + let file_chunk = get_file_chunk( + unsafe_clone_buffer(¤t_slab.buffer), + current_buffer_len, + next_slab + .as_ref() + .map(|slab| unsafe_clone_buffer(&slab.buffer)), + next_buffer_len, + first_buffer, + num_fields, + quote_char, + field_delimiter, + escape_char, + double_quote_escape_allowed, + ); + first_buffer = false; + if let (None, _) = file_chunk { + // Return the buffer. It doesn't matter that we still have a reference to the slab. We're going to fallback + // and the slabs will be useless. + slabpool.return_buffer(unsafe_clone_buffer(¤t_slab.buffer)); + // Exit early before spawning a new thread. + break; + // TODO(desmond): we should fallback instead. 
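+            // (A fallback is needed when a single record does not fit within one 4 MiB slab,
+            // since no valid chunk boundary can then be found.)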
+ } + let current_slab_clone = Arc::clone(¤t_slab); + let next_slab_clone = next_slab.clone(); + let parse_options = parse_options.clone(); + let csv_buffer = buffer_pool.get_buffer(); + let projection_indices = projection_indices.clone(); + let fields = fields.clone(); + let read_daft_fields = read_daft_fields.clone(); + let read_schema = read_schema.clone(); + let include_columns = include_columns.clone(); + let predicate = predicate.clone(); + let sender = sender.clone(); + let rows_read = Arc::clone(&rows_read); + rayon::spawn(move || { + let limit_reached = limit.map_or(false, |limit| { + let current_rows_read = rows_read.load(Ordering::Relaxed); + current_rows_read >= limit + }); + if !limit_reached { + match file_chunk { + (Some(start), None) => { + if let Some(buffer) = ¤t_slab_clone.buffer { + let buffer_source = BufferSource::Single(Cursor::new( + &buffer[start..current_buffer_len], + )); + dispatch_to_parse_csv( + has_header, + parse_options, + buffer_source, + projection_indices, + fields, + read_daft_fields, + read_schema, + csv_buffer, + &include_columns, + predicate, + sender, + rows_read, + ); + } else { + panic!("Trying to read from a CSV buffer that doesn't exist. Please report this issue.") + } + } + (Some(start), Some(end)) => { + if let Some(next_slab_clone) = next_slab_clone + && let Some(current_buffer) = ¤t_slab_clone.buffer + && let Some(next_buffer) = &next_slab_clone.buffer + { + let buffer_source = BufferSource::Chain(std::io::Read::chain( + Cursor::new(¤t_buffer[start..current_buffer_len]), + Cursor::new(&next_buffer[..end]), + )); + dispatch_to_parse_csv( + has_header, + parse_options, + buffer_source, + projection_indices, + fields, + read_daft_fields, + read_schema, + csv_buffer, + &include_columns, + predicate, + sender, + rows_read, + ); + } else { + panic!("Trying to read from an overflow CSV buffer that doesn't exist. Please report this issue.") + } + } + _ => panic!( + "Something went wrong when parsing the CSV file. Please report this issue." + ), + }; + } + }); + has_header = false; + if total_bytes_read >= total_len { + break; + } + } +} + +/// Unsafe helper function that extracts the buffer from an &Option>. Users should +/// ensure that the buffer is Some, otherwise this function causes the process to panic. +fn unsafe_clone_buffer(buffer: &Option>) -> Arc<[u8; SLABSIZE]> { + match buffer { + Some(buffer) => Arc::clone(buffer), + None => panic!("Tried to clone a CSV slab that doesn't exist. Please report this error."), + } +} + +#[allow(clippy::doc_lazy_continuation)] +/// Helper function that determines what chunk of data to parse given a starting position within the +/// file, and the desired initial chunk size. +/// +/// Given a starting position, we use our chunk size to compute a preliminary start and stop +/// position. For example, we can visualize all preliminary chunks in a file as follows. +/// +/// Chunk 1 Chunk 2 Chunk 3 Chunk N +/// ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ +/// │ │ │\n │ │ \n │ │ \n │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ \n │ │ │ │ │ +/// │ \n │ │ │ │ \n │ │ │ +/// │ │ │ │ │ │ ... │ \n │ +/// │ │ │ \n │ │ │ │ │ +/// │ \n │ │ │ │ │ │ │ +/// │ │ │ │ │ \n │ │ \n │ +/// └──────────┘ └──────────┘ └──────────┘ └──────────┘ +/// +/// However, record boundaries (i.e. the \n terminators) do not align nicely with these preliminary +/// chunk boundaries. So we adjust each preliminary chunk as follows: +/// - Find the first record terminator from the chunk's start. This is the new starting position. 
+/// - Find the first record terminator from the chunk's end. This is the new ending position. +/// - If a given preliminary chunk doesn't contain a record terminator, the adjusted chunk is empty. +/// +/// For example: +/// +/// Adjusted Chunk 1 Adj. Chunk 2 Adj. Chunk 3 Adj. Chunk N +/// ┌──────────────────┐┌─────────────────┐ ┌────────┐ ┌─┐ +/// │ \n││ \n│ │ \n│ \n │ │ +/// │ ┌───────┘│ ┌──────────┘ │ ┌─────┘ │ │ +/// │ │ ┌───┘ \n │ ┌───────┘ │ ┌────────┘ │ +/// │ \n │ │ │ │ \n │ │ │ +/// │ │ │ │ │ │ ... │ \n │ +/// │ │ │ \n │ │ │ │ │ +/// │ \n │ │ │ │ │ │ │ +/// │ │ │ │ │ \n │ │ \n │ +/// └──────────┘ └──────────┘ └──────────┘ └──────────┘ +/// +/// Using this method, we now have adjusted chunks that are aligned with record boundaries, that do +/// not overlap, and that fully cover every byte in the CSV file. Parsing each adjusted chunk can +/// now happen in parallel. +/// +/// This is the same method as described in: +/// Ge, Chang et al. “Speculative Distributed CSV Data Parsing for Big Data Analytics.” Proceedings of the 2019 International Conference on Management of Data (2019). +/// +/// Another observation is that seeing a pure \n character is not necessarily indicative of a record +/// terminator. We need to consider whether the \n character was seen within a quoted field, since the +/// string "some text \n some text" is a valid CSV string field. To do this, we carry out the following +/// algorithm: +/// 1. Find a \n character. +/// 2. Check if the CSV string immediately following this \n character is valid, i.e. does it parse +/// as valid CSV, and does it produce the same number of fields as our schema. +/// 2a. If there is a valid record at this point, then we assume that the \n we saw was a valid terminator. +/// 2b. If the record at this point is invalid, then this was likely a \n in a quoted field. Find the next +/// \n character and go back to 2. +#[allow(clippy::too_many_arguments)] +fn get_file_chunk( + current_buffer: Arc<[u8; SLABSIZE]>, + current_buffer_len: usize, + next_buffer: Option>, + next_buffer_len: usize, + first_buffer: bool, + num_fields: usize, + quote_char: u8, + field_delimiter: u8, + escape_char: Option, + double_quote_escape_allowed: bool, +) -> (Option, Option) { + // TODO(desmond): There is a potential fast path here when `escape_char` is None: simply check for \n characters. + let start = if !first_buffer { + let start = next_line_position( + ¤t_buffer[..current_buffer_len], + 0, + num_fields, + quote_char, + field_delimiter, + escape_char, + double_quote_escape_allowed, + ); + match start { + Some(_) => start, + None => return (None, None), // If the record size is >= 4MB, return None and fallback. + } + } else { + Some(0) + }; + // If there is a next buffer, find the adjusted chunk in that buffer. If there's no next buffer, we're at the end of the file. + let end = if let Some(next_buffer) = next_buffer { + let end = next_line_position( + &next_buffer[..next_buffer_len], + 0, + num_fields, + quote_char, + field_delimiter, + escape_char, + double_quote_escape_allowed, + ); + match end { + Some(_) => end, + None => return (None, None), // If the record size is >= 4MB, return None and fallback. + } + } else { + None + }; + (start, end) +} + +/// Helper function that finds the first valid record terminator in a buffer. 
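+/// For example, with `num_fields == 2`, the `\n` inside the quoted field of `1,"a\nb"\n3,4\n`
+/// is rejected because the bytes that follow it do not parse as a two-field record, while the
+/// `\n` after the closing quote is accepted as the record terminator.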
+fn next_line_position( + buffer: &[u8], + offset: usize, + num_fields: usize, + quote_char: u8, + field_delimiter: u8, + escape_char: Option, + double_quote_escape_allowed: bool, +) -> Option { + let mut start = offset; + loop { + start = match newline_position(&buffer[start..]) { + // Start reading after the first record terminator from the start of the chunk. + Some(pos) => start + pos + 1, + None => return None, + }; + if start >= buffer.len() { + return None; + } + if validate_csv_record( + &buffer[start..], + num_fields, + quote_char, + field_delimiter, + escape_char, + double_quote_escape_allowed, + ) { + return Some(start); + } + } +} + +// Daft does not currently support non-\n record terminators (e.g. carriage return \r, which only +// matters for pre-Mac OS X). +const NEWLINE: u8 = b'\n'; +const DOUBLE_QUOTE: u8 = b'"'; +const DEFAULT_CHUNK_SIZE: usize = 4 * 1024 * 1024; // 1MiB. TODO(desmond): This should be tuned. + +/// Helper function that finds the first new line character (\n) in the given byte slice. +fn newline_position(buffer: &[u8]) -> Option { + // Assuming we are searching for the ASCII `\n` character, we don't need to do any special + // handling for UTF-8, since a `\n` value always corresponds to an ASCII `\n`. + // For more details, see: https://en.wikipedia.org/wiki/UTF-8#Encoding + memchr::memchr(NEWLINE, buffer) +} + +/// Csv states used by the state machine in `validate_csv_record`. +#[derive(Clone)] +enum CsvState { + FieldStart, + RecordEnd, + UnquotedField, + QuotedField, + Unquote, +} + +/// State machine that validates whether the current buffer starts at a valid csv record. +/// See `get_file_chunk` for more details. +fn validate_csv_record( + buffer: &[u8], + num_fields: usize, + quote_char: u8, + field_delimiter: u8, + escape_char: Option, + double_quote_escape_allowed: bool, +) -> bool { + let mut state = CsvState::FieldStart; + let mut index = 0; + let mut num_fields_seen = 0; + loop { + if index >= buffer.len() { + // We've reached the end of the buffer without seeing a valid record. + return false; + } + match state { + CsvState::FieldStart => { + let byte = buffer[index]; + if byte == NEWLINE { + state = CsvState::RecordEnd; + } else if byte == quote_char { + state = CsvState::QuotedField; + index += 1; + } else { + state = CsvState::UnquotedField; + } + } + CsvState::RecordEnd => { + return num_fields_seen == num_fields; + } + CsvState::UnquotedField => { + // We follow the convention where an unquoted field does consider escape characters. + while index < buffer.len() { + let byte = buffer[index]; + if byte == NEWLINE { + num_fields_seen += 1; + state = CsvState::RecordEnd; + break; + } + if byte == field_delimiter { + num_fields_seen += 1; + state = CsvState::FieldStart; + index += 1; + break; + } + index += 1; + } + } + CsvState::QuotedField => { + while index < buffer.len() { + let byte = buffer[index]; + if byte == quote_char { + state = CsvState::Unquote; + index += 1; + break; + } + if let Some(escape_char) = escape_char + && byte == escape_char + { + // Skip the next character. 
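+                            // Together with the `index += 1` at the end of the loop body,
+                            // this advances past both the escape byte and the byte it escapes.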
+ index += 1; + } + index += 1; + } + } + CsvState::Unquote => { + let byte = buffer[index]; + if let Some(escape_char) = escape_char + && byte == escape_char + && escape_char == quote_char + && (byte != DOUBLE_QUOTE || double_quote_escape_allowed) + { + state = CsvState::QuotedField; + index += 1; + continue; + } + if byte == NEWLINE { + num_fields_seen += 1; + state = CsvState::RecordEnd; + continue; + } + if byte == field_delimiter { + num_fields_seen += 1; + state = CsvState::FieldStart; + index += 1; + continue; + } + // Other characters are not allowed after a quote. This is invalid CSV. + return false; + } + } + } +} + +/// Helper function that takes in a BufferSource, calls parse_csv() to extract table values from +/// the buffer source, then streams the results to `sender`. +#[allow(clippy::too_many_arguments)] +fn dispatch_to_parse_csv( + has_header: bool, + parse_options: CsvParseOptions, + buffer_source: BufferSource, + projection_indices: Arc>, + fields: Vec, + read_daft_fields: Arc>>, + read_schema: Arc, + csv_buffer: CsvBuffer, + include_columns: &Option>, + predicate: Option>, + sender: Sender>, + rows_read: Arc, +) { + let table_results = { + let rdr = ReaderBuilder::new() + .has_headers(has_header) + .delimiter(parse_options.delimiter) + .double_quote(parse_options.double_quote) + .quote(parse_options.quote) + .escape(parse_options.escape_char) + .comment(parse_options.comment) + .flexible(parse_options.allow_variable_columns) + .from_reader(buffer_source); + parse_csv_chunk( + rdr, + projection_indices, + fields, + read_daft_fields, + read_schema, + csv_buffer, + include_columns, + predicate, + ) + }; + match table_results { + Ok(tables) => { + for table in tables { + let table_len = table.len(); + sender.send(Ok(table)).unwrap(); + // Atomically update the number of rows read only after the result has + // been sent. In theory we could wrap these steps in a mutex, but + // applying limit at this layer can be best-effort with no adverse + // side effects. + rows_read.fetch_add(table_len, Ordering::Relaxed); + } + } + Err(e) => sender.send(Err(e)).unwrap(), + } +} + +/// Helper function that consumes a CSV reader and turns it into a vector of Daft tables. +#[allow(clippy::too_many_arguments)] +fn parse_csv_chunk( + mut reader: Reader, + projection_indices: Arc>, + fields: Vec, + read_daft_fields: Arc>>, + read_schema: Arc, + csv_buffer: CsvBuffer, + include_columns: &Option>, + predicate: Option>, +) -> DaftResult> +where + R: std::io::Read, +{ + let mut chunk_buffer = csv_buffer.buffer; + let mut tables = vec![]; + loop { + //let time = Instant::now(); + let (rows_read, has_more) = + local_read_rows(&mut reader, chunk_buffer.as_mut_slice()).context(ArrowSnafu {})?; + //let time = Instant::now(); + let chunk = projection_indices + .par_iter() + .enumerate() + .map(|(i, proj_idx)| { + let deserialized_col = deserialize_column( + &chunk_buffer[0..rows_read], + *proj_idx, + fields[*proj_idx].data_type().clone(), + 0, + ); + Series::try_from_field_and_arrow_array( + read_daft_fields[i].clone(), + cast_array_for_daft_if_needed(deserialized_col?), + ) + }) + .collect::>>()?; + let num_rows = chunk.first().map(|s| s.len()).unwrap_or(0); + let table = Table::new_unchecked(read_schema.clone(), chunk, num_rows); + let table = if let Some(predicate) = &predicate { + let filtered = table.filter(&[predicate.clone()])?; + if let Some(include_columns) = &include_columns { + filtered.get_columns(include_columns.as_slice())? 
+ } else { + filtered + } + } else { + table + }; + tables.push(table); + + // The number of record might exceed the number of byte records we've allocated. + // Retry until all byte records in this chunk are read. + if !has_more { + break; + } + } + csv_buffer.pool.return_buffer(chunk_buffer); + Ok(tables) +} diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 3ed49204b3..9609e7bb63 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -1,19 +1,16 @@ -use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, sync::Mutex}; - -use std::sync::atomic::{AtomicUsize, Ordering}; +use core::str; +use std::{collections::HashMap, num::NonZeroUsize, sync::Arc}; use arrow2::{ datatypes::Field, - io::csv::read, - io::csv::read::{Reader, ReaderBuilder}, io::csv::read_async, - io::csv::read_async::{local_read_rows, read_rows, AsyncReaderBuilder}, + io::csv::read_async::{read_rows, AsyncReaderBuilder}, }; use async_compat::{Compat, CompatExt}; use common_error::{DaftError, DaftResult}; use csv_async::AsyncReader; use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; -use daft_dsl::{optimization::get_required_columns, Expr}; +use daft_dsl::optimization::get_required_columns; use daft_io::{get_runtime, parse_url, GetResult, IOClient, IOStatsRef, SourceType}; use daft_table::Table; use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; @@ -32,8 +29,8 @@ use tokio::{ }; use tokio_util::io::StreamReader; +use crate::ArrowSnafu; use crate::{metadata::read_csv_schema_single, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; -use crate::{ArrowSnafu, StdIOSnafu}; use daft_compression::CompressionCodec; use daft_decoding::deserialize::deserialize_column; @@ -43,6 +40,8 @@ impl ByteRecordChunkStream for S where { } +use crate::{local::read_csv_local, local::stream_csv_local}; + type TableChunkResult = super::Result>, super::JoinSnafu, super::Error>>; trait TableStream: Stream {} @@ -156,16 +155,14 @@ pub async fn stream_csv( let uri = uri.as_str(); let (source_type, _) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); - let use_local_reader = false; // TODO(desmond): Feature under dev. - if matches!(source_type, SourceType::File) && !is_compressed && use_local_reader { + if matches!(source_type, SourceType::File) && !is_compressed { let stream = stream_csv_local( uri, convert_options, parse_options.unwrap_or_default(), read_options, max_chunks_in_flight, - ) - .await?; + )?; Ok(Box::pin(stream)) } else { let stream = stream_csv_single( @@ -182,7 +179,7 @@ pub async fn stream_csv( } } -fn tables_concat(mut tables: Vec
<Table>) -> DaftResult<Table> {
+pub fn tables_concat(mut tables: Vec<Table>) -> DaftResult<Table>
{ if tables.is_empty() { return Err(DaftError::ValueError( "Need at least 1 Table to perform concat".to_string(), @@ -220,387 +217,6 @@ fn tables_concat(mut tables: Vec
<Table>) -> DaftResult<Table>
{ ) } -#[derive(Debug)] -struct CsvBufferPool { - buffers: Mutex>>, - buffer_size: usize, - record_buffer_size: usize, - num_fields: usize, -} - -struct CsvBufferPoolRef<'a> { - pool: &'a CsvBufferPool, - buffer: Vec, -} - -impl CsvBufferPool { - pub fn new( - record_buffer_size: usize, - num_fields: usize, - chunk_size_rows: usize, - initial_pool_size: usize, - ) -> Self { - let chunk_buffers = vec![ - vec![ - read::ByteRecord::with_capacity(record_buffer_size, num_fields); - chunk_size_rows - ]; - initial_pool_size - ]; - CsvBufferPool { - buffers: Mutex::new(chunk_buffers), - buffer_size: chunk_size_rows, - record_buffer_size, - num_fields, - } - } - - pub fn get_buffer(&self) -> CsvBufferPoolRef { - let mut buffers = self.buffers.lock().unwrap(); - let buffer = buffers.pop(); - let buffer = match buffer { - Some(buffer) => buffer, - None => { - vec![ - read::ByteRecord::with_capacity(self.record_buffer_size, self.num_fields); - self.buffer_size - ] - } - }; - - CsvBufferPoolRef { pool: self, buffer } - } - - fn return_buffer(&self, buffer: Vec) { - let mut buffers = self.buffers.lock().unwrap(); - buffers.push(buffer); - } -} - -// Daft does not currently support non-\n record terminators (e.g. carriage return \r, which only -// matters for pre-Mac OS X). -const NEWLINE: u8 = b'\n'; -const DEFAULT_CHUNK_SIZE: usize = 4 * 1024 * 1024; // 1MiB. TODO(desmond): This should be tuned. - -/// Helper function that finds the first new line character (\n) in the given byte slice. -fn next_line_position(input: &[u8]) -> Option { - // Assuming we are searching for the ASCII `\n` character, we don't need to do any special - // handling for UTF-8, since a `\n` value always corresponds to an ASCII `\n`. - // For more details, see: https://en.wikipedia.org/wiki/UTF-8#Encoding - memchr::memchr(NEWLINE, input) -} - -/// Helper function that determines what chunk of data to parse given a starting position within the -/// file, and the desired initial chunk size. -/// -/// Given a starting position, we use our chunk size to compute a preliminary start and stop -/// position. For example, we can visualize all preliminary chunks in a file as follows. -/// -/// Chunk 1 Chunk 2 Chunk 3 Chunk N -/// ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ -/// │ │ │\n │ │ \n │ │ \n │ -/// │ │ │ │ │ │ │ │ -/// │ │ │ \n │ │ │ │ │ -/// │ \n │ │ │ │ \n │ │ │ -/// │ │ │ │ │ │ ... │ \n │ -/// │ │ │ \n │ │ │ │ │ -/// │ \n │ │ │ │ │ │ │ -/// │ │ │ │ │ \n │ │ \n │ -/// └──────────┘ └──────────┘ └──────────┘ └──────────┘ -/// -/// However, record boundaries (i.e. the \n terminators) do not align nicely with these preliminary -/// chunk boundaries. So we adjust each preliminary chunk as follows: -/// - Find the first record terminator from the chunk's start. This is the new starting position. -/// - Find the first record terminator from the chunk's end. This is the new ending position. -/// - If a given preliminary chunk doesn't contain a record terminator, the adjusted chunk is empty. -/// -/// For example: -/// -/// Adjusted Chunk 1 Adj. Chunk 2 Adj. Chunk 3 Adj. Chunk N -/// ┌──────────────────┐┌─────────────────┐ ┌────────┐ ┌─┐ -/// │ \n││ \n│ │ \n│ \n │ │ -/// │ ┌───────┘│ ┌──────────┘ │ ┌─────┘ │ │ -/// │ │ ┌───┘ \n │ ┌───────┘ │ ┌────────┘ │ -/// │ \n │ │ │ │ \n │ │ │ -/// │ │ │ │ │ │ ... 
│ \n │ -/// │ │ │ \n │ │ │ │ │ -/// │ \n │ │ │ │ │ │ │ -/// │ │ │ │ │ \n │ │ \n │ -/// └──────────┘ └──────────┘ └──────────┘ └──────────┘ -/// -/// Using this method, we now have adjusted chunks that are aligned with record boundaries, that do -/// not overlap, and that fully cover every byte in the CSV file. Parsing each adjusted chunk can -/// now happen in parallel. -/// -/// This is the same method as described in: -/// Ge, Chang et al. “Speculative Distributed CSV Data Parsing for Big Data Analytics.” Proceedings of the 2019 International Conference on Management of Data (2019). -fn get_file_chunk(bytes: &[u8], start: usize, chunk_size: usize) -> Option<(usize, usize)> { - let stop = start + chunk_size; - let start = if start == 0 { - 0 - } else { - match next_line_position(&bytes[start..]) { - // Start reading after the first record terminator from the start of the chunk. - Some(pos) => start + pos + 1, - // If there's no record terminator found, then the previous chunk reader would have - // consumed the current chunk. Hence, we skip. - None => return None, - } - }; - // If the first record terminator comes after this chunk, then the previous chunk reader would - // have consumed the current chunk. Hence, we skip. - if start > stop { - return None; - } - let stop = if stop < bytes.len() { - match next_line_position(&bytes[stop..]) { - // Read up to the first terminator from the end of the chunk. - Some(pos) => stop + pos, - None => bytes.len(), - } - } else { - bytes.len() - }; - Some((start, stop)) -} - -#[allow(clippy::too_many_arguments)] -fn parse_csv_chunk( - mut reader: Reader, - projection_indices: Arc>, - fields: Vec, - read_daft_fields: Arc>>, - read_schema: Arc, - buf: CsvBufferPoolRef, - include_columns: &Option>, - predicate: Option>, -) -> DaftResult> -where - R: std::io::Read, -{ - let mut chunk_buffer = buf.buffer; - let mut tables = vec![]; - loop { - let (rows_read, has_more) = - local_read_rows(&mut reader, chunk_buffer.as_mut_slice()).context(ArrowSnafu {})?; - let chunk = projection_indices - .par_iter() - .enumerate() - .map(|(i, proj_idx)| { - let deserialized_col = deserialize_column( - &chunk_buffer[0..rows_read], - *proj_idx, - fields[*proj_idx].data_type().clone(), - 0, - ); - Series::try_from_field_and_arrow_array( - read_daft_fields[i].clone(), - cast_array_for_daft_if_needed(deserialized_col?), - ) - }) - .collect::>>()?; - let num_rows = chunk.first().map(|s| s.len()).unwrap_or(0); - let table = Table::new_unchecked(read_schema.clone(), chunk, num_rows); - let table = if let Some(predicate) = &predicate { - let filtered = table.filter(&[predicate.clone()])?; - if let Some(include_columns) = &include_columns { - filtered.get_columns(include_columns.as_slice())? - } else { - filtered - } - } else { - table - }; - tables.push(table); - - // The number of record might exceed the number of byte records we've allocated. - // Retry until all byte records in this chunk are read. - if !has_more { - break; - } - } - buf.pool.return_buffer(chunk_buffer); - Ok(tables) -} - -async fn stream_csv_local( - uri: &str, - convert_options: Option, - parse_options: CsvParseOptions, - read_options: Option, - max_chunks_in_flight: Option, -) -> DaftResult> + Send> { - let uri = uri.trim_start_matches("file://"); - let file = std::fs::File::open(uri)?; - let mmap = unsafe { memmap2::Mmap::map(&file) }.context(StdIOSnafu)?; - let bytes = &mmap[..]; - - // TODO(desmond): This logic is repeated multiple times in this file. Should dedup. 
- let predicate = convert_options - .as_ref() - .and_then(|opts| opts.predicate.clone()); - - let limit = convert_options.as_ref().and_then(|opts| opts.limit); - - let include_columns = convert_options - .as_ref() - .and_then(|opts| opts.include_columns.clone()); - - let convert_options = match (convert_options, &predicate) { - (None, _) => None, - (co, None) => co, - (Some(mut co), Some(predicate)) => { - if let Some(ref mut include_columns) = co.include_columns { - let required_columns_for_predicate = get_required_columns(predicate); - for rc in required_columns_for_predicate { - if include_columns.iter().all(|c| c.as_str() != rc.as_str()) { - include_columns.push(rc) - } - } - } - // If we have a limit and a predicate, remove limit for stream. - co.limit = None; - Some(co) - } - } - .unwrap_or_default(); - // End of `should dedup`. - - // TODO(desmond): We should do better schema inference here. - let schema = convert_options.clone().schema.unwrap().to_arrow()?; - let n_threads: usize = std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(2).unwrap()) - .into(); - let chunk_size = read_options - .as_ref() - .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) - .unwrap_or(DEFAULT_CHUNK_SIZE); - let projection_indices = fields_to_projection_indices( - &schema.clone().fields, - &convert_options.clone().include_columns, - ); - let fields = schema.clone().fields; - let fields_subset = projection_indices - .iter() - .map(|i| fields.get(*i).unwrap().into()) - .collect::>(); - let read_schema = Arc::new(daft_core::schema::Schema::new(fields_subset)?); - let read_daft_fields = Arc::new( - read_schema - .fields - .values() - .map(|f| Arc::new(f.clone())) - .collect::>(), - ); - // TODO(desmond): Need better upfront estimators. Sample or keep running count of stats. - let estimated_mean_row_size = 100f64; - let estimated_std_row_size = 20f64; - let record_buffer_size = (estimated_mean_row_size + estimated_std_row_size).ceil() as usize; - let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize; - // TODO(desmond): We don't want to create a per-read buffer pool, we want one pool shared with - // the whole process. - let buffer_pool = CsvBufferPool::new( - record_buffer_size, - schema.fields.len(), - chunk_size_rows, - n_threads * 2, - ); - let chunk_offsets: Vec = (0..=bytes.len()).step_by(chunk_size).collect(); - // TODO(desmond): Memory usage is still growing during execution of a .count(*).collect(), so - // the following approach still isn't quite right. - // TODO(desmond): Also, is this usage of max_chunks_in_flight correct? - let (sender, receiver) = - crossbeam_channel::bounded(max_chunks_in_flight.unwrap_or(n_threads * 2)); - let rows_read = AtomicUsize::new(0); - rayon::spawn(move || { - let bytes = &mmap[..]; - chunk_offsets.into_par_iter().for_each(|start| { - // TODO(desmond): Use try_for_each and terminate early once the limit is reached. - let limit_reached = limit.map_or(false, |limit| { - let current_rows_read = rows_read.load(Ordering::Relaxed); - current_rows_read >= limit - }); - - if !limit_reached && let Some((start, stop)) = get_file_chunk(bytes, start, chunk_size) - { - let buf = buffer_pool.get_buffer(); - let chunk = &bytes[start..stop]; - // Only the first chunk might potentially have headers. Subsequent chunks should - // read all rows as records. 
- let has_headers = start == 0 && parse_options.has_header; - let rdr = ReaderBuilder::new() - .has_headers(has_headers) - .delimiter(parse_options.delimiter) - .double_quote(parse_options.double_quote) - // TODO(desmond): We need to handle the quoted case properly. - .quote(parse_options.quote) - .escape(parse_options.escape_char) - .comment(parse_options.comment) - .flexible(parse_options.allow_variable_columns) - .from_reader(chunk); - Reader::from_reader(chunk); - let table_results = parse_csv_chunk( - rdr, - projection_indices.clone(), - fields.clone(), - read_daft_fields.clone(), - read_schema.clone(), - buf, - &include_columns, - predicate.clone(), - ); - match table_results { - Ok(tables) => { - for table in tables { - let table_len = table.len(); - sender.send(Ok(table)).unwrap(); - // Atomically update the number of rows read only after the result has - // been sent. In theory we could wrap these steps in a mutex, but - // applying limit at this layer can be best-effort with no adverse - // side effects. - rows_read.fetch_add(table_len, Ordering::Relaxed); - } - } - Err(e) => sender.send(Err(e)).unwrap(), - } - } - }) - }); - - let result_stream = futures::stream::iter(receiver); - Ok(result_stream) -} - -async fn tables_stream_collect(stream: BoxStream<'static, DaftResult
<Table>>) -> Vec<Table> {
-    stream
-        .filter_map(|result| async {
-            match result {
-                Ok(table) => Some(table),
-                Err(_) => None, // Skips errors; you could log them or handle differently
-            }
-        })
-        .collect()
-        .await
-}
-
-async fn read_csv_local(
-    uri: &str,
-    convert_options: Option<CsvConvertOptions>,
-    parse_options: CsvParseOptions,
-    read_options: Option<CsvReadOptions>,
-    max_chunks_in_flight: Option<usize>,
-) -> DaftResult<Table> {
-    let stream = stream_csv_local(
-        uri,
-        convert_options,
-        parse_options,
-        read_options,
-        max_chunks_in_flight,
-    )
-    .await?;
-    tables_concat(tables_stream_collect(Box::pin(stream)).await)
-}
-
 async fn read_csv_single_into_table(
     uri: &str,
     convert_options: Option<CsvConvertOptions>,
     parse_options: Option<CsvParseOptions>,
     read_options: Option<CsvReadOptions>,
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
 ) -> DaftResult<Table>
{ let (source_type, _) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); - let use_local_reader = false; // TODO(desmond): Feature under dev. - if matches!(source_type, SourceType::File) && !is_compressed && use_local_reader { + if matches!(source_type, SourceType::File) && !is_compressed { return read_csv_local( uri, convert_options, @@ -1054,7 +669,7 @@ fn parse_into_column_array_chunk_stream( })) } -fn fields_to_projection_indices( +pub fn fields_to_projection_indices( fields: &[arrow2::datatypes::Field], include_columns: &Option>, ) -> Arc> {
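
For reference, here is a minimal, self-contained sketch of the chunk-adjustment rule that `get_file_chunk` implements. It is not part of the diff: the function name `adjust_chunk_start` is invented for illustration, and it ignores quoted fields, which the real reader handles by additionally validating each candidate terminator with `validate_csv_record`.

// A worker whose preliminary chunk starts mid-record skips forward to the byte just
// after the first record terminator; the first chunk always starts at byte 0.
fn adjust_chunk_start(bytes: &[u8], preliminary_start: usize) -> Option<usize> {
    if preliminary_start == 0 {
        return Some(0);
    }
    bytes[preliminary_start..]
        .iter()
        .position(|&b| b == b'\n')
        .map(|pos| preliminary_start + pos + 1)
}

fn main() {
    let csv = b"a,b\n1,2\n3,4\n5,6\n";
    // A preliminary boundary at byte 5 falls inside the record "1,2", so the adjusted
    // start moves to byte 8, the beginning of "3,4".
    assert_eq!(adjust_chunk_start(csv, 5), Some(8));
    assert_eq!(adjust_chunk_start(csv, 0), Some(0));
}

With both the start and the end of every preliminary chunk adjusted this way, adjacent chunks neither overlap nor split a record, so they can be parsed in parallel.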