[PERF] Add a parallel local CSV reader #2772
Conversation
CodSpeed Performance Report: Merging #2772 will degrade performance by 33.78%.
Benchmarks breakdown
src/daft-csv/src/read.rs (Outdated)
fn next_line_position(input: &[u8]) -> Option<usize> {
    // Assuming we are searching for the ASCII `\n` character, we don't need to do any special
    // handling for UTF-8, since a `\n` value always corresponds to an ASCII `\n`.
    // For more details, see: https://en.wikipedia.org/wiki/UTF-8#Encoding
    memchr::memchr(NEWLINE, input)
}
won't this break on quoted columns with a newline in them?
c0,c1,quoted_column
1,true,"this is \n quoted"
Yeah, that's correct. I left some follow-up TODOs below, but we haven't handled this case yet.
Some ideas we tossed around (a rough sketch of the first idea follows this list):
- Do speculative parsing: assume each chunk could start in either the quoted or the unquoted state, then prune the wrong possibility as we go.
- Check whether the file has no quoted fields, or let users specify this. In the happy path without quoted fields we can just rip through it; otherwise, use speculative parsing.
- There may be other edge cases that we don't handle correctly; in those cases, simply fall back to an existing mature CSV reader.
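A minimal sketch of that two-state speculative idea, assuming each chunk can only start inside or outside a quoted field; the names (QuoteState, scan_chunk, resolve_chunk_states) are illustrative, not Daft code:

// Hypothetical sketch of two-state speculative scanning: record how quote
// parity changes across each chunk, then chain chunks left to right to decide
// whether each chunk really started inside or outside a quoted field.
#[derive(Clone, Copy, PartialEq)]
enum QuoteState {
    Outside,
    Inside,
}

// Scan one chunk assuming it starts in `start`, returning the state at its end.
fn scan_chunk(chunk: &[u8], start: QuoteState) -> QuoteState {
    let mut state = start;
    for &b in chunk {
        if b == b'"' {
            state = match state {
                QuoteState::Outside => QuoteState::Inside,
                QuoteState::Inside => QuoteState::Outside,
            };
        }
    }
    state
}

fn resolve_chunk_states(chunks: &[&[u8]]) -> Vec<QuoteState> {
    // Speculatively scan every chunk under both starting assumptions (this is
    // the part a real reader would run in parallel)...
    let speculative: Vec<(QuoteState, QuoteState)> = chunks
        .iter()
        .map(|c| (scan_chunk(c, QuoteState::Outside), scan_chunk(c, QuoteState::Inside)))
        .collect();

    // ...then sequentially pick, for each chunk, the assumption that matches
    // the resolved end state of the previous chunk.
    let mut states = Vec::with_capacity(chunks.len());
    let mut prev_end = QuoteState::Outside; // a file starts outside any quote
    for (end_if_outside, end_if_inside) in speculative {
        states.push(prev_end);
        prev_end = match prev_end {
            QuoteState::Outside => end_if_outside,
            QuoteState::Inside => end_if_inside,
        };
    }
    states
}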
src/daft-csv/src/read.rs (Outdated)
// TODO(desmond): Need better upfront estimators. Sample or keep running count of stats.
let estimated_mean_row_size = 100f64;
let estimated_std_row_size = 20f64;
You should be able to do something similar to what I did in JSON here to get the estimated sizes.
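One possible shape for that (not the JSON reader's exact approach): sample a handful of complete lines from the start of the buffer and derive the mean and standard deviation of the row size from them, keeping the current constants as a fallback. The function name and fallback values below are illustrative.

// Hypothetical sketch: estimate mean/std of the row size from a small sample
// of complete lines, falling back to hard-coded defaults when the sample is
// too small to be useful.
fn estimate_row_size(buf: &[u8], max_sample: usize) -> (f64, f64) {
    let mut lengths = Vec::new();
    let mut start = 0;
    while lengths.len() < max_sample {
        match memchr::memchr(b'\n', &buf[start..]) {
            Some(rel) => {
                lengths.push((rel + 1) as f64); // line length including the newline
                start += rel + 1;
            }
            None => break,
        }
    }
    if lengths.len() < 2 {
        return (100f64, 20f64); // fall back to the existing estimates
    }
    let n = lengths.len() as f64;
    let mean = lengths.iter().sum::<f64>() / n;
    let var = lengths.iter().map(|l| (l - mean).powi(2)).sum::<f64>() / n;
    (mean, var.sqrt())
}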
src/daft-csv/src/read.rs (Outdated)
// the following approach still isn't quite right.
// TODO(desmond): Also, is this usage of max_chunks_in_flight correct?
let (sender, receiver) =
    crossbeam_channel::bounded(max_chunks_in_flight.unwrap_or(n_threads * 2));
I think you might deadlock here, e.g. if your channel is full and all threads are trying to send chunks, they will all block. For Parquet, we capped it to expected_num_chunks, i.e. the total number of chunks that will ever be read. See: https://github.com/Eventual-Inc/Daft/pull/2620/files#diff-f65cce5db7fd92cf9fe1cc38ebd495165b7ac59444a91f0c93c4c4c027089429R392-R400
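A minimal sketch of that capping idea, assuming a target chunk size is known up front; the names (make_chunk_channel, file_size, target_chunk_size) are illustrative rather than the linked Parquet code:

// Hypothetical sketch: size the channel by the total number of chunks the
// file can ever produce, so senders can never block on a full channel even
// if the consumer drains slowly.
fn make_chunk_channel<T>(
    file_size: usize,
    target_chunk_size: usize,
) -> (crossbeam_channel::Sender<T>, crossbeam_channel::Receiver<T>) {
    let expected_num_chunks = file_size.div_ceil(target_chunk_size).max(1);
    crossbeam_channel::bounded(expected_num_chunks)
}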
Oh, and also (maybe you have this as a TODO, but just in case): the ordering should also be preserved. For the streaming executor there's a maintain_order flag that indicates whether order needs to be preserved, but the non-streaming one requires order. I don't think into_par_iter().for_each() will preserve it, but correct me if I'm wrong (I believe rayon only preserves order if the pipeline ends in a collect, see rayon-rs/rayon#551).
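Two ways that could look, as a rough sketch; ParsedChunk and parse_chunk are placeholders, not actual Daft types:

use rayon::prelude::*;

struct ParsedChunk; // placeholder for whatever a parsed chunk becomes

fn parse_chunk(_bytes: &[u8]) -> ParsedChunk {
    ParsedChunk
}

// Option 1: rayon's indexed parallel iterators keep the input order when the
// pipeline ends in `collect`, unlike `for_each`, which gives no ordering
// guarantee across threads.
fn parse_chunks_in_order(chunks: Vec<Vec<u8>>) -> Vec<ParsedChunk> {
    chunks
        .into_par_iter()
        .map(|chunk| parse_chunk(&chunk))
        .collect() // results come back in the original chunk order
}

// Option 2: tag each chunk with its index when sending through a channel so
// the consumer can reassemble the original order (or skip the reordering when
// maintain_order is false).
fn send_tagged(chunks: Vec<Vec<u8>>, sender: crossbeam_channel::Sender<(usize, ParsedChunk)>) {
    chunks
        .into_par_iter()
        .enumerate()
        .for_each_with(sender, |s, (idx, chunk)| {
            let _ = s.send((idx, parse_chunk(&chunk)));
        });
}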
This branch is a little borked. Keeping a record of it but reopening the PR at #3055.
Adds a parallel CSV reader to speed up ingestion of CSVs. The approach adapts some ideas laid out in [1], but the majority of the performance gains came from the use of buffer pools to minimize memory allocations.
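A minimal sketch of the buffer-pool idea, assuming a fixed buffer size; the BufferPool type here is illustrative, not the PR's actual implementation:

use std::sync::{Arc, Mutex};

// Hypothetical sketch of a buffer pool: byte buffers are handed out to readers
// and returned after their chunk is parsed, so the hot loop reuses allocations
// instead of creating a fresh Vec for every chunk.
struct BufferPool {
    buffers: Mutex<Vec<Vec<u8>>>,
    buffer_size: usize,
}

impl BufferPool {
    fn new(pool_size: usize, buffer_size: usize) -> Arc<Self> {
        Arc::new(Self {
            buffers: Mutex::new((0..pool_size).map(|_| vec![0u8; buffer_size]).collect()),
            buffer_size,
        })
    }

    // Take a buffer from the pool, or allocate one if the pool is empty.
    fn get(self: &Arc<Self>) -> Vec<u8> {
        self.buffers
            .lock()
            .unwrap()
            .pop()
            .unwrap_or_else(|| vec![0u8; self.buffer_size])
    }

    // Return a buffer once its chunk has been parsed so it can be reused.
    fn put(self: &Arc<Self>, buf: Vec<u8>) {
        self.buffers.lock().unwrap().push(buf);
    }
}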
Some performance numbers
We consider a simple case of reading and performing .collect() on a CSV file with 10^8 rows of 9 fields: 3 string fields, 5 int64 fields, and 1 double field. This file is roughly 5GB in size. This represents a roughly 10x speedup on CSV reads for the native executor.
Followup work
[1]: Ge, Chang et al. “Speculative Distributed CSV Data Parsing for Big Data Analytics.” Proceedings of the 2019 International Conference on Management of Data (2019).