feat: Split *all* Parquet ScanTasks by default #3454

Draft

jaychia wants to merge 1 commit into main from jay/split-all-files

Conversation

jaychia (Contributor) commented Nov 29, 2024

Refactors the Parquet splitting code into iterators and state machines to make it more extensible.

Functional changes:

  1. Performs Parquet metadata fetches in batches ("windows" of ScanTasks)
  2. Adds a mechanism for determining window sizing for ScanTasks to split (currently defaults to windows of size 1)
  3. Will split all ScanTasks by default instead of just the first 10

Implementation:

  • mark_scan_task_for_split (converts ScanTaskRef -> ScanTaskSplitDecision)
  • retrieve_parquet_metadata (converts ScanTaskSplitDecision -> SplittableScanTaskRef by retrieving Parquet metadata and applying it to the ScanTaskSplitDecision)
  • split_parquet_metadata_by_rowgroup (flattens SplittableScanTaskRef -> ScanTaskRef)
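
For intuition, here is a minimal sketch of how these three stages compose as a plain iterator pipeline. All types, thresholds, and bodies below are simplified stand-ins for illustration, not Daft's actual definitions:

// Simplified stand-in types; the real definitions live in daft-scan.
#[derive(Debug, Clone)]
struct ScanTaskRef {
    path: String,
    size_bytes: u64,
}

// Stage 1 output: the task plus a decision on whether to split it.
struct ScanTaskSplitDecision {
    task: ScanTaskRef,
    should_split: bool,
}

// Stage 2 output: the task annotated with rowgroup info from its metadata.
struct SplittableScanTaskRef {
    task: ScanTaskRef,
    num_rowgroups: u64,
}

// Stage 1: mark_scan_task_for_split (ScanTaskRef -> ScanTaskSplitDecision).
fn mark_scan_task_for_split(task: ScanTaskRef) -> ScanTaskSplitDecision {
    // Hypothetical cutoff; the real logic consults size estimates and config.
    let should_split = task.size_bytes > 512 * 1024 * 1024;
    ScanTaskSplitDecision { task, should_split }
}

// Stage 2: retrieve_parquet_metadata (ScanTaskSplitDecision -> SplittableScanTaskRef).
// The real implementation batches metadata fetches over windows of ScanTasks;
// here we fake the metadata read with a fixed rowgroup count.
fn retrieve_parquet_metadata(decision: ScanTaskSplitDecision) -> SplittableScanTaskRef {
    let num_rowgroups = if decision.should_split { 4 } else { 1 };
    SplittableScanTaskRef { task: decision.task, num_rowgroups }
}

// Stage 3: split_parquet_metadata_by_rowgroup (flatten into one task per rowgroup).
fn split_parquet_metadata_by_rowgroup(
    splittable: SplittableScanTaskRef,
) -> impl Iterator<Item = ScanTaskRef> {
    let n = splittable.num_rowgroups;
    (0..n).map(move |i| ScanTaskRef {
        path: format!("{}#rowgroup={}", splittable.task.path, i),
        size_bytes: splittable.task.size_bytes / n,
    })
}

fn main() {
    let tasks = vec![
        ScanTaskRef { path: "small.parquet".into(), size_bytes: 1 << 20 },
        ScanTaskRef { path: "big.parquet".into(), size_bytes: 25 << 30 },
    ];
    // The three stages compose as ordinary iterator adaptors.
    let split: Vec<ScanTaskRef> = tasks
        .into_iter()
        .map(mark_scan_task_for_split)
        .map(retrieve_parquet_metadata)
        .flat_map(split_parquet_metadata_by_rowgroup)
        .collect();
    println!("{} output scan tasks", split.len());
}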

Dogfooding

I ran a synthetic benchmark just doing a .collect() on 10 small Parquet files and 1 large (25GB) Parquet file.

You can reproduce this with our new tooling for distributed runs by running the script benchmarking/ooms/big_file_reads.py:

uv run tools/gha_run_cluster_job.py benchmarking/ooms/big_file_reads.py --daft-wheel-url=https://github-actions-artifacts-bucket.s3.us-west-2.amazonaws.com/builds/491054b0455307e161012e4d48c9a5198b1334e3/getdaft-0.3.0.dev0-cp38-abi3-manylinux_2_31_x86_64.whl --branch jay/split-all-files -- --enable-optimized-splits

DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING=0 (current behavior): https://github.com/Eventual-Inc/Daft/actions/runs/12193928381/job/34016955806

DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING=1 (feature-flagged behavior): https://github.com/Eventual-Inc/Daft/actions/runs/12193933798/job/34016969107

  • Before: creates a plan with 2 partitions, one very large and one very small. The run times out because the scheduler can't place the large file (our memory estimation produces a very big task). This also means it will have problems with straggler tasks.
  • After: creates a plan with 256 partitions. Looks like the large file was split by rowgroups into 255 partitions.
    • .collect() takes only 40s to run
    • .explain() takes a little bit longer (about 0.3s) because it needs to fetch metadata of the big file in order to figure out the number of partitions

Using this feature, the workload can actually run and the tasks are split quite nicely:
[image]

Regressions

Reading our 1TB lineitem table (Parquet files of about 500MB each) gives us a good idea of what regressions to expect.

import daft
import time

df = daft.read_parquet("s3://daft-public-datasets/tpch_iceberg_sf1000.db/lineitem/data/**")

start = time.time()
df.explain(True)
explain_time = time.time() - start

start = time.time()
df.limit(1).explain(True)
explain_limit_time = time.time() - start

start = time.time()
df.show()
show_time = time.time() - start

print("Explain", explain_time)
print("Explain limit", explain_limit_time)
print("Show", show_time)

Feature flag off (old behavior):

Explain (500 partitions) 0.8800828456878662
Explain limit (500 partitions) 0.8331000804901123
Show (500 partitions) 23.00035309791565

Feature flag on (new behavior):

Explain (1316 partitions) 15.388989925384521
Explain limit (500 partitions) 0.7838840484619141
Show (500 partitions) 21.284283876419067

A couple of observations:

  1. .show() performance actually doesn't change that much (I realized that our .show() performance on PyRunner is actually quite variable, anywhere from 10s to 20+s)
  2. limit().explain() performance doesn't change much either
  3. .explain() performance suffers quite a bit

This is because when there is a limit pushdown, we skip the file splitting: our per-ScanTask size-estimation logic accounts for the limit pushdown and estimates the size of just a few rows. That is perhaps a little problematic; we could maybe fix it by adding limit-specific logic here to truncate the plan to just 1 ScanTask, as sketched below.
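
A hypothetical sketch of that truncation idea, reusing the stand-in types and stage functions from the pipeline sketch above (this is not the PR's actual code, just an illustration):

// Hypothetical: when a limit pushdown exists, skip metadata fetches and
// splitting entirely and keep only the first ScanTask.
fn split_scan_tasks(
    scan_tasks: Vec<ScanTaskRef>,
    limit_pushdown: Option<usize>,
) -> Vec<ScanTaskRef> {
    match limit_pushdown {
        // A small limit is usually satisfiable by a single task, so splitting
        // (and the metadata fetches it requires) buys nothing for .explain().
        Some(_) => scan_tasks.into_iter().take(1).collect(),
        // No limit: run the full mark -> fetch -> split pipeline.
        None => scan_tasks
            .into_iter()
            .map(mark_scan_task_for_split)
            .map(retrieve_parquet_metadata)
            .flat_map(split_parquet_metadata_by_rowgroup)
            .collect(),
    }
}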

@github-actions github-actions bot added the enhancement (New feature or request) label Nov 29, 2024

codspeed-hq bot commented Nov 29, 2024

CodSpeed Performance Report

Merging #3454 will degrade performance by 32.06%

Comparing jay/split-all-files (bba2bed) with main (063de4d)

Summary

❌ 1 regressions
✅ 26 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark | main | jay/split-all-files | Change
test_show[100 Small Files] | 15.8 ms | 23.3 ms | -32.06%


codecov bot commented Nov 30, 2024

Codecov Report

Attention: Patch coverage is 89.97821% with 46 lines in your changes missing coverage. Please review.

Project coverage is 77.87%. Comparing base (063de4d) to head (bba2bed).

Files with missing lines | Patch % | Lines
...scan_task_iters/split_parquet_files_by_rowgroup.rs | 89.50% | 19 Missing ⚠️
...n/src/scan_task_iters/retrieve_parquet_metadata.rs | 92.35% | 12 Missing ⚠️
...n/src/scan_task_iters/mark_scan_tasks_for_split.rs | 87.93% | 7 Missing ⚠️
src/common/daft-config/src/lib.rs | 54.54% | 5 Missing ⚠️
src/common/daft-config/src/python.rs | 57.14% | 3 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3454      +/-   ##
==========================================
+ Coverage   77.80%   77.87%   +0.06%     
==========================================
  Files         718      721       +3     
  Lines       88250    88700     +450     
==========================================
+ Hits        68666    69072     +406     
- Misses      19584    19628      +44     
Files with missing lines | Coverage Δ
daft/context.py | 87.65% <ø> (ø)
src/daft-scan/src/scan_task_iters.rs | 97.33% <100.00%> (+0.43%) ⬆️
src/common/daft-config/src/python.rs | 68.03% <57.14%> (-0.36%) ⬇️
src/common/daft-config/src/lib.rs | 79.74% <54.54%> (-4.32%) ⬇️
...n/src/scan_task_iters/mark_scan_tasks_for_split.rs | 87.93% <87.93%> (ø)
...n/src/scan_task_iters/retrieve_parquet_metadata.rs | 92.35% <92.35%> (ø)
...scan_task_iters/split_parquet_files_by_rowgroup.rs | 89.50% <89.50%> (ø)

... and 2 files with indirect coverage changes

@jaychia jaychia force-pushed the jay/split-all-files branch from 65d6e2b to 63a8814 Compare December 2, 2024 20:34
@jaychia jaychia marked this pull request as draft December 2, 2024 20:35
@jaychia jaychia force-pushed the jay/split-all-files branch from 63a8814 to 4db6d32 Compare December 3, 2024 19:00
@jaychia jaychia changed the title [FEAT] Change Parquet file splitting logic to split all files [CHORE] Refactors Parquet ScanTask splitting logic into iterators Dec 5, 2024
@jaychia jaychia changed the title [CHORE] Refactors Parquet ScanTask splitting logic into iterators [PERF] Split *all* Parquet ScanTasks by default Dec 5, 2024
@jaychia jaychia requested a review from samster25 December 5, 2024 02:42
@jaychia jaychia marked this pull request as ready for review December 5, 2024 02:42
jaychia (Contributor, Author) commented Dec 5, 2024

[image]

Performance on .explain fell quite a bit. It looks like ScanTask::estimate_in_memory_size_bytes is pretty expensive?

@samster25 samster25 requested review from samster25 and desmondcheongzx and removed request for samster25 December 5, 2024 22:48
@jaychia jaychia changed the title [PERF] Split *all* Parquet ScanTasks by default feat: Split *all* Parquet ScanTasks by default Dec 5, 2024
@github-actions github-actions bot added feat and removed chore labels Dec 5, 2024
@jaychia jaychia removed the request for review from desmondcheongzx December 5, 2024 22:59
jaychia (Contributor, Author) commented Dec 5, 2024

Performance on .explain fell quite a bit. Looks like apparently ScanTask::estimate_in_memory_size_bytes is pretty expensive?

This has been fixed in commit: 8c527e8

@jaychia jaychia requested a review from samster25 December 6, 2024 00:43
@jaychia jaychia force-pushed the jay/split-all-files branch 5 times, most recently from 450b35a to 7c56994 Compare December 6, 2024 23:10
samster25 (Member) left a comment:

Overall, I would decompose your functionality into separate modules that can be composed together, rather than these mega matches.

@@ -351,6 +351,7 @@ def set_execution_config(
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
enable_ray_tracing: bool | None = None,
enable_aggressive_scantask_splitting: bool | None = None,
samster25 (Member):

I think it would be cleaner to set a level instead, maybe [0, 1, 2], and have the default be 1?

scantask_split_level=...

jaychia (Contributor, Author):

What would the levels entail?

}
}

/// TODO: Deprecate this method in favor of `get_io_config`
samster25 (Member): [comment body not captured]

jaychia (Contributor, Author):

I tried adding this, but it fails to compile when I do. I'll have to investigate more to see if I can make this work, but it doesn't seem like we do this anywhere else in our codebase?

@@ -23,6 +23,16 @@ pub enum StorageConfig {
}

impl StorageConfig {
/// TODO: Arc the inner IOConfig on each StorageConfig instead to avoid needing to Arc here
pub fn get_io_config(&self) -> Arc<IOConfig> {
samster25 (Member):

This makes me really sad :(

What if instead you return a reference?

jaychia (Contributor, Author):

I couldn't make it work because we have a bunch of code that requires Arc<IOConfig> (specifically I think get_io_client uses that), so if I return a reference I end up needing to clone/Arc it somewhere upstream anyway...
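
To illustrate the tension (with stand-in types, not Daft's actual definitions): a getter returning &IOConfig just moves the clone-and-Arc to every call site, because consumers shaped like get_io_client want an owned Arc<IOConfig>:

use std::sync::Arc;

#[derive(Clone)]
struct IOConfig;

struct StorageConfig {
    io_config: IOConfig,
}

impl StorageConfig {
    // Option A: return a reference (no allocation here)...
    fn io_config_ref(&self) -> &IOConfig {
        &self.io_config
    }

    // Option B: allocate an Arc per call, as the current code does.
    fn get_io_config(&self) -> Arc<IOConfig> {
        Arc::new(self.io_config.clone())
    }
}

fn get_io_client(_config: Arc<IOConfig>) {
    // Stand-in for the real consumer that requires Arc<IOConfig>.
}

fn main() {
    let storage = StorageConfig { io_config: IOConfig };
    // With Option A, the clone/Arc just moves upstream to the call site:
    get_io_client(Arc::new(storage.io_config_ref().clone()));
    // Option B keeps call sites clean but allocates on every call. Storing
    // Arc<IOConfig> inside StorageConfig (the TODO in the diff) avoids both.
    get_io_client(storage.get_io_config());
}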

# Small ones should not be split, large ones should be split into 10 rowgroups each
# This gives us a total of 200 + 20 scantasks

# Write 20 large files into tmpdir
samster25 (Member):

This sounds expensive. Can you make these integration IO tests?

jaychia (Contributor, Author) commented Dec 11, 2024:

It's actually quite fast here since the files are small -- about 1s in total for these tests. Do you still want them to be marked as integration?

@jaychia jaychia force-pushed the jay/split-all-files branch 2 times, most recently from 4e89f55 to 55b471b Compare December 12, 2024 09:12
@jaychia jaychia requested a review from samster25 December 12, 2024 09:20
///
/// This will transform each [`NeedsParquetMetadata`] into its specified [`NeedsParquetMetadata::WithMetadata`] by
/// performing batched Parquet metadata retrieval whenever [`NeedsParquetMetadata::should_fetch`] is ``true``.
pub fn batched_parquet_metadata_retrieval<T>(
samster25 (Member):

I would favor builders on the struct rather than these global functions.

FetchParquetMetadataByWindows::new(inputs, max_retrieval_batch_size)

jaychia (Contributor, Author):

I was hoping for each module to have one global function as the external-facing API, to keep this consistent.

Note that the other two modules, decide_scantask_split_by_rowgroup.rs and split_parquet_files_by_rowgroup.rs, don't have an iterator struct, so the API might be slightly inconsistent across these modules if I were to make this change.

If you're still in favor of the change, I can change only retrieve_parquet_metadata.rs to expose the struct and ::new instead?
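
For reference, a minimal sketch of the two API shapes being debated (the struct body is a stand-in, not the PR's actual definition):

// Stand-in iterator struct over windows of scan tasks.
struct FetchParquetMetadataByWindows<I> {
    inputs: I,
    max_retrieval_batch_size: usize,
}

impl<I> FetchParquetMetadataByWindows<I> {
    // Builder/constructor style, as suggested in the review:
    //   FetchParquetMetadataByWindows::new(inputs, max_retrieval_batch_size)
    fn new(inputs: I, max_retrieval_batch_size: usize) -> Self {
        Self { inputs, max_retrieval_batch_size }
    }
}

// Free-function style: one external-facing entry point per module, which is
// what the PR currently exposes.
fn batched_parquet_metadata_retrieval<I>(
    inputs: I,
    max_retrieval_batch_size: usize,
) -> FetchParquetMetadataByWindows<I> {
    FetchParquetMetadataByWindows::new(inputs, max_retrieval_batch_size)
}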

.await
}))
})
.collect::<DaftResult<Vec<_>>>();
samster25 (Member):

I don't think you need to collect everything to do try_join_all!

jaychia (Contributor, Author):

The problem I'm having here is that parquet_metadata_futures iterates over Result<future> values, because it calls get_io_client, which is a fallible operation.

This collect lets me gather those results, early-terminate on them with a ?, and produce a Vec<future> that can then be passed to try_join_all.
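
A minimal sketch of that pattern with stand-in functions (uses the futures crate): the setup step is fallible, so the iterator yields Result<future>; collecting into Result<Vec<_>> lets ? surface setup errors before try_join_all awaits the futures concurrently.

use futures::future::try_join_all;

// Stand-in for the real metadata fetch future.
async fn fetch_metadata(path: String) -> Result<usize, String> {
    Ok(path.len())
}

// Stand-in for the fallible setup step (shaped like get_io_client): it can
// fail *before* producing a future.
fn make_fetch_future(
    path: &str,
) -> Result<impl std::future::Future<Output = Result<usize, String>>, String> {
    if path.is_empty() {
        return Err("could not build IO client".to_string());
    }
    Ok(fetch_metadata(path.to_string()))
}

async fn fetch_all(paths: &[&str]) -> Result<Vec<usize>, String> {
    // Collect Result<future> into Result<Vec<future>>; `?` early-returns on
    // the first setup failure, leaving a plain Vec of futures...
    let futures = paths
        .iter()
        .map(|&p| make_fetch_future(p))
        .collect::<Result<Vec<_>, String>>()?;
    // ...which try_join_all can then await concurrently.
    try_join_all(futures).await
}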

@jaychia jaychia requested a review from samster25 December 14, 2024 05:42

Commits:

Refactor into accumulator struct

Rename

Further simplification of accumulator logic

Cleanup into separate accumulator and accumulator context

Account for potentially null TableMetadata

Refactor into Iterator

Refactor into state machine

Convert Parquet file iterator to state machine as well

small cleanup

Reorganization into a separate module

Cleanup to extend this easier for using catalog information

Perform 16 Parquet metadata fetches in parallel

perf: reduce calls to ScanTask::estimate_in_memory_size

Adds unit test

Adds more unit tests

Add feature flag DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING

Add a benchmarking script

Trigger data materialization in benchmark

Refactors to ParquetFileSplitter to not use state machine

Big refactor to split into multiple files and iterators

Add better docs

Refactor splitter code

nit naming

Refactor Fetchable

reordering for readability

Simplify State logic for FetchParquetMetadataByWindows

impl IntoIterator for SplittableScanTaskRef by propagating the config ref

docstrings

Removed advance_state for more explicit naming

Remove trait
@jaychia jaychia force-pushed the jay/split-all-files branch from 6789287 to bba2bed Compare December 19, 2024 08:05
@jaychia jaychia marked this pull request as draft December 20, 2024 08:11