
[FEAT] Add streaming + parallel CSV reader, with decompression support. #1501

Merged 12 commits from clark/streaming-parallel-csv-read into main on Oct 20, 2023

Conversation

@clarkzinzow (Contributor) commented Oct 18, 2023

This PR adds streaming + parallel CSV reading and parsing, along with support for streaming decompression. In particular, this PR:

  • Adds support for streaming decompression for brotli, bz, deflate, gzip, lzma, xz, zlib, and zstd.
  • Performs chunk-based streaming CSV reads, filling up a small buffer of unparsed records.
  • Pipelines chunk-based CSV parsing with reading by spawning Tokio + rayon parsing tasks.
  • Performs chunk parsing, as well as column parsing within a chunk, in parallel on the rayon threadpool.
  • Changes schema inference to involve an (at most) 1 MiB file peek rather than a full file read.
  • Gathers an estimate of the mean row size in bytes during schema inference and propagates this estimate back to the reader.
  • Unifies local and cloud reads + schema inference.
  • Adds thorough Rust-side local + cloud test coverage.

The streaming + parallel reading and parsing leads to a 4-8x speedup over the pyarrow reader and the previous non-parallel reader when benchmarking large-file (~1 GB) reads, while also resulting in lower memory utilization due to the streaming reads and parsing.
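
As a rough illustration of the decompression selection and read/parse pipelining described above (not the PR's actual implementation), the sketch below assumes the tokio, rayon, and async-compression crates with the relevant features enabled. The function names, the toy newline/comma splitting, and the unordered output collection are placeholders; the real reader aligns chunks to record boundaries, preserves chunk order, and produces Arrow tables rather than strings.

use async_compression::tokio::bufread::GzipDecoder;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt};
use tokio::sync::mpsc;

/// Toy parsed-chunk type; the actual reader produces Arrow record batches.
type ParsedChunk = Vec<Vec<String>>;

/// Wrap a byte stream in a streaming decompressor chosen from the file
/// extension (only gzip shown; the PR also covers brotli, bz, deflate,
/// lzma, xz, zlib, and zstd).
fn maybe_decompress<R: AsyncBufRead + Unpin + Send + 'static>(
    reader: R,
    path: &str,
) -> Box<dyn AsyncRead + Unpin + Send> {
    if path.ends_with(".gz") {
        Box::new(GzipDecoder::new(reader))
    } else {
        Box::new(reader)
    }
}

/// Stream fixed-size chunks off the (possibly decompressed) reader and parse
/// each chunk on the rayon threadpool, so I/O and parsing overlap. A small
/// bounded channel plays the role of the buffer of unparsed records.
async fn read_and_parse(
    mut reader: impl AsyncRead + Unpin + Send + 'static,
    chunk_size: usize,
) -> Vec<ParsedChunk> {
    let (tx, mut rx) = mpsc::channel::<ParsedChunk>(8);
    tokio::spawn(async move {
        loop {
            let mut buf = vec![0u8; chunk_size];
            // NOTE: a real implementation must extend each chunk to the next
            // record boundary so rows aren't split across chunks.
            let n = match reader.read(&mut buf).await {
                Ok(0) | Err(_) => break,
                Ok(n) => n,
            };
            buf.truncate(n);
            let tx = tx.clone();
            // CPU-bound parsing happens off the async runtime, on rayon.
            rayon::spawn(move || {
                let parsed: ParsedChunk = buf
                    .split(|b| *b == b'\n')
                    .filter(|line| !line.is_empty())
                    .map(|line| {
                        String::from_utf8_lossy(line)
                            .split(',')
                            .map(str::to_string)
                            .collect()
                    })
                    .collect();
                let _ = tx.blocking_send(parsed);
            });
        }
    });
    // Chunks arrive here in completion order; the real reader re-orders them.
    let mut chunks = Vec::new();
    while let Some(chunk) = rx.recv().await {
        chunks.push(chunk);
    }
    chunks
}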

TODOs (follow-up PRs)

  • Add snappy decompression support (need to essentially do something like this)

@github-actions bot added the enhancement label on Oct 18, 2023
@clarkzinzow changed the title from "[FEAT] Add streaming + parallel CSV reader." to "[FEAT] Add streaming + parallel CSV reader, with decompression support." on Oct 18, 2023

codecov bot commented Oct 18, 2023

Codecov Report

Merging #1501 (df97d66) into main (bdd2128) will increase coverage by 0.01%.
Report is 1 commit behind head on main.
The diff coverage is 100.00%.

Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1501      +/-   ##
==========================================
+ Coverage   74.74%   74.76%   +0.01%     
==========================================
  Files          60       60              
  Lines        6118     6130      +12     
==========================================
+ Hits         4573     4583      +10     
- Misses       1545     1547       +2     
Files                              Coverage Δ
daft/execution/execution_step.py   92.82% <ø> (ø)
daft/io/_csv.py                    95.00% <100.00%> (ø)
daft/runners/partitioning.py       80.70% <100.00%> (+0.34%) ⬆️
daft/table/table.py                81.94% <ø> (ø)
daft/table/table_io.py             95.83% <ø> (-0.70%) ⬇️

... and 7 files with indirect coverage changes

@clarkzinzow force-pushed the clark/streaming-parallel-csv-read branch from c29cbbe to 46ec9ef on October 18, 2023 20:03
@samster25 (Member) left a comment:

Looks good! And you were right haha, I think we should probably pull out the estimated rows commit until after we do the MicroPartition and scan operator work.

daft/table/schema_inference.py (review thread resolved)
src/daft-csv/src/metadata.rs (review thread resolved)
// default to Utf8 for conflicting datatypes (e.g. bool and int)
DataType::Utf8
}
}
@samster25 (Member): Is there any merging we have to do for the temporal types?

@clarkzinzow (Contributor, Author): I'm assuming that we'll have a variety of follow-ups there, yes.
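
For context on the Utf8 fallback in the snippet above, here is a minimal sketch (not the PR's actual logic) of what per-column dtype merging and value-level inference might look like, assuming arrow2-style DataType variants; the function names and the specific widening rules are illustrative. In the PR, this inference runs over an at-most-1 MiB peek of the file rather than a full read.

use arrow2::datatypes::DataType;

/// Illustrative per-column dtype merge for schema inference.
fn merge_inferred_dtypes(left: &DataType, right: &DataType) -> DataType {
    match (left, right) {
        (a, b) if a == b => a.clone(),
        // Widen mixed integer/float columns to Float64.
        (DataType::Int64, DataType::Float64) | (DataType::Float64, DataType::Int64) => {
            DataType::Float64
        }
        // A null-only sample doesn't constrain the other side.
        (DataType::Null, other) | (other, DataType::Null) => other.clone(),
        // Everything else (e.g. Boolean vs. Int64, or mismatched temporal
        // types as raised in the review) falls back to Utf8 for now.
        _ => DataType::Utf8,
    }
}

/// Toy value-level inference; a real implementation also handles temporal
/// formats, null sentinels, etc.
fn infer_value_dtype(value: &str) -> DataType {
    if value.is_empty() {
        DataType::Null
    } else if value.parse::<i64>().is_ok() {
        DataType::Int64
    } else if value.parse::<f64>().is_ok() {
        DataType::Float64
    } else if matches!(value, "true" | "false") {
        DataType::Boolean
    } else {
        DataType::Utf8
    }
}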

schema,
// Default buffer size of 512 KiB.
buffer_size.unwrap_or(512 * 1024),
// Default chunk size of 64 KiB.
@samster25 (Member): Do these constants make sense given it's a local buffered file?

@clarkzinzow (Contributor, Author): I have a TODO to benchmark locally and tweak these, but I'm assuming that tweaking these won't matter as much for local reads as they do for cloud reads. I can look into tweaking these if you'd like!
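
As a side note on the defaults being discussed, one way to keep them easy to tune later is to hoist them into named constants with optional per-read overrides; a small sketch follows (the constant and struct names are illustrative, not the PR's actual API).

/// Illustrative defaults mirroring the snippet above; tuning them
/// (especially for local buffered files) is left as a follow-up benchmark.
const DEFAULT_BUFFER_SIZE: usize = 512 * 1024; // bytes of unparsed records to buffer
const DEFAULT_CHUNK_SIZE: usize = 64 * 1024; // bytes handed to each parse task

struct CsvReadOptions {
    buffer_size: Option<usize>,
    chunk_size: Option<usize>,
}

impl CsvReadOptions {
    /// Resolve user overrides against the defaults.
    fn resolved(&self) -> (usize, usize) {
        (
            self.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
            self.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        )
    }
}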

src/daft-csv/src/read.rs (review thread resolved)
@clarkzinzow (Contributor, Author) commented:

@samster25 I reverted the estimated row size piping from schema inference, added a good bit more test coverage, and addressed your primary review comments, PTAL!

@samster25 (Member) left a comment:

Looks good! However, we should also test everything through the Python side via DataFrame tests, if that isn't already done!

src/daft-csv/src/read.rs (review thread resolved)
src/daft-csv/src/read.rs (review thread resolved)
daft/runners/partitioning.py (review thread resolved)
@clarkzinzow merged commit ad829c9 into main on Oct 20, 2023
24 checks passed
@clarkzinzow deleted the clark/streaming-parallel-csv-read branch on October 20, 2023 01:52