
[PERF] Split parquet scan tasks into individual row groups #1799

Merged

kevinzwang merged 15 commits into main from kevin/split-parquet-row-groups on Feb 8, 2024

Conversation

@kevinzwang (Member) commented on Jan 19, 2024:

Benchmark results

All times averaged over 5 runs.

Single file read

Read one file in S3 using Ray with 4 worker nodes

Parquet file info:

  • number of rows: 18,751,674
  • number of row groups: 18
  • file size: 711.3 MiB
  • number of columns: 16
| Split threshold (MiB) | # of scan tasks | Read time |
| --- | --- | --- |
| 32 | 18 | 3.84s |
| 64 | 9 | 4.21s |
| 128 | 5 | 5.47s |
| 256 | 3 | 4.45s |
| 512 | 2 | 6.06s |
| 1024 | 1 | 10.51s |
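The scan-task counts above fall out of grouping consecutive row groups until their combined size reaches the split threshold. Below is a minimal sketch of that grouping logic, written purely for illustration (it is not the PR's actual Rust implementation); the benchmarked file is modeled as 18 row groups of roughly 40 MiB each.

```python
MIB = 1024 * 1024

def split_into_scan_tasks(row_group_sizes: list[int], threshold_bytes: int) -> list[list[int]]:
    """Group consecutive row groups into scan tasks of at least `threshold_bytes` (sketch only)."""
    tasks: list[list[int]] = []
    current: list[int] = []
    current_size = 0
    for rg_index, size in enumerate(row_group_sizes):
        current.append(rg_index)
        current_size += size
        if current_size >= threshold_bytes:  # close out the current scan task
            tasks.append(current)
            current, current_size = [], 0
    if current:  # leftover row groups form the final task
        tasks.append(current)
    return tasks

# The benchmarked file: ~711 MiB spread across 18 row groups (~40 MiB each).
row_groups = [40 * MIB] * 18
for threshold in (32, 64, 128, 256, 512, 1024):
    n_tasks = len(split_into_scan_tasks(row_groups, threshold * MIB))
    print(f"{threshold} MiB threshold -> {n_tasks} scan tasks")
```

This reproduces the 18/9/5/3/2/1 task counts in the table; the smallest threshold (one task per row group) reads fastest here, presumably because the 18 tasks spread across the 4 worker nodes.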

Multi-file workflow

Read and aggregate (DataFrame.count()) 32 files in S3 using Ray with 4 and 8 worker nodes

Parquet file info:

  • total rows: 600,037,902
  • number of row groups per file: 18
  • file sizes: 710.7-711.5 MiB
  • number of columns: 16
| Split threshold (MiB) | # of scan tasks per file | Time (4 workers) | Time (8 workers) |
| --- | --- | --- | --- |
| 32 | 18 | 23.83s | 14.17s |
| 64 | 9 | 24.99s | 14.17s |
| 128 | 5 | 26.51s | 15.23s |
| 256 | 3 | 27.96s | 16.58s |
| 512 | 2 | 30.27s | 16.85s |
| 1024 | 1 | 26.50s | 29.06s |

(Averaged over 5 runs)
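For completeness, a rough sketch of how a user might exercise this from Python is below. The execution-config parameter name shown (`scan_tasks_max_size_bytes`) is an assumption for illustration and may not match the option this PR actually exposed; the S3 path is hypothetical.

```python
import daft

# Sketch only: the parameter name below is assumed for illustration and may not
# match the execution-config option added by this PR.
daft.set_execution_config(scan_tasks_max_size_bytes=64 * 1024 * 1024)  # ~64 MiB split threshold

# Hypothetical S3 path standing in for the benchmarked files.
df = daft.read_parquet("s3://example-bucket/large-file.parquet")
print(df.count_rows())
```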

@kevinzwang linked an issue on Jan 19, 2024 that may be closed by this pull request (3 tasks)
@kevinzwang marked this pull request as ready for review on January 19, 2024 08:34
vec![DataFileSource::AnonymousDataFile {
    path: path.to_string(),
    chunk_spec: Some(ChunkSpec::Parquet(vec![rg as i64])),
    size_bytes: Some(rgm.compressed_size() as u64),
@kevinzwang (Member, Author) commented on this diff:

I interpreted size_bytes as representing the file size, which stores the data compressed, so I'm pretty sure .compressed_size() is the right method here; but if it's meant to be the uncompressed size of the data, then we should use .total_byte_size() instead.
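For readers less familiar with Parquet metadata, the same two quantities can be inspected from Python with pyarrow. This is only an illustration of the compressed-vs-uncompressed distinction, not code from this PR; the file path is hypothetical.

```python
import pyarrow.parquet as pq

# Hypothetical file path, for illustration only.
md = pq.ParquetFile("example.parquet").metadata

for i in range(md.num_row_groups):
    rg = md.row_group(i)
    # Uncompressed size of the row group's data.
    uncompressed = rg.total_byte_size
    # On-disk size: sum of the compressed sizes of the row group's column chunks.
    compressed = sum(rg.column(c).total_compressed_size for c in range(rg.num_columns))
    print(f"row group {i}: compressed={compressed:,} B, uncompressed={uncompressed:,} B")
```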

@kevinzwang (Member, Author) commented:

The function materialize_scan_task doesn't seem to make use of the row groups when using the Python storage config. Not sure if that's something to worry about.

See: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-micropartition/src/micropartition.rs#L221


codecov bot commented Jan 24, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison: base (f471738) 85.47% vs. head (4acd096) 85.47%.

Additional details and impacted files


@@           Coverage Diff           @@
##             main    #1799   +/-   ##
=======================================
  Coverage   85.47%   85.47%           
=======================================
  Files          55       55           
  Lines        6119     6119           
=======================================
  Hits         5230     5230           
  Misses        889      889           
| Files | Coverage Δ |
| --- | --- |
| daft/context.py | 72.04% <ø> (ø) |

@kevinzwang (Member, Author) commented:

> The function materialize_scan_task doesn't seem to make use of the row groups when using the Python storage config. Not sure if that's something to worry about.
>
> See: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-micropartition/src/micropartition.rs#L221

Conclusion of a conversation with @samster25: don't do row-group splitting when using the Python reader, since it is a legacy feature whose performance we don't care about.

@kevinzwang (Member, Author) commented:

Also let me know if I should write tests for non-local reads.

(Resolved review threads on src/common/daft-config/src/lib.rs and src/daft-scan/src/scan_task_iters.rs)
import daft

FILES = [
    "tests/assets/parquet-data/mvp.parquet",
A reviewer (Member) commented on this test:

We should also add some of the S3 file sources here too. I believe we have all these files in S3 as well.

@kevinzwang (Member, Author) replied:

I just removed this test and added a fixture to test_reads_public_data.py
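As a rough idea of what such a fixture could look like, here is a sketch under assumptions; it is not the actual contents of test_reads_public_data.py, and the S3 URL is hypothetical.

```python
import pytest

# The local path comes from the removed test above; the S3 URL is a hypothetical
# mirror, standing in for the project's real public S3 fixtures.
PARQUET_SOURCES = [
    "tests/assets/parquet-data/mvp.parquet",
    "s3://daft-public-data/test_fixtures/parquet-data/mvp.parquet",
]

@pytest.fixture(params=PARQUET_SOURCES)
def parquet_path(request):
    """Run each read test against both the local and the S3 copy of the file."""
    return request.param
```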

(Resolved review thread on src/common/daft-config/src/lib.rs)
@kevinzwang requested a review from samster25 on January 26, 2024 18:29
(Resolved review threads on daft/context.py and src/common/daft-config/src/lib.rs)
@clarkzinzow (Contributor) left a comment:

LGTM overall, just have a few nits and questions about the interaction between splitting and merging scan tasks.

(Review threads on src/daft-scan/src/scan_task_iters.rs, src/common/daft-config/src/lib.rs, tests/integration/io/parquet/test_reads_public_data.py, and src/daft-plan/src/planner.rs, all resolved)
@kevinzwang merged commit 8aba872 into main on Feb 8, 2024
42 checks passed
@kevinzwang deleted the kevin/split-parquet-row-groups branch on February 8, 2024 01:14
Development

Successfully merging this pull request may close these issues.

[PERF] Read large files into smaller partitions
3 participants