Commit
Clean up stream error handling.
clarkzinzow committed Oct 18, 2023
1 parent d0cd093 commit 46ec9ef
Showing 3 changed files with 13 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -89,7 +89,7 @@ prettytable-rs = "0.10"
rand = "^0.8"
rayon = "1.7.0"
serde_json = "1.0.104"
-snafu = "0.7.4"
+snafu = {version = "0.7.4", features = ["futures"]}
tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
tokio-stream = {version = "0.1.14", features = ["fs"]}
tokio-util = "0.7.8"
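The only Cargo.toml change is turning on snafu's "futures" feature, which adds context-attaching combinators for futures and streams (snafu::futures::TryFutureExt and TryStreamExt) alongside the usual ResultExt. A minimal sketch of what this enables, assuming snafu 0.7 with the "futures" feature and tokio; the error type and function here are illustrative, not from this repo:

```rust
use snafu::{futures::TryFutureExt, Snafu};

// Illustrative error type; `ReadFileSnafu` is the context selector snafu derives for it.
#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("failed to read {}", path))]
    ReadFile {
        source: std::io::Error,
        path: String,
    },
}

async fn read_config(path: &str) -> Result<String, Error> {
    // Without the "futures" feature, the context has to be attached to the Result after
    // awaiting: `tokio::fs::read_to_string(path).await.context(ReadFileSnafu { path })`.
    // With it, `.context(...)` can be chained onto the future itself:
    tokio::fs::read_to_string(path)
        .context(ReadFileSnafu { path })
        .await
}
```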
23 changes: 10 additions & 13 deletions src/daft-csv/src/read.rs
@@ -21,15 +21,15 @@ use futures::TryStreamExt;
use rayon::prelude::{
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
};
-use snafu::ResultExt;
+use snafu::{futures::TryFutureExt, ResultExt};
use tokio::{
fs::File,
io::{AsyncBufRead, AsyncRead, BufReader},
};
use tokio_util::io::StreamReader;

-use crate::compression::CompressionCodec;
use crate::metadata::read_csv_schema_single;
+use crate::{compression::CompressionCodec, ArrowSnafu};

#[allow(clippy::too_many_arguments)]
pub fn read_csv(
Expand Down Expand Up @@ -290,6 +290,7 @@ async fn read_into_column_chunks<R>(
where
R: AsyncRead + Unpin + Send,
{
+let num_fields = fields.len();
let num_rows = num_rows.unwrap_or(usize::MAX);
let chunk_size_rows = {
// With a default chunk size of 64 KiB and estimated bytes per row of 200 bytes,
@@ -298,7 +299,6 @@ where
// Process at least 8 rows in a chunk, even if the rows are pretty large.
estimated_rows_per_desired_chunk.max(8).min(num_rows)
};
-let num_fields = fields.len();
// Stream of unparsed CSV byte record chunks.
let read_stream = async_stream::try_stream! {
// Number of rows read in last read.
@@ -310,12 +310,10 @@
ByteRecord::with_capacity(estimated_mean_row_size, num_fields);
chunk_size_rows.min(num_rows - total_rows_read)
];
-yield read_rows(&mut reader, 0, buffer.as_mut_slice()).await.map(|new_rows_read| {
-rows_read = new_rows_read;
-buffer.truncate(rows_read);
-total_rows_read += rows_read;
-buffer
-})
+rows_read = read_rows(&mut reader, 0, buffer.as_mut_slice()).await.context(ArrowSnafu {})?;
+buffer.truncate(rows_read);
+total_rows_read += rows_read;
+yield buffer
}
};
// Parsing stream: we spawn background tokio + rayon tasks so we can pipeline chunk parsing with chunk reading, and
@@ -324,7 +322,6 @@
let fields = fields.clone();
let projection_indices = projection_indices.clone();
tokio::spawn(async move {
-let record = record?;
let (send, recv) = tokio::sync::oneshot::channel();
rayon::spawn(move || {
let result = (move || {
@@ -345,16 +342,16 @@
});
recv.await.context(super::OneShotRecvSnafu {})?
})
+.context(super::JoinSnafu {})
});
// Collect all chunks in chunk x column form.
let chunks = parse_stream
// Limit the number of chunks we have in flight at any given time.
.try_buffered(max_chunks_in_flight)
.try_collect::<Vec<_>>()
-.await
-.context(super::JoinSnafu {})?
+.await?
.into_iter()
-.collect::<DaftResult<Vec<Vec<Box<dyn arrow2::array::Array>>>>>()?;
+.collect::<DaftResult<Vec<_>>>()?;
// Transpose chunk x column into column x chunk.
let mut column_arrays = vec![Vec::with_capacity(chunks.len()); projection_indices.len()];
for chunk in chunks.into_iter() {
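For context on the read_stream rework above: inside async_stream::try_stream!, the ? operator ends the stream with an Err item, so fallible work can be unwrapped in place (here with .context(ArrowSnafu {})?) and only the success value yielded, rather than yielding a Result that downstream stages must unwrap (which is what the removed `yield ... .map(...)` and `let record = record?;` lines did). A minimal, self-contained sketch of that pattern with illustrative names, assuming the async-stream, futures, and tokio crates:

```rust
use async_stream::try_stream;
use futures::{Stream, TryStreamExt};

// A toy stream that parses integers; on a parse failure, `?` emits the error as the
// stream's own `Err` item and the stream ends, mirroring the new read_stream shape.
fn numbers(lines: Vec<String>) -> impl Stream<Item = Result<i64, std::num::ParseIntError>> {
    try_stream! {
        for line in lines {
            // The old style would be roughly `yield line.parse::<i64>()`, producing
            // items of type Result<Result<i64, _>, _> that consumers must unwrap twice.
            let n: i64 = line.trim().parse()?;
            yield n;
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), std::num::ParseIntError> {
    let collected: Vec<i64> = numbers(vec!["1".into(), "2".into(), "3".into()])
        .try_collect()
        .await?;
    assert_eq!(collected, vec![1, 2, 3]);
    Ok(())
}
```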

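The collection stage follows the same idea: the JoinSnafu context now sits on the tokio::spawn handle itself (via the newly imported futures adapter), so try_buffered and try_collect can finish with a plain .await? rather than a trailing .context(super::JoinSnafu {})?. A rough sketch of that shape with a toy task and error type (not this repo's), assuming tokio, futures, and snafu with the "futures" feature; the map(Ok)/map_ok step is only there to make the toy stream a TryStream so that try_buffered applies:

```rust
use futures::{stream, StreamExt, TryStreamExt};
use snafu::{futures::TryFutureExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("background task panicked or was cancelled"))]
    Join { source: tokio::task::JoinError },
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Each JoinHandle resolves to Result<u64, JoinError>; `.context(JoinSnafu)` lifts
    // that into Result<u64, Error> on the future itself, before it is ever polled.
    let tasks = stream::iter(0..4u64)
        .map(Ok::<_, Error>)
        .map_ok(|i| tokio::spawn(async move { i * i }).context(JoinSnafu));
    let results: Vec<u64> = tasks
        .try_buffered(2) // keep at most two spawned tasks in flight, preserving order
        .try_collect()
        .await?;
    assert_eq!(results, vec![0, 1, 4, 9]);
    Ok(())
}
```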