
[FEAT] Streaming Catalog Writes #2966

Closed
colin-ho wants to merge 24 commits into main from colin/streaming-catalog-writes

Conversation

colin-ho
Contributor

No description provided.

@github-actions github-actions bot added the enhancement New feature or request label Sep 28, 2024

codspeed-hq bot commented Sep 30, 2024

CodSpeed Performance Report

Merging #2966 will not alter performance

Comparing colin/streaming-catalog-writes (f835c47) with main (fe4553f)

Summary

✅ 17 untouched benchmarks


codecov bot commented Sep 30, 2024

Codecov Report

Attention: Patch coverage is 79.95578% with 272 lines in your changes missing coverage. Please review.

Project coverage is 78.34%. Comparing base (b2dabf6) to head (f835c47).
Report is 86 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| daft/io/writer.py | 0.00% | 163 Missing ⚠️ |
| ...-local-execution/src/writes/unpartitioned_write.rs | 85.05% | 26 Missing ⚠️ |
| ...ft-local-execution/src/writes/partitioned_write.rs | 89.47% | 24 Missing ⚠️ |
| src/daft-local-execution/src/buffer.rs | 76.19% | 15 Missing ⚠️ |
| src/daft-local-execution/src/sources/scan_task.rs | 79.36% | 13 Missing ⚠️ |
| src/daft-parquet/src/stream_reader.rs | 45.83% | 13 Missing ⚠️ |
| src/daft-local-execution/src/pipeline.rs | 95.42% | 7 Missing ⚠️ |
| src/daft-micropartition/src/lib.rs | 90.62% | 6 Missing ⚠️ |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 66.66% | 2 Missing ⚠️ |
| src/daft-micropartition/src/py_writers.rs | 99.42% | 1 Missing ⚠️ |
| ... and 2 more | | |
Additional details and impacted files


```diff
@@            Coverage Diff             @@
##             main    #2966      +/-   ##
==========================================
+ Coverage   78.22%   78.34%   +0.11%     
==========================================
  Files         598      609      +11     
  Lines       70556    72222    +1666     
==========================================
+ Hits        55194    56583    +1389     
- Misses      15362    15639     +277     
```

| Files with missing lines | Coverage Δ |
|---|---|
| daft/logical/builder.py | 89.87% <100.00%> (+0.26%) ⬆️ |
| daft/table/table_io.py | 85.96% <ø> (ø) |
| src/daft-local-execution/src/lib.rs | 90.47% <100.00%> (+0.73%) ⬆️ |
| src/daft-local-execution/src/run.rs | 89.84% <100.00%> (ø) |
| ...daft-local-execution/src/writes/deltalake_write.rs | 100.00% <100.00%> (ø) |
| ...c/daft-local-execution/src/writes/iceberg_write.rs | 100.00% <100.00%> (ø) |
| .../daft-local-execution/src/writes/physical_write.rs | 100.00% <100.00%> (ø) |
| src/daft-parquet/src/file.rs | 74.01% <100.00%> (+0.10%) ⬆️ |
| src/daft-physical-plan/src/local_plan.rs | 89.06% <100.00%> (+3.34%) ⬆️ |
| src/daft-plan/src/builder.rs | 81.87% <100.00%> (-11.07%) ⬇️ |
| ... and 14 more | |

... and 34 files with indirect coverage changes

@samster25 samster25 changed the base branch from main to colin/streaming-physical-writes October 22, 2024 00:46
@samster25 samster25 changed the base branch from colin/streaming-physical-writes to main October 22, 2024 00:47
colin-ho added a commit that referenced this pull request Oct 31, 2024
Streaming writes for swordfish (Parquet + CSV only). Iceberg and Delta writes are here: #2966

Implement streaming writes as a blocking sink. Unpartitioned writes run with 1 worker, and partitioned writes run with NUM_CPUs workers. As a drive-by, this also makes blocking sinks parallelizable.
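
A minimal sketch of that call pattern (illustrative names and signatures only; this is not Daft's actual `BlockingSink` trait): a sink advertises how many workers may feed it concurrently, consumes batches as they stream in, and only produces output when finalized.

```rust
// Hedged sketch of a blocking sink's call pattern; names and signatures are
// illustrative, not Daft's real API.
trait BlockingSink {
    /// Consume one incoming batch on the given worker.
    fn sink(&mut self, worker_id: usize, batch: Vec<i64>);
    /// Called once all input is exhausted; returns the sink's final output.
    fn finalize(&mut self) -> Vec<i64>;
    /// How many workers may call `sink` concurrently:
    /// 1 for unpartitioned writes, the number of CPUs for partitioned writes.
    fn max_concurrency(&self) -> usize;
}

/// Toy sink that counts rows per worker, just to show the shape.
struct CountingSink {
    counts: Vec<usize>,
}

impl BlockingSink for CountingSink {
    fn sink(&mut self, worker_id: usize, batch: Vec<i64>) {
        self.counts[worker_id] += batch.len();
    }
    fn finalize(&mut self) -> Vec<i64> {
        self.counts.iter().map(|c| *c as i64).collect()
    }
    fn max_concurrency(&self) -> usize {
        self.counts.len()
    }
}

fn main() {
    let mut sink = CountingSink { counts: vec![0; 4] };
    println!("workers: {}", sink.max_concurrency());
    // Round-robin eight batches of 100 rows across the four workers.
    for (worker, batch) in (0..8usize).map(|i| (i % 4, vec![0i64; 100])) {
        sink.sink(worker, batch);
    }
    println!("rows per worker: {:?}", sink.finalize());
}
```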

**Behaviour**
- Unpartitioned: writes go to a `TargetFileSizeWriter`, which manages file sizes and row group sizes as data is streamed in.

- Partitioned: data is partitioned via a `Dispatcher` and sent to workers based on the hash of the partition values. Each worker runs a `PartitionedWriter` that manages partitioning by value, file sizes, and row group sizes (a simplified sketch follows below).
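
To make the partitioned path concrete, here is a hedged, self-contained sketch (the `worker_for` helper and the row-count threshold are illustrative stand-ins, not Daft's internals): a dispatcher routes each row to a worker by hashing its partition key, and each worker's writer buffers rows per partition value, closing a file whenever a buffer reaches the target size.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Stand-in for a target file size; the real writers target bytes/row groups.
const TARGET_ROWS_PER_FILE: usize = 1000;

/// Dispatcher: pick a worker from the hash of the partition key.
fn worker_for(partition_key: &str, num_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    partition_key.hash(&mut h);
    (h.finish() as usize) % num_workers
}

/// Per-worker writer: buffers rows per partition value and "closes a file"
/// whenever a partition's buffer reaches the target size.
#[derive(Default)]
struct PartitionedWriter {
    buffers: HashMap<String, Vec<String>>,
    files_written: usize,
}

impl PartitionedWriter {
    fn write(&mut self, partition_key: &str, row: String) {
        let buf = self.buffers.entry(partition_key.to_string()).or_default();
        buf.push(row);
        if buf.len() >= TARGET_ROWS_PER_FILE {
            // The real writer would flush a Parquet/CSV file here.
            self.files_written += 1;
            buf.clear();
        }
    }

    fn finalize(&mut self) -> usize {
        // Flush any remaining non-empty buffers as final files.
        self.files_written += self.buffers.values().filter(|b| !b.is_empty()).count();
        self.files_written
    }
}

fn main() {
    let num_workers: usize = 4;
    let mut workers: Vec<PartitionedWriter> =
        (0..num_workers).map(|_| PartitionedWriter::default()).collect();

    // Stream rows in; the dispatcher hashes the partition key to pick a worker.
    for i in 0..5000usize {
        let key = format!("region-{}", i % 7);
        let w = worker_for(&key, num_workers);
        workers[w].write(&key, format!("row-{i}"));
    }

    let total_files: usize = workers.iter_mut().map(|w| w.finalize()).sum();
    println!("wrote {total_files} files across {num_workers} workers");
}
```

Hashing on the partition key means all rows of a given partition value land on the same worker, so files never need to be merged across workers.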


**Benchmarks:**
I made a new benchmark suite in `tests/benchmarks/test_streaming_writes.py`. It tests writes of TPC-H lineitem to Parquet/CSV, with and without partition columns, and with different target file/row group sizes. The streaming executor performs much better when there are partition columns, as seen in this screenshot. Without partition columns it is about the same; when the target row group size / file size is decreased it is slightly slower, likely because it does more slicing, but this needs more investigation. Memory usage is the same for both.
<img width="1400" alt="Screenshot 2024-10-03 at 11 22 32 AM"
src="https://github.com/user-attachments/assets/53b4d77d-553a-4181-8a4d-9eddaa3adaf7">

Memory test on a read->write of TPC-H lineitem SF1 Parquet:
Native:
<img width="1078" alt="Screenshot 2024-10-08 at 1 48 34 PM"
src="https://github.com/user-attachments/assets/3eda33c6-9413-415f-b808-ac3c7437e269">

Python:
<img width="1090" alt="Screenshot 2024-10-08 at 1 48 50 PM"
src="https://github.com/user-attachments/assets/f92b9a9f-a3b5-408b-98d5-4ba2d66b7be4">

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
@colin-ho colin-ho closed this Oct 31, 2024
sagiahrac pushed a commit to sagiahrac/Daft that referenced this pull request Nov 4, 2024