Commit 1e0022c

both are slabs yay (andrew stash)

andrewgazelka committed Oct 8, 2024
1 parent 7c7bf14 commit 1e0022c
Showing 3 changed files with 186 additions and 167 deletions.
266 changes: 142 additions & 124 deletions src/daft-csv/src/local.rs
@@ -3,7 +3,7 @@ use std::io::{Chain, Cursor, Read};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{num::NonZeroUsize, sync::Arc, sync::Condvar, sync::Mutex};

use crate::local::pool::{read_slabs_windowed, FileSlab, SlabPool};
use crate::local::pool::{read_slabs_windowed, CsvSlabData, FileSlabData, Slab, SlabPool};
use crate::ArrowSnafu;
use crate::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
use arrow2::{
@@ -59,68 +59,68 @@ use crate::read::{fields_to_projection_indices, tables_concat};
/// │ │
/// └──────────┘
/// A pool of ByteRecord slabs. Used for deserializing CSV.
#[derive(Debug)]
struct CsvBufferPool {
buffers: Mutex<Vec<Vec<read::ByteRecord>>>,
buffer_size: usize,
record_buffer_size: usize,
num_fields: usize,
}

/// A slab of ByteRecords. Used for deserializing CSV.
struct CsvBuffer {
pool: Arc<CsvBufferPool>,
buffer: Vec<read::ByteRecord>,
}

impl CsvBufferPool {
pub fn new(
record_buffer_size: usize,
num_fields: usize,
chunk_size_rows: usize,
initial_pool_size: usize,
) -> Self {
let chunk_buffers = vec![
vec![
read::ByteRecord::with_capacity(record_buffer_size, num_fields);
chunk_size_rows
];
initial_pool_size
];
CsvBufferPool {
buffers: Mutex::new(chunk_buffers),
buffer_size: chunk_size_rows,
record_buffer_size,
num_fields,
}
}

pub fn get_buffer(self: &Arc<Self>) -> CsvBuffer {
let mut buffers = self.buffers.lock().unwrap();
let buffer = buffers.pop();
let buffer = match buffer {
Some(buffer) => buffer,
None => {
println!("csv buf empty");
vec![
read::ByteRecord::with_capacity(self.record_buffer_size, self.num_fields);
self.buffer_size
]
}
};

CsvBuffer {
pool: Arc::clone(self),
buffer,
}
}

fn return_buffer(&self, buffer: Vec<read::ByteRecord>) {
let mut buffers = self.buffers.lock().unwrap();
buffers.push(buffer);
}
}
// /// A pool of ByteRecord slabs. Used for deserializing CSV.
// #[derive(Debug)]
// struct CsvBufferPool {
// buffers: Mutex<Vec<Vec<read::ByteRecord>>>,
// buffer_size: usize,
// record_buffer_size: usize,
// num_fields: usize,
// }

// /// A slab of ByteRecords. Used for deserializing CSV.
// struct CsvBuffer {
// pool: Arc<CsvBufferPool>,
// buffer: Vec<read::ByteRecord>,
// }
//
// impl CsvBufferPool {
// pub fn new(
// record_buffer_size: usize,
// num_fields: usize,
// chunk_size_rows: usize,
// initial_pool_size: usize,
// ) -> Self {
// let chunk_buffers = vec![
// vec![
// read::ByteRecord::with_capacity(record_buffer_size, num_fields);
// chunk_size_rows
// ];
// initial_pool_size
// ];
// CsvBufferPool {
// buffers: Mutex::new(chunk_buffers),
// buffer_size: chunk_size_rows,
// record_buffer_size,
// num_fields,
// }
// }
//
// pub fn get_buffer(self: &Arc<Self>) -> CsvBuffer {
// let mut buffers = self.buffers.lock().unwrap();
// let buffer = buffers.pop();
// let buffer = match buffer {
// Some(buffer) => buffer,
// None => {
// println!("csv buf empty");
// vec![
// read::ByteRecord::with_capacity(self.record_buffer_size, self.num_fields);
// self.buffer_size
// ]
// }
// };
//
// CsvBuffer {
// pool: Arc::clone(self),
// buffer,
// }
// }
//
// fn return_buffer(&self, buffer: Vec<read::ByteRecord>) {
// let mut buffers = self.buffers.lock().unwrap();
// buffers.push(buffer);
// }
// }

// The default size of a slab used for reading CSV files in chunks. Currently set to 4MB.
const SLAB_SIZE: usize = 4 * 1024 * 1024;
@@ -141,7 +141,7 @@ pub async fn read_csv_local(
read_options,
max_chunks_in_flight,
)
.await?;
.await?;
tables_concat(tables_stream_collect(Box::pin(stream)).await)
}

@@ -163,10 +163,13 @@ pub async fn stream_csv_local(
parse_options: CsvParseOptions,
read_options: Option<CsvReadOptions>,
max_chunks_in_flight: Option<usize>,
) -> DaftResult<impl Stream<Item = DaftResult<Table>> + Send> {
) -> DaftResult<impl Stream<Item=DaftResult<Table>> + Send> {
let uri = uri.trim_start_matches("file://");
let file = tokio::fs::File::open(uri).await?;


println!("convert_options is NOne? ... {}", convert_options.is_none());

// TODO(desmond): This logic is repeated multiple times in the csv reader files. Should dedup.
let predicate = convert_options
.as_ref()
@@ -195,7 +198,8 @@ pub async fn stream_csv_local(
Some(co)
}
}
.unwrap_or_default();
.unwrap_or_default();

// End of `should dedup`.

// TODO(desmond): We should do better schema inference here.
@@ -231,22 +235,32 @@ pub async fn stream_csv_local(
let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize;
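// Worked example with hypothetical numbers (not from this commit): if chunk_size
// is 64 KiB and the estimated record_buffer_size is 100 bytes, then
// ceil(65536 / 100) = 656 rows are allocated per chunk buffer.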
let num_fields = schema.fields.len();
// TODO(desmond): We might consider creating per-process buffer pools and slab pools.
let buffer_pool = Arc::new(CsvBufferPool::new(
record_buffer_size,
num_fields,
chunk_size_rows,
n_threads * 2,
));


let initial_pool_size = n_threads * 2;
let csv_slabs = vec![
vec![
read::ByteRecord::with_capacity(record_buffer_size, num_fields);
chunk_size_rows
];
initial_pool_size
];


let buffer_pool = SlabPool::new(
csv_slabs,
);

// We suppose that each slab of CSV data produces (chunk size / slab size) number of Daft tables. We
// then double this capacity to ensure that our channel is never full and our threads won't deadlock.
let (sender, receiver) =
crossbeam_channel::bounded(max_chunks_in_flight.unwrap_or(2 * chunk_size / SLAB_SIZE));
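// Illustrative arithmetic (hypothetical chunk_size, not from this commit): with a
// chunk_size of 64 MiB and SLAB_SIZE of 4 MiB, the default capacity is
// 2 * (64 MiB / 4 MiB) = 32 chunks in flight before senders block.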

let total_len = file.metadata().unwrap().len() as usize;
let total_len = file.metadata().await.unwrap().len() as usize;
let windowed_buffers =
read_slabs_windowed(file, vec![vec![0; SLAB_SIZE]; SLAB_POOL_DEFAULT_SIZE]);
rayon::spawn(move || {
// todo: await
consume_csv_file(
buffer_pool,
windowed_buffers,
@@ -270,8 +284,8 @@ pub async fn stream_csv_local(
/// Consumes the CSV file and sends the results to `sender`.
#[allow(clippy::too_many_arguments)]
async fn consume_csv_file(
buffer_pool: Arc<CsvBufferPool>,
window_stream: impl Stream<Item = WindowedSlab>,
mut buffer_pool: SlabPool<CsvSlabData>,
mut window_stream: impl Stream<Item=WindowedSlab> + Unpin,
total_len: usize,
parse_options: CsvParseOptions,
projection_indices: Arc<Vec<usize>>,
@@ -291,8 +305,6 @@ async fn consume_csv_file(
let quote_char = parse_options.quote;
let double_quote_escape_allowed = parse_options.double_quote;
let mut total_bytes_read = 0;
let mut next_slab = None;
let mut next_buffer_len = 0;
let mut is_first_buffer = true;
loop {
let limit_reached = limit.map_or(false, |limit| {
@@ -302,28 +314,39 @@ async fn consume_csv_file(
if limit_reached {
break;
}
let window: WindowedSlab = window_stream.next().await?;
let first_buffer = &window[0];
let second_buffer = window.get(1).map(|slab| &****slab);

let file_chunk = get_file_chunk(
first_buffer,
second_buffer,
is_first_buffer,
num_fields,
quote_char,
field_delimiter,
escape_char,
double_quote_escape_allowed,
);

let Some(window) = window_stream.next().await else {
// todo: probably right (we really think so)
break;
};

let first_buffer = window.get(0).unwrap().clone();
let second_buffer = window.get(1);

let file_chunk = {
let second_buffer = second_buffer.map(|slab| &****slab);
get_file_chunk(
&first_buffer,
second_buffer,
is_first_buffer,
num_fields,
quote_char,
field_delimiter,
escape_char,
double_quote_escape_allowed,
)
};

let second_buffer = second_buffer.cloned();

is_first_buffer = false;
if let (None, _) = file_chunk {
// Exit early before spawning a new thread.
// TODO(desmond): we should fallback instead.
break;
}
let parse_options = parse_options.clone();
let csv_buffer = buffer_pool.get_buffer();
let csv_slab = buffer_pool.get_next_data().await;
let projection_indices = projection_indices.clone();
let fields = fields.clone();
let read_daft_fields = read_daft_fields.clone();
@@ -349,39 +372,35 @@ async fn consume_csv_file(
fields,
read_daft_fields,
read_schema,
csv_buffer,
csv_slab,
&include_columns,
predicate,
sender,
rows_read,
);
}
(Some(start), Some(end)) => {
if let Some(next_slab_clone) = next_slab_clone
&& let Some(current_buffer) = &current_slab_clone.buffer
&& let Some(next_buffer) = &next_slab_clone.buffer
{
let buffer_source = BufferSource::Chain(std::io::Read::chain(
Cursor::new(&current_buffer[start..current_buffer_len]),
Cursor::new(&next_buffer[..end]),
));
dispatch_to_parse_csv(
has_header,
parse_options,
buffer_source,
projection_indices,
fields,
read_daft_fields,
read_schema,
csv_buffer,
&include_columns,
predicate,
sender,
rows_read,
);
} else {
panic!("Trying to read from an overflow CSV buffer that doesn't exist. Please report this issue.")
}
let first_buffer = Cursor::new(&first_buffer[start..]);

let second_buffer = second_buffer.unwrap();
let second_buffer = Cursor::new(&second_buffer[..end]);

let reader = std::io::Read::chain(first_buffer, second_buffer);
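// Hedged illustration (not part of this commit): chaining the tail of the current
// slab with the head of the next yields one contiguous reader over a record that
// straddles the slab boundary, e.g.:
//   let a = Cursor::new(&b"foo,ba"[..]);
//   let b = Cursor::new(&b"r\n"[..]);
//   let mut s = String::new();
//   std::io::Read::chain(a, b).read_to_string(&mut s).unwrap(); // s == "foo,bar\n"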

dispatch_to_parse_csv(
has_header,
parse_options,
reader,
projection_indices,
fields,
read_daft_fields,
read_schema,
csv_slab,
&include_columns,
predicate,
sender,
rows_read,
);
}
_ => panic!(
"Something went wrong when parsing the CSV file. Please report this issue."
@@ -671,12 +690,12 @@ fn validate_csv_record(
fn dispatch_to_parse_csv(
has_header: bool,
parse_options: CsvParseOptions,
buffer_source: BufferSource,
buffer_source: impl Read,
projection_indices: Arc<Vec<usize>>,
fields: Vec<Field>,
read_daft_fields: Arc<Vec<Arc<daft_core::datatypes::Field>>>,
read_schema: Arc<Schema>,
csv_buffer: CsvBuffer,
csv_buffer: Slab<CsvSlabData>,
include_columns: &Option<Vec<String>>,
predicate: Option<Arc<Expr>>,
sender: Sender<Result<Table, DaftError>>,
@@ -727,19 +746,19 @@ fn parse_csv_chunk<R>(
fields: Vec<arrow2::datatypes::Field>,
read_daft_fields: Arc<Vec<Arc<daft_core::datatypes::Field>>>,
read_schema: Arc<Schema>,
csv_buffer: CsvBuffer,
mut csv_buffer: Slab<CsvSlabData>,
include_columns: &Option<Vec<String>>,
predicate: Option<Arc<Expr>>,
) -> DaftResult<Vec<Table>>
where
R: std::io::Read,
{
let mut chunk_buffer = csv_buffer.buffer;
let mut chunk_buffer = &mut *csv_buffer;
let mut tables = vec![];
loop {
//let time = Instant::now();
let (rows_read, has_more) =
local_read_rows(&mut reader, chunk_buffer.as_mut_slice()).context(ArrowSnafu {})?;
local_read_rows(&mut reader, chunk_buffer).context(ArrowSnafu {})?;
//let time = Instant::now();
let chunk = projection_indices
.par_iter()
@@ -777,6 +796,5 @@ where
break;
}
}
csv_buffer.pool.return_buffer(chunk_buffer);
Ok(tables)
}
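
To make the direction of this stash clearer, here is a minimal, self-contained sketch of the slab-pool pattern the diff moves toward: buffers are generic slabs checked out of a shared pool and returned automatically on drop, replacing the explicit `return_buffer` call that `CsvBufferPool` required at the end of `parse_csv_chunk`. The real `SlabPool`/`Slab` types live in `crate::local::pool` and are async; the names and signatures below are illustrative assumptions, not the actual API.

use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};

/// A shared pool of reusable buffers ("slabs").
struct SlabPool<T> {
    free: Mutex<Vec<T>>,
}

/// A slab checked out of the pool; it returns itself to the pool on drop.
struct Slab<T> {
    pool: Arc<SlabPool<T>>,
    data: Option<T>,
}

impl<T> SlabPool<T> {
    fn new(initial: Vec<T>) -> Arc<Self> {
        Arc::new(Self { free: Mutex::new(initial) })
    }

    /// Check out the next free slab. The real pool awaits until one is
    /// returned; this sketch simply panics if the pool is empty.
    fn get_next_data(self: &Arc<Self>) -> Slab<T> {
        let data = self.free.lock().unwrap().pop().expect("pool exhausted");
        Slab { pool: Arc::clone(self), data: Some(data) }
    }
}

impl<T> Deref for Slab<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.data.as_ref().unwrap()
    }
}

impl<T> DerefMut for Slab<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.data.as_mut().unwrap()
    }
}

impl<T> Drop for Slab<T> {
    fn drop(&mut self) {
        // Dropping the slab gives the buffer back to the pool, which is what
        // lets the caller stop returning buffers explicitly.
        if let Some(data) = self.data.take() {
            self.pool.free.lock().unwrap().push(data);
        }
    }
}

fn main() {
    // Two pre-allocated buffers standing in for `Vec<ByteRecord>` CSV slabs.
    let pool = SlabPool::new(vec![vec![0u8; 8], vec![0u8; 8]]);
    {
        let mut slab = pool.get_next_data();
        (*slab)[0] = 42; // callers treat the slab as the inner buffer
    } // slab dropped here and returned to the pool
    assert_eq!(pool.free.lock().unwrap().len(), 2);
}

In the diff above, `buffer_pool.get_next_data().await` plays the analogous role, and the `Slab<CsvSlabData>` handle is dereferenced to the record buffer that `local_read_rows` fills.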
