[PERF] Add a parallel local CSV reader #3055
Conversation
CodSpeed Performance Report
Merging #3055 will not alter performance.
Great work @desmondcheongzx!! Finally through :)
Just some minor feedback and questions, but this should be good to merge afterwards.
include_columns: Option<Vec<String>>,
predicate: Option<Arc<Expr>>,
limit: Option<usize>,
) -> DaftResult<Vec<Table>>
I think this could technically return an Iterator of Tables, so we can start emitting smaller chunks of tables while we're still processing the Chunk. But I'm going to say it's out of scope for this PR 🥹
Haha, I did give it a shot and got a table iterator. What I didn't get to was "start emitting smaller chunks of tables while we're still processing the Chunk", so it kinda just sat around until someone called .next() on it. 2x slowdown. I think you're right, let's punt it.
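For reference, a minimal sketch of the streaming shape discussed above (the read_csv_streaming name and the stand-in types are hypothetical, not Daft's actual API): workers decode chunks in parallel and push each finished table through a channel, and the caller drains the receiver as an iterator.

use std::sync::mpsc;
use std::thread;

// Stand-ins for Daft's types, just to keep the sketch self-contained.
type Table = Vec<String>;
type DaftResult<T> = Result<T, String>;

// Hypothetical: decode chunks on worker threads and stream each finished
// Table through a channel. The caller gets an iterator and can start
// consuming tables before the whole file has been parsed.
fn read_csv_streaming(chunks: Vec<Vec<u8>>) -> impl Iterator<Item = DaftResult<Table>> {
    let (tx, rx) = mpsc::channel();
    for chunk in chunks {
        let tx = tx.clone();
        thread::spawn(move || {
            // Pretend this decodes one CSV chunk into a Table.
            let table: Table = chunk
                .split(|&b| b == b'\n')
                .map(|line| String::from_utf8_lossy(line).into_owned())
                .collect();
            let _ = tx.send(Ok(table));
        });
    }
    drop(tx); // receiver sees end-of-stream once all workers finish
    rx.into_iter()
}

The "2x slowdown" observation matches this shape: if nothing eagerly drains the receiver, decoded tables just queue up until someone calls .next().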
}
}

const NEWLINE: u8 = b'\n';
Should these be user configurable?
As discussed offline, Daft's read_csv API currently does not accept non-\n record terminators. I added a TODO comment as a reminder that we should add this option to the API and pass it down here.
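As a rough illustration, the option could eventually be threaded through a parse-options struct along these lines (ParseOptions and its field names are assumptions for illustration, not Daft's actual API):

// Hypothetical ParseOptions; field names are assumptions, not Daft's API.
#[derive(Clone)]
struct ParseOptions {
    // Byte that terminates a record; today this is hardcoded to b'\n'.
    record_terminator: u8,
    // Field delimiter; b',' by default.
    delimiter: u8,
}

impl Default for ParseOptions {
    fn default() -> Self {
        Self { record_terminator: b'\n', delimiter: b',' }
    }
}

// The chunk-alignment and parsing code would then consult the option
// instead of the hardcoded NEWLINE constant.
fn is_record_end(byte: u8, opts: &ParseOptions) -> bool {
    byte == opts.record_terminator
}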
The local CSV reader currently makes upfront buffer allocations (80 MiB for file slabs and 80 MiB for CSV buffers). This unnecessarily inflates read times for small CSV files, which don't need that many buffers. Since the local CSV reader already allocates additional buffers as needed, we can remove all upfront allocations without affecting anything else in the implementation. This speeds up reads of small files. At the same time, I benchmarked the reader against the test case described in #3055 and found no consistent slowdown without the upfront allocations.
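For illustration, a grow-on-demand pool along these lines gives the same buffer reuse without the upfront cost (a sketch, not the reader's actual buffer pool):

use std::sync::Mutex;

// Sketch of a grow-on-demand buffer pool: nothing is allocated up front;
// a buffer is created only when a caller asks for one and none is free,
// and returned buffers are recycled for later reads.
struct BufferPool {
    free: Mutex<Vec<Vec<u8>>>,
    buffer_size: usize,
}

impl BufferPool {
    fn new(buffer_size: usize) -> Self {
        Self { free: Mutex::new(Vec::new()), buffer_size }
    }

    fn get(&self) -> Vec<u8> {
        self.free
            .lock()
            .unwrap()
            .pop()
            // Lazy allocation: small files that never need many buffers
            // never pay for them.
            .unwrap_or_else(|| vec![0u8; self.buffer_size])
    }

    fn put(&self, buf: Vec<u8>) {
        self.free.lock().unwrap().push(buf);
    }
}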
Adds a parallel CSV reader to speed up CSV ingestion. This reader is used for local, uncompressed CSV files.
The approach in this PR adapts some ideas laid out in [1], namely adjusting each chunk of the CSV file using its neighbouring chunks so that every chunk contains only whole CSV records, which can then be decoded in parallel (a simplified sketch follows below).
However, as it turns out, the majority of the performance gains came from using buffer pools to minimize memory allocations.
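To make the chunk-alignment idea concrete, here is a simplified sketch. It assumes no quoted field contains an embedded newline, which is the hard case that [1] handles speculatively: split the file at fixed byte offsets, then push each boundary forward to the next record terminator so no record is split.

// Simplified chunk alignment: cut the file at fixed offsets, then extend
// each tentative boundary to the next newline so every chunk holds only
// whole CSV records. Assumes no embedded newlines inside quoted fields.
fn align_chunks(data: &[u8], target_chunk_size: usize) -> Vec<&[u8]> {
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < data.len() {
        let tentative_end = (start + target_chunk_size).min(data.len());
        // Scan forward from the tentative boundary to the next newline
        // (or the end of the file) so we never split a record.
        let end = data[tentative_end..]
            .iter()
            .position(|&b| b == b'\n')
            .map(|pos| tentative_end + pos + 1)
            .unwrap_or(data.len());
        chunks.push(&data[start..end]);
        start = end;
    }
    chunks
}

Each aligned chunk can then be handed to a separate thread and parsed independently.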
Some performance numbers
We consider a simple case of reading and performing .collect() on a CSV file with 10^8 rows of 9 fields: 3 string fields, 5 int64 fields, and 1 double field. The file is roughly 5 GB in size. The new reader represents a roughly 12x speedup on CSV reads for the native executor.
Followups
Unfortunately, with this new CSV reader, the native executor no longer shows a small, stable memory footprint during an aggregation. Memray shows a steady increase in resident set size that disappears once the aggregation completes. For what it's worth, parsing the whole CSV file with this reader and then dumping the results does not show any memory increase, so it's possible we're simply not passing results to downstream consumers in the way they expect.
[1]: Ge, Chang et al. “Speculative Distributed CSV Data Parsing for Big Data Analytics.” Proceedings of the 2019 International Conference on Management of Data (2019).