perf(shuffles): Incrementally retrieve metadata in reduce #3545

Open
wants to merge 3 commits into main
Conversation

@colin-ho (Contributor) commented Dec 11, 2024

Incrementally retrieve partitions and metadata as fanouts complete, instead of retrieving them only after all fanouts have finished. This drastically speeds up large partition shuffles.
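For context, here is a minimal sketch of the incremental pattern in plain Ray (hypothetical names, not Daft's actual code): each fanout's result is retrieved as soon as that task finishes, rather than after the whole map phase completes.

```python
import ray

@ray.remote
def fanout_task(i: int) -> dict:
    # Stand-in for a map/fanout task; returns a small metadata summary.
    return {"partition": i, "num_rows": 1000}

pending = [fanout_task.remote(i) for i in range(8)]
metadata = []
while pending:
    # Retrieve results incrementally as each task completes, instead of
    # blocking until the entire map phase has finished.
    done, pending = ray.wait(pending, num_returns=1)
    metadata.extend(ray.get(done))
```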

Before, we would see pauses between the map and reduce phases (the example below is a 1000 x 1000 x 100 MB shuffle):

[Screenshot: timeline before this change, showing a pause between the map and reduce phases]

Now:

[Screenshot: timeline after this change]

github-actions bot added the perf label Dec 11, 2024
codspeed-hq bot commented Dec 11, 2024

CodSpeed Performance Report

Merging #3545 will degrade performance by 38.37%

Comparing colin/incremental-metadata-retrieval (8ab38ea) with main (e5ea73f)

Summary

⚡ 1 improvement
❌ 2 regressions
✅ 14 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | main | colin/incremental-metadata-retrieval | Change |
| --- | --- | --- | --- |
| test_count[1 Small File] | 4.1 ms | 3.6 ms | +12.04% |
| test_iter_rows_first_row[100 Small Files] | 191.2 ms | 219.8 ms | -13.02% |
| test_show[100 Small Files] | 16.1 ms | 26.2 ms | -38.37% |

codecov bot commented Dec 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 77.70%. Comparing base (ba46d07) to head (8ab38ea).
Report is 3 commits behind head on main.

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3545      +/-   ##
==========================================
+ Coverage   77.54%   77.70%   +0.15%     
==========================================
  Files         709      710       +1     
  Lines       86286    86909     +623     
==========================================
+ Hits        66912    67531     +619     
- Misses      19374    19378       +4     
| Files with missing lines | Coverage Δ |
| --- | --- |
| daft/execution/physical_plan.py | 94.44% <100.00%> (+0.09%) ⬆️ |

... and 8 files with indirect coverage changes

colin-ho requested a review from jaychia on December 12, 2024 08:15
@jaychia (Contributor) commented Dec 12, 2024

Interesting. One suggestion I have is to maybe see if we can use ray.wait([...metadatas], fetch_local=True) (see: https://docs.ray.io/en/latest/ray-core/api/doc/ray.wait.html)

I wonder if this might simplify the logic here, and it might also fix some OOM issues I was observing where the workers holding the metadata objectrefs die due to OOM.

Could we try that and see if it gives us the same effect? We'd have to figure out where it's appropriate to call this on the metadata objectrefs, though. I would look into _await_tasks on our RayRunner.
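A rough sketch of the suggestion (ray.wait and its fetch_local keyword are real Ray API; the `prefetch_metadata` helper and `metadata_refs` list are hypothetical): fetch_local=True asks Ray to start pulling ready objects to the calling node, so a later ray.get no longer depends on the remote workers that produced them.

```python
import ray
from typing import List

def prefetch_metadata(metadata_refs: List[ray.ObjectRef]) -> list:
    # Hypothetical helper sketching the suggestion: ray.wait with
    # fetch_local=True begins pulling ready objects to the local node,
    # so the ray.get below doesn't reach back to the producing workers.
    ready, _not_ready = ray.wait(
        metadata_refs,
        num_returns=len(metadata_refs),
        fetch_local=True,
    )
    return ray.get(ready)
```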

@jaychia (Contributor) commented Dec 12, 2024

Another idea... Looks like these metadatas are being retrieved using a Ray remote function:

@ray.remote
def get_metas(*partitions: MicroPartition) -> list[PartitionMetadata]:
    return [PartitionMetadata.from_table(partition) for partition in partitions]

My guess is that if the MicroPartition lives in the object store, we're having to page this data back in just to grab the metadata. This is even worse if spilling occurred and we may have to page it back in from disk.

I wonder if there's a better way of dealing with this.

Edit: I think this might only be triggered from certain codepaths, and in most cases the metadata is returned as an objectref after execution. This code is so messy...
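To make the concern concrete, here is a purely illustrative Ray sketch (not Daft's code) of an alternative shape: the producing task returns the partition and its metadata as two separate ObjectRefs, so reading the metadata later never forces the large partition to be paged back in or unspilled.

```python
import ray

@ray.remote(num_returns=2)
def map_task(rows: list) -> tuple:
    # Stand-in for the real map work: produce the partition and a small
    # metadata summary as two separate return values / ObjectRefs.
    partition = [r * 2 for r in rows]
    metadata = {"num_rows": len(partition)}
    return partition, metadata

partition_ref, metadata_ref = map_task.remote(list(range(1000)))
# Reading only the metadata never touches the (potentially large,
# potentially spilled) partition object.
meta = ray.get(metadata_ref)
```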

@jaychia (Contributor) commented Dec 12, 2024

Doing some experiments in #3557, but ray.wait(fetch_local=True) doesn't seem to do anything for the metadata in my case (I still hit OOMs). I'm guessing this doesn't actually work for small objects like the metadata, and only works for things in the object store?

Doing another round of tests now using an explicit ray.get and trying to explicitly set the cached metadata object in the result. If that works, then it could work well for your use-case also.
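A small sketch of that second experiment (the class and attribute names are hypothetical, not the actual RayRunner code): eagerly ray.get the small metadata object once and cache it on the result, so later accesses never reach back to the worker that produced it.

```python
import ray

class MaterializedResult:
    # Hypothetical stand-in for a result object holding a metadata ObjectRef.
    def __init__(self, metadata_ref: ray.ObjectRef):
        self.metadata_ref = metadata_ref
        self._cached_metadata = None

    def metadata(self):
        # Explicitly ray.get once and cache, so later accesses are local.
        if self._cached_metadata is None:
            self._cached_metadata = ray.get(self.metadata_ref)
        return self._cached_metadata
```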

@jaychia (Contributor) left a comment

So this PR looks good to me, but I wonder if we should take a more holistic approach here and maybe just fix how metadata is being passed around the system rather than patch it here specifically for the map/reduce shuffle.

Maybe something really dumb like hooking into a stats/metadata actor? Would love to get thoughts from @samster25
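Purely as an illustration of the "stats/metadata actor" idea (not an existing Daft component), map tasks could report their partition metadata to a single Ray actor, and the planner could then query sizes without ever touching the partitions themselves.

```python
import ray

@ray.remote
class MetadataStore:
    # Illustrative only: a central actor that map tasks report metadata to.
    def __init__(self):
        self._metas = {}

    def put(self, partition_id: str, metadata: dict) -> None:
        self._metas[partition_id] = metadata

    def get(self, partition_id: str) -> dict:
        return self._metas.get(partition_id, {})

store = MetadataStore.remote()
ray.get(store.put.remote("part-0", {"num_rows": 1000, "size_bytes": 123456}))
print(ray.get(store.get.remote("part-0")))
```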
