Skip to content

Commit

Permalink
keep parquet as is
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Oct 2, 2024
1 parent c9a0af6 commit ec3f9b2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 32 deletions.
32 changes: 22 additions & 10 deletions src/daft-parquet/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,15 @@ impl ParquetFileReader {
self.arrow_schema.as_ref(),
)?);

let (senders, receivers): (Vec<_>, Vec<_>) = (0..self.row_ranges.len())
.map(|_| tokio::sync::mpsc::channel(1))
let chunk_size = self.chunk_size.unwrap_or(Self::DEFAULT_CHUNK_SIZE);
let (senders, receivers): (Vec<_>, Vec<_>) = self
.row_ranges
.iter()
.map(|rg_range| {
let expected_num_chunks =
f32::ceil(rg_range.num_rows as f32 / chunk_size as f32) as usize;
crossbeam_channel::bounded(expected_num_chunks)
})
.unzip();

let table_iter_handles =
Expand Down Expand Up @@ -495,7 +502,7 @@ impl ParquetFileReader {
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

tokio::spawn(async move {
rayon::spawn(move || {
// Even if there are no columns to read, we still need to create an empty table with the correct number of rows
// This is because the columns may be present in other files. See https://github.com/Eventual-Inc/Daft/pull/2514
let table_iter = arrow_column_iters_to_table_iter(
Expand All @@ -510,12 +517,20 @@ impl ParquetFileReader {
if table_iter.is_none() {
let table =
Table::new_with_size(daft_schema, vec![], row_range.num_rows);
let _ = sender.send(table).await;
if let Err(crossbeam_channel::TrySendError::Full(_)) =
sender.try_send(table)
{
panic!("Parquet stream channel should not be full")
}
return;
}
for table_result in table_iter.unwrap() {
let is_err = table_result.is_err();
let _ = sender.send(table_result).await;
if let Err(crossbeam_channel::TrySendError::Full(_)) =
sender.try_send(table_result)
{
panic!("Parquet stream channel should not be full")
}
if is_err {
break;
}
Expand All @@ -531,11 +546,8 @@ impl ParquetFileReader {
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

let combined_stream = futures::stream::iter(
receivers
.into_iter()
.map(tokio_stream::wrappers::ReceiverStream::new),
);
let combined_stream =
futures::stream::iter(receivers.into_iter().map(futures::stream::iter));
match maintain_order {
true => Ok(Box::pin(combined_stream.flatten())),
false => Ok(Box::pin(combined_stream.flatten_unordered(None))),
Expand Down
45 changes: 23 additions & 22 deletions src/daft-parquet/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,24 +521,25 @@ pub(crate) fn local_parquet_stream(

// Create a channel for each row group to send the processed tables to the stream
// Each channel is expected to have a number of chunks equal to the number of chunks in the row group
let (senders, receivers): (Vec<_>, Vec<_>) = (0..row_ranges.len())
.map(|_| tokio::sync::mpsc::channel(1))
let (senders, receivers): (Vec<_>, Vec<_>) = row_ranges
.iter()
.map(|rg_range| {
let expected_num_chunks =
f32::ceil(rg_range.num_rows as f32 / chunk_size as f32) as usize;
crossbeam_channel::bounded(expected_num_chunks)
})
.unzip();

let owned_uri = uri.to_string();

// Once a row group has been read into memory and we have the column iterators,
// we can start processing them in parallel.
let par_column_iters = column_iters.zip(row_ranges).zip(senders);

// For each vec of column iters, iterate through them in parallel lock step such that each iteration
// produces a chunk of the row group that can be converted into a table.
par_column_iters.for_each(move |((rg_column_iters_result, rg_range), tx)| {
let schema_ref = schema_ref.clone();
let predicate = predicate.clone();
let owned_uri = owned_uri.clone();
let original_columns = original_columns.clone();
tokio::spawn(async move {
rayon::spawn(move || {
// Once a row group has been read into memory and we have the column iterators,
// we can start processing them in parallel.
let par_column_iters = column_iters.zip(row_ranges).zip(senders).par_bridge();

// For each vec of column iters, iterate through them in parallel lock step such that each iteration
// produces a chunk of the row group that can be converted into a table.
par_column_iters.for_each(move |((rg_column_iters_result, rg_range), tx)| {
let table_iter = match rg_column_iters_result {
Ok(rg_column_iters) => {
let table_iter = arrow_column_iters_to_table_iter(
Expand All @@ -557,30 +558,30 @@ pub(crate) fn local_parquet_stream(
} else {
let table =
Table::new_with_size(schema_ref.clone(), vec![], rg_range.num_rows);
let _ = tx.send(table).await;
if let Err(crossbeam_channel::TrySendError::Full(_)) = tx.try_send(table) {
panic!("Parquet stream channel should not be full")
}
return;
}
}
Err(e) => {
let _ = tx.send(Err(e.into())).await;
let _ = tx.send(Err(e.into()));
return;
}
};
for table_result in table_iter {
let table_err = table_result.is_err();
let _ = tx.send(table_result).await;
if let Err(crossbeam_channel::TrySendError::Full(_)) = tx.try_send(table_result) {
panic!("Parquet stream channel should not be full")
}
if table_err {
break;
}
}
});
});

let result_stream = futures::stream::iter(
receivers
.into_iter()
.map(tokio_stream::wrappers::ReceiverStream::new),
);
let result_stream = futures::stream::iter(receivers.into_iter().map(futures::stream::iter));

match maintain_order {
true => Ok((metadata, Box::pin(result_stream.flatten()))),
Expand Down

0 comments on commit ec3f9b2

Please sign in to comment.