[FEAT] Streaming Local Writes for Native Executor #2871

Closed. Wanted to merge 9 commits from colin/streaming-writes-v2 into main.

Conversation

@colin-ho (Contributor) commented on Sep 20, 2024

Implements physical file writes for the native executor. Supports unpartitioned and partitioned writes, with configurable row group and file sizes.

Unpartitioned writes:

  1. Spawn NUM_CPU workers that are responsible for making write calls.
  2. A centralized dispatch function will receive input from the child node, buffer the data in chunks of target row group size / chunk size, and send them to a worker.
  3. The dispatcher will switch to a new worker once the current worker has received enough bytes to flush and close a file (see the sketch after this list).
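A minimal sketch of this dispatch loop, using std threads and mpsc channels in place of the executor's async runtime. Chunk, TARGET_FILE_BYTES, NUM_WORKERS, and the byte counts are hypothetical stand-ins, not names from the PR:

use std::sync::mpsc;
use std::thread;

const TARGET_FILE_BYTES: usize = 64 * 1024 * 1024; // per-file flush threshold
const NUM_WORKERS: usize = 4; // stands in for NUM_CPU

// A row-group-sized batch produced by the dispatcher's buffering step.
struct Chunk {
    bytes: usize,
}

fn main() {
    // One channel per worker; each worker makes the actual write calls and
    // closes its file when its channel is exhausted.
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for id in 0..NUM_WORKERS {
        let (tx, rx) = mpsc::channel::<Chunk>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            let mut written = 0usize;
            for chunk in rx {
                written += chunk.bytes; // real code: writer.write(chunk)
            }
            // real code: writer.close(), returning the finished file's path
            println!("worker {id}: closed file after {written} bytes");
        }));
    }

    // Centralized dispatch: send row-group-sized chunks to the current worker,
    // rotating to the next worker once it has received enough bytes for a file.
    let (mut current, mut sent) = (0, 0usize);
    for _ in 0..32 {
        let chunk = Chunk { bytes: 8 * 1024 * 1024 };
        sent += chunk.bytes;
        senders[current].send(chunk).unwrap();
        if sent >= TARGET_FILE_BYTES {
            current = (current + 1) % NUM_WORKERS;
            sent = 0;
        }
    }
    drop(senders); // hang up so workers flush and exit
    for h in handles {
        h.join().unwrap();
    }
}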

Partitioned writes:

  1. Spawn NUM_CPU workers that are responsible for making write calls for a subset of partition values.
  2. A centralized dispatch function will receive input from the child node, partition it by hash, and send each partition to the worker responsible for it.
  3. Each worker will subsequently partition the input by value, and store it in a buffer.
  4. If the buffer has enough data for a row group / chunk, it will make the write to a SizedDataWriter.
  5. The SizedDataWriter is responsible for closing the current file and opening a new one when the target file size is reached (see the sketch after this list).
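A minimal sketch of the partitioned path under the same stand-in assumptions: hashing the partition value picks the owning worker, and each worker buffers rows per value until a row group is full. SizedWriter is a hypothetical stand-in for the PR's SizedDataWriter, and rows are plain u64s instead of micropartitions:

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::mpsc;
use std::thread;

const NUM_WORKERS: usize = 4;
const ROW_GROUP_ROWS: usize = 3; // toy row-group threshold

// Stand-in for SizedDataWriter: would write row groups to the current file
// and roll over to a fresh file once the target file size is reached.
struct SizedWriter;
impl SizedWriter {
    fn write(&mut self, value: &str, rows: &[u64]) {
        println!("row group for partition {value}: {} rows", rows.len());
    }
}

// Hash partitioning: the same value always lands on the same worker.
fn worker_for(value: &str) -> usize {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    (h.finish() as usize) % NUM_WORKERS
}

fn main() {
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..NUM_WORKERS {
        let (tx, rx) = mpsc::channel::<(String, u64)>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            let mut writer = SizedWriter;
            let mut buffers: HashMap<String, Vec<u64>> = HashMap::new();
            for (value, row) in rx {
                // Partition by value and buffer until a row group is full.
                let buf = buffers.entry(value.clone()).or_default();
                buf.push(row);
                if buf.len() >= ROW_GROUP_ROWS {
                    writer.write(&value, buf);
                    buf.clear();
                }
            }
            // Flush any partial row groups on close.
            for (value, buf) in &buffers {
                if !buf.is_empty() {
                    writer.write(value, buf);
                }
            }
        }));
    }

    // Centralized dispatch: route each row to the worker that owns its value.
    for row in 0..20u64 {
        let value = format!("p{}", row % 5);
        senders[worker_for(&value)].send((value, row)).unwrap();
    }
    drop(senders);
    for h in handles {
        h.join().unwrap();
    }
}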

Notes:

  • File writing is still performed by the PyArrow Parquet and CSV writers.
  • We may need a max_open_files parameter, which we could support via LRU caching of the writers (sketched below).
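A hedged sketch of that LRU idea; WriterCache, MAX_OPEN_FILES, and Writer are hypothetical, not names from the PR. Evicting on open caps the number of live writers, at the cost of starting a new file for any partition that shows up again after eviction:

use std::collections::HashMap;

const MAX_OPEN_FILES: usize = 2;

struct Writer; // stand-in for an open file writer
impl Writer {
    fn close(&mut self) {
        println!("closed file");
    }
}

struct WriterCache {
    writers: HashMap<String, Writer>,
    recency: Vec<String>, // front = least recently used
}

impl WriterCache {
    fn get(&mut self, partition: &str) -> &mut Writer {
        // Mark this partition as most recently used.
        self.recency.retain(|p| p != partition);
        self.recency.push(partition.to_string());
        if !self.writers.contains_key(partition) {
            // At capacity: close and evict the least recently used writer.
            if self.writers.len() >= MAX_OPEN_FILES {
                let lru = self.recency.remove(0);
                if let Some(mut w) = self.writers.remove(&lru) {
                    w.close();
                }
            }
            self.writers.insert(partition.to_string(), Writer);
        }
        self.writers.get_mut(partition).unwrap()
    }
}

fn main() {
    let mut cache = WriterCache { writers: HashMap::new(), recency: Vec::new() };
    for p in ["a", "b", "c", "a"] {
        cache.get(p); // "c" evicts "a"; the second "a" then evicts "b"
    }
}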

Memray stats (read -> write lineitem parquet):

  • Native: (Memray memory profile screenshot, Sep 24, 2024)
  • Python: (Memray memory profile screenshot, Sep 20, 2024)

@github-actions bot added the "enhancement" (New feature or request) label on Sep 20, 2024

codspeed-hq bot commented Sep 20, 2024

CodSpeed Performance Report

Merging #2871 will not alter performance

Comparing colin/streaming-writes-v2 (28ddf0d) with main (46c5a7e)

Summary

✅ 17 untouched benchmarks


codecov bot commented Sep 20, 2024

Codecov Report

Attention: Patch coverage is 84.53739% with 122 lines in your changes missing coverage. Please review.

Project coverage is 75.60%. Comparing base (46c5a7e) to head (28ddf0d).
Report is 14 commits behind head on main.

Files with missing lines Patch % Lines
daft/io/writer.py 0.00% 40 Missing ⚠️
...aft-local-execution/src/sinks/partitioned_write.rs 91.41% 26 Missing ⚠️
...t-local-execution/src/sinks/unpartitioned_write.rs 85.79% 25 Missing ⚠️
src/daft-local-execution/src/buffer.rs 76.19% 15 Missing ⚠️
src/daft-micropartition/src/lib.rs 75.00% 6 Missing ⚠️
src/daft-parquet/src/stream_reader.rs 82.60% 4 Missing ⚠️
src/daft-local-execution/src/pipeline.rs 93.33% 3 Missing ⚠️
...-execution/src/intermediate_ops/intermediate_op.rs 66.66% 2 Missing ⚠️
src/daft-physical-plan/src/translate.rs 90.90% 1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #2871      +/-   ##
==========================================
- Coverage   76.34%   75.60%   -0.74%     
==========================================
  Files         597      601       +4     
  Lines       71388    73589    +2201     
==========================================
+ Hits        54504    55640    +1136     
- Misses      16884    17949    +1065     
Flag coverage Δ: 75.60% <84.53%> (-0.74%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
src/daft-local-execution/src/lib.rs 90.47% <100.00%> (+0.73%) ⬆️
src/daft-local-execution/src/run.rs 89.84% <100.00%> (ø)
src/daft-local-execution/src/sources/in_memory.rs 100.00% <ø> (+9.09%) ⬆️
src/daft-micropartition/src/py_writers.rs 100.00% <100.00%> (ø)
src/daft-physical-plan/src/local_plan.rs 87.03% <100.00%> (+1.32%) ⬆️
src/daft-physical-plan/src/translate.rs 91.66% <90.90%> (-0.10%) ⬇️
...-execution/src/intermediate_ops/intermediate_op.rs 79.83% <66.66%> (-0.17%) ⬇️
src/daft-local-execution/src/pipeline.rs 91.86% <93.33%> (+0.41%) ⬆️
src/daft-parquet/src/stream_reader.rs 90.74% <82.60%> (+0.18%) ⬆️
src/daft-micropartition/src/lib.rs 67.64% <75.00%> (+17.64%) ⬆️
... and 4 more

... and 34 files with indirect coverage changes

@colin-ho force-pushed the colin/streaming-writes-v2 branch from 68b18c1 to 8d80919 on September 20, 2024 22:17
Colin Ho added 2 commits on September 20, 2024 15:28
@colin-ho marked this pull request as ready for review on September 20, 2024 22:53
Comment on lines +67 to +70
pub trait FileWriter: Send + Sync {
    /// Write a micropartition out to the underlying file.
    fn write(&self, data: &Arc<MicroPartition>) -> DaftResult<()>;
    /// Flush and close the file, returning its path if a file was written.
    fn close(&self) -> DaftResult<Option<String>>;
}
@colin-ho (author) commented:

Not sure where to put this trait; ideally somewhere that already has Daft-Micropartition as a dependency, so not Daft-IO.
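For context, a self-contained sketch of what an implementor of this trait might look like; the stub DaftResult and MicroPartition stand in for the real daft types, and CountingWriter is hypothetical:

use std::sync::{Arc, Mutex};

// Stand-ins so the sketch compiles outside the Daft workspace.
type DaftResult<T> = Result<T, String>;
struct MicroPartition;

pub trait FileWriter: Send + Sync {
    fn write(&self, data: &Arc<MicroPartition>) -> DaftResult<()>;
    fn close(&self) -> DaftResult<Option<String>>;
}

// Hypothetical implementor: the trait's &self receiver means any mutable
// state has to live behind interior mutability (a Mutex here).
struct CountingWriter {
    path: String,
    writes: Mutex<usize>,
}

impl FileWriter for CountingWriter {
    fn write(&self, _data: &Arc<MicroPartition>) -> DaftResult<()> {
        *self.writes.lock().unwrap() += 1;
        Ok(())
    }

    fn close(&self) -> DaftResult<Option<String>> {
        // Return the finished file's path, mirroring the Option<String> above.
        Ok(Some(self.path.clone()))
    }
}

fn main() {
    let w = CountingWriter { path: "out.parquet".into(), writes: Mutex::new(0) };
    w.write(&Arc::new(MicroPartition)).unwrap();
    println!("{:?}", w.close());
}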

@colin-ho (author) commented:

import threading
import time
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

# Create a sample table once
df = pd.DataFrame({
    'column1': range(1000000),
    'column2': range(1000000)
})
table = pa.Table.from_pandas(df)

# Function to write a Parquet file (just writing, no data creation)
def write_parquet_file(filename):
    pq.write_table(table, filename)

# Function to run multiple threads writing parquet files
def run_test_multithreaded():
    threads = []
    start = time.time()
    
    # Create 12 threads to write parquet files concurrently
    for i in range(12):
        thread = threading.Thread(target=write_parquet_file, args=(f'output_{i}.parquet',))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    end = time.time()
    print(f"Multithreaded Time: {end - start:.4f} seconds")

# Function to run single-threaded test for comparison
def run_test_singlethreaded():
    start = time.time()
    
    # Write parquet files sequentially
    for i in range(12):
        write_parquet_file(f'output_single_{i}.parquet')
    
    end = time.time()
    print(f"Single-threaded Time: {end - start:.4f} seconds")

# Run both tests
run_test_singlethreaded()
run_test_multithreaded()

A simple single-threaded vs. multithreaded comparison of writing Parquet with PyArrow:

Single-threaded Time: 0.3169 seconds
Multithreaded Time: 0.0751 seconds

Looks like PyArrow does release the GIL during writes (12 threads give a ~4x speedup over sequential), so parallelism is possible. cc @samster25

@colin-ho closed this on Oct 3, 2024