feat: Split *all* Parquet ScanTasks by default #3454
base: main
Conversation
CodSpeed Performance Report: merging #3454 will degrade performance by 32.06%.
Codecov Report (Attention: Patch coverage is …)

```
@@ Coverage Diff @@
##             main    #3454      +/-   ##
==========================================
+ Coverage   77.80%   77.87%   +0.06%
==========================================
  Files         718      721       +3
  Lines       88250    88700     +450
==========================================
+ Hits        68666    69072     +406
- Misses      19584    19628      +44
```
Force-pushed 65d6e2b to 63a8814, then 63a8814 to 4db6d32.
This has been fixed in commit 8c527e8.
Force-pushed 450b35a to 7c56994.
src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs (outdated review thread, resolved)
Overall, I would decompose your functionality into separate modules that can be composed together, rather than these mega matches.
```diff
@@ -351,6 +351,7 @@ def set_execution_config(
     shuffle_algorithm: str | None = None,
     pre_shuffle_merge_threshold: int | None = None,
     enable_ray_tracing: bool | None = None,
+    enable_aggressive_scantask_splitting: bool | None = None,
```
I think it would be cleaner to set a level instead, maybe `[0, 1, 2]`, and have the default be 1? Something like `scantask_split_level=...`.
What would the levels entail?
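Purely as a hypothetical sketch (the thread leaves the level semantics as an open question, and `SPLIT_LEVELS` / `describe_split_level` are invented names for illustration), a level knob might map to increasingly aggressive splitting:

```python
# Hypothetical mapping from a scantask_split_level knob to behavior.
# None of these semantics are decided in this review thread.
SPLIT_LEVELS = {
    0: "never split scan tasks",
    1: "split only oversized Parquet files (default)",
    2: "split all Parquet scan tasks by rowgroup",
}

def describe_split_level(level: int) -> str:
    """Return a human-readable description of a split level."""
    if level not in SPLIT_LEVELS:
        raise ValueError(f"unknown scantask_split_level: {level}")
    return SPLIT_LEVELS[level]

print(describe_split_level(1))
```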
src/daft-scan/src/storage_config.rs (outdated)

```rust
    }
}

/// TODO: Deprecate this method in favor of `get_io_config`
```
I favor real deprecation warnings over comments: https://doc.rust-lang.org/reference/attributes/diagnostics.html#the-deprecated-attribute
I tried adding this, but it fails to compile if I do. I'll have to investigate more to see if I can make this work, but it doesn't seem like we do this anywhere else in our codebase?
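As a side note (not from this PR), the same principle of emitting a real deprecation warning instead of leaving a TODO comment can be sketched in Python with only the standard library; `get_io_config_legacy` is a made-up stand-in for the method being deprecated:

```python
import warnings

def get_io_config_legacy():
    """Deprecated: prefer a non-deprecated accessor instead."""
    # Emit a real DeprecationWarning at the call site, so callers
    # see it instead of a TODO comment they would never read.
    warnings.warn(
        "get_io_config_legacy is deprecated; use get_io_config instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return {"io": "config"}  # placeholder return value for illustration

# Demonstrate that calling the deprecated function raises the warning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = get_io_config_legacy()

print(result, len(caught))
```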
src/daft-scan/src/storage_config.rs (outdated)

```rust
@@ -23,6 +23,16 @@ pub enum StorageConfig {
}

impl StorageConfig {
    /// TODO: Arc the inner IOConfig on each StorageConfig instead to avoid needing to Arc here
    pub fn get_io_config(&self) -> Arc<IOConfig> {
```
This makes me really sad :(
What if instead you return a reference?
I couldn't make it work because we have a bunch of code that requires `Arc<IOConfig>` (specifically I think `get_io_client` uses that), so if I return a reference I end up needing to clone/Arc it somewhere upstream anyways...
```python
# Small ones should not be split, large ones should be split into 10 rowgroups each
# This gives us a total of 200 + 20 scantasks

# Write 20 large files into tmpdir
```
This sounds expensive. Can you make these integration IO tests?
It's actually quite fast here, the files are small -- about 1s in total for these tests. Do you still want them to be marked as integration?
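The expected counts from that test can be modeled with some toy arithmetic (file sizes, the threshold, and `plan_scantasks` are all illustrative, not the actual test code): 20 small files stay as one scantask each, while 20 large files are split into 10 rowgroup-sized scantasks each, giving 200 + 20 scantasks:

```python
def plan_scantasks(file_sizes, split_threshold, rowgroups_per_large_file=10):
    """Toy model of the splitting decision: small files stay as one
    scantask, large files become one scantask per rowgroup."""
    tasks = []
    for size in file_sizes:
        if size < split_threshold:
            tasks.append(("unsplit", size))
        else:
            per_rg = size / rowgroups_per_large_file
            tasks.extend(("split", per_rg) for _ in range(rowgroups_per_large_file))
    return tasks

# 20 small (1MB) files + 20 large (512MB) files; threshold is illustrative.
files = [1 * 1024**2] * 20 + [512 * 1024**2] * 20
tasks = plan_scantasks(files, split_threshold=96 * 1024**2)
print(len(tasks))  # 20 unsplit + 20 * 10 splits = 220
```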
(Six more outdated review threads on src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs, all resolved.)
Force-pushed 4e89f55 to 55b471b.
```rust
///
/// This will transform each [`NeedsParquetMetadata`] into its specified [`NeedsParquetMetadata::WithMetadata`] by
/// performing batched Parquet metadata retrieval whenever [`NeedsParquetMetadata::should_fetch`] is ``true``.
pub fn batched_parquet_metadata_retrieval<T>(
```
I would favor builders on the struct rather than these global functions: `FetchParquetMetadataByWindows::new(inputs, max_retrieval_batch_size)`
I was hoping for each module to have one global function as the external-facing API to keep this consistent. Note that the other two modules, `decide_scantask_split_by_rowgroups.rs` and `split_parquet_files_by_rowgroup.rs`, don't have an iterator struct, and so the API might be slightly inconsistent across these modules if I were to make this change. If you're still in favor of the change I can change only `retrieve_parquet_metadata.rs` to expose the struct and `::new` instead?
```rust
        .await
    }))
})
.collect::<DaftResult<Vec<_>>>();
```
I don't think you need to collect everything to do `try_join_all`!
The problem I'm having here is that `parquet_metadata_futures` is iterating over a `Result<future>`, because it calls `get_io_client`, which is a fallible operation. This collect lets me collect those results, early-terminate on them with a `?`, and produce a `Vec<future>` that can now be passed to `try_join_all`.
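The pattern being described (collect fallible future-constructions first, early-terminate on the first error, then await everything concurrently) has a close stdlib-Python analogue; `get_client` below is a made-up stand-in for a fallible constructor like `get_io_client`:

```python
import asyncio

def get_client(name):
    # Stand-in for a fallible constructor: it can fail *before*
    # any async work starts, analogous to get_io_client returning Result.
    if not name:
        raise ValueError("bad client config")
    async def fetch_metadata():
        return f"metadata for {name}"
    return fetch_metadata()

async def main(names):
    # Step 1: build all coroutines eagerly, raising on the first
    # construction error (the `.collect::<DaftResult<Vec<_>>>()?` step).
    coros = [get_client(n) for n in names]
    # Step 2: await them all concurrently (the `try_join_all` step).
    return await asyncio.gather(*coros)

results = asyncio.run(main(["a", "b", "c"]))
print(results)
```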
(Four more outdated review threads on src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs, all resolved.)
Commits in this force-push:
- Refactor into accumulator struct
- Rename
- Further simplification of accumulator logic
- Cleanup into separate accumulator and accumulator context
- Account for potentially null TableMetadata
- Refactor into Iterator
- Refactor into state machine
- Convert Parquet file iterator to state machine as well
- small cleanup
- Reorganization into a separate module
- Cleanup to extend this easier for using catalog information
- Perform 16 Parquet metadata fetches in parallel
- perf: reduce calls to ScanTask::estimate_in_memory_size
- Adds unit test
- Adds more unit tests
- Add feature flag DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING
- Add a benchmarking script
- Trigger data materialization in benchmark
- Refactors to ParquetFileSplitter to not use state machine
- Big refactor to split into multiple files and iterators
- Add better docs
- Refactor splitter code
- nit naming
- Refactor
- Fetchable reordering for readability
- Simplify State logic for FetchParquetMetadataByWindows
- impl IntoIterator for SplittableScanTaskRef by propagating the config ref
- docstrings
- Removed advance_state for more explicit naming
- Remove trait
Force-pushed 6789287 to bba2bed.
Refactors the Parquet splitting code into Iterators and state machines to make it more extendable.

Functional changes:

Implementation:
- `mark_scan_task_for_split` (converts `ScanTaskRef` -> `ScanTaskSplitDecision`)
- `retrieve_parquet_metadata` (converts `ScanTaskSplitDecision` -> `SplittableScanTaskRef` by retrieving Parquet metadata and applying it to the `ScanTaskSplitDecision`)
- `split_parquet_metadata_by_rowgroup` (flattens `SplittableScanTaskRef` -> `ScanTaskRef`)
Dogfooding

I ran a synthetic benchmark just doing a `.collect()` on 10 small Parquet files and 1 large (25GB) Parquet file. You can run this with our new tooling for distributed runs (run the script `benchmarking/ooms/big_file_reads.py`):

- `DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING=0` (current behavior): https://github.com/Eventual-Inc/Daft/actions/runs/12193928381/job/34016955806
- `DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING=1` (feature-flagged behavior): https://github.com/Eventual-Inc/Daft/actions/runs/12193933798/job/34016969107

`.collect()` takes only 40s to run. `explain()` takes a little bit longer (about 0.3s) because it needs to fetch metadata of the big file in order to figure out the number of partitions. Using this feature, the workload can actually run and the tasks are split quite nicely.
Regressions

Reading our 1TB lineitems table (Parquet files of about 500MB each) gives us a good idea of what regressions to expect.

Feature flag off (old behavior):

Feature flag on (new behavior):

Couple of observations:
- `.show()` performance actually doesn't change that much (I realized that our `.show()` performance on PyRunner is actually quite variable, anywhere between 10s and 20+s)
- `limit().explain()` performance doesn't change much either
- `explain()` performance suffers quite a bit

This is because when there is a limit pushdown, we actually skip the file splitting, since our per-scan-task size estimation logic accounts for the limit pushdown (and estimates the size of just a few rows). This is perhaps a little problematic, and we can maybe fix it by adding some limit-specific logic here to truncate this to just 1 ScanTask.