Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FEAT] Dynamically parallel local parquet reader (#3310)
Implement a dynamically parallel local streaming parquet reader. ### Background The current streaming local parquet reader, while fast and streaming, has some problems: - It reads and deserializes **ALL row groups and ALL columns in parallel.** - It does not respect **downstream back-pressure** (the crossbeam channels are all bounded by max chunks, it's free to fill it up). This leads to unnecessarily high memory usage, and it potentially starves downstream tasks. ### Solution Instead of launching all tasks at once, we can cap the number of parallel tasks based on certain factors: - Number of CPUs - Number of Columns. ### Results Most glaringly, the benefits of these are in memory usage of streaming queries, for example: ``` next(daft.read_parquet("data/tpch-dbgen/1_0/1/parquet/lineitem").iter_partitions()) # read lineitem tpch sf1 ``` The new implementation hits a peak of 300mb, while the old goes over 1gb. <img width="1186" alt="Screenshot 2024-11-18 at 11 35 36 PM" src="https://github.com/user-attachments/assets/45fb9fab-3215-4ff6-a7fe-63a428fd9c7b"> <img width="1170" alt="Screenshot 2024-11-18 at 11 36 15 PM" src="https://github.com/user-attachments/assets/591b9bad-25e9-46ed-ba53-caaa892f50eb"> Another example, where we stream the entire file, but the consumption is slow: ``` for _ in daft.read_parquet("/Users/colinho/Desktop/Daft/z/daft_tpch_100g_32part_64RG.parquet").iter_partitions(): time.sleep(0.1) ``` The new implementation hits a peak of 1.2gb, while the old goes over 3gb. <img width="1188" alt="Screenshot 2024-11-18 at 11 42 01 PM" src="https://github.com/user-attachments/assets/de9976c7-9c7f-46b4-bd24-1b0ade8a4a86"> <img width="1172" alt="Screenshot 2024-11-18 at 11 42 44 PM" src="https://github.com/user-attachments/assets/5a2f1bbc-35ed-45a8-93c1-d1853cdbfc89"> To maintain perfomance parity, I also wrote some benchmarks for parquet files with differing rows / cols / row groups, the results show that the new implementation is pretty much on par, with some slight differences. <img width="1432" alt="Screenshot 2024-11-18 at 11 29 30 PM" src="https://github.com/user-attachments/assets/cf8c7f2c-3fa4-4d43-979a-da2b0f8a1f35"> <img width="1407" alt="Screenshot 2024-11-18 at 11 29 38 PM" src="https://github.com/user-attachments/assets/b5afb8ca-fe8e-4f6e-b6a1-ea8c45be36cb"> On reading a tpch sf-1 lineitem table though: the results are pretty much the same: (~0.2s) --------- Co-authored-by: Colin Ho <[email protected]> Co-authored-by: EC2 Default User <[email protected]>
- Loading branch information