parallel concats
samster25 committed Dec 12, 2023
1 parent 5bd7a8e commit 7911834
Showing 1 changed file with 42 additions and 3 deletions.
45 changes: 42 additions & 3 deletions src/daft-csv/src/read.rs
@@ -5,14 +5,17 @@ use arrow2::{
    io::csv::read_async::{read_rows, AsyncReaderBuilder, ByteRecord},
};
use async_compat::{Compat, CompatExt};
-use common_error::DaftResult;
+use common_error::{DaftError, DaftResult};
use csv_async::AsyncReader;
use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_dsl::optimization::get_required_columns;
use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef};
use daft_table::Table;
use futures::{Stream, StreamExt, TryStreamExt};
-use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
+use rayon::{
+    iter::IntoParallelIterator,
+    prelude::{IntoParallelRefIterator, ParallelIterator},
+};
use snafu::{
    futures::{try_future::Context, TryFutureExt},
    ResultExt,
@@ -142,6 +145,42 @@ fn assert_stream_send<'u, R>(
    s
}

// Parallel version of table concat
// get rid of this once Table APIs are parallel
fn parallel_table_concat(tables: &[Table]) -> DaftResult<Table> {
    if tables.is_empty() {
        return Err(DaftError::ValueError(
            "Need at least 1 Table to perform concat".to_string(),
        ));
    }
    if tables.len() == 1 {
        return Ok((*tables.first().unwrap()).clone());
    }
    let first_table = tables.first().unwrap();

    let first_schema = first_table.schema.as_ref();
    for tab in tables.iter().skip(1) {
        if tab.schema.as_ref() != first_schema {
            return Err(DaftError::SchemaMismatch(format!(
                "Table concat requires all schemas to match, {} vs {}",
                first_schema, tab.schema
            )));
        }
    }
    let num_columns = first_table.num_columns();
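    // Concatenate each column across all tables in parallel on rayon's thread pool;
    // collecting into DaftResult propagates an error if any column fails to concatenate.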
    let new_series = (0..num_columns)
        .into_par_iter()
        .map(|i| {
            let series_to_cat: Vec<&Series> = tables
                .iter()
                .map(|s| s.as_ref().get_column_by_index(i).unwrap())
                .collect();
            Series::concat(series_to_cat.as_slice())
        })
        .collect::<DaftResult<Vec<_>>>()?;
    Table::new(first_table.schema.clone(), new_series)
}

async fn read_csv_single_into_table(
    uri: &str,
    convert_options: Option<CsvConvertOptions>,
Expand Down Expand Up @@ -286,7 +325,7 @@ async fn read_csv_single_into_table(
return Table::empty(Some(schema));
}
    // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked.
-    let concated_table = Table::concat(&collected_tables)?;
+    let concated_table = parallel_table_concat(&collected_tables)?;
    if let Some(limit) = limit && concated_table.len() > limit {
        // apply head in case the last chunk went over the limit
        concated_table.head(limit)

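The core pattern in this change, fanning per-column concatenation out over a rayon parallel iterator, can be sketched outside of Daft's Table and Series APIs. Below is a minimal standalone example assuming only the rayon crate as a dependency; parallel_concat_columns and its Vec<i32> columns are hypothetical stand-ins for illustration, not part of this commit.

use rayon::prelude::*;

// Each "table" is a Vec of columns; column i is concatenated across all
// tables on its own rayon task, mirroring the per-column parallelism above.
fn parallel_concat_columns(tables: &[Vec<Vec<i32>>]) -> Vec<Vec<i32>> {
    let num_columns = tables[0].len();
    (0..num_columns)
        .into_par_iter()
        .map(|i| {
            tables
                .iter()
                .flat_map(|t| t[i].iter().copied())
                .collect::<Vec<i32>>()
        })
        .collect()
}

fn main() {
    let t1 = vec![vec![1, 2], vec![10, 20]];
    let t2 = vec![vec![3], vec![30]];
    let out = parallel_concat_columns(&[t1, t2]);
    assert_eq!(out, vec![vec![1, 2, 3], vec![10, 20, 30]]);
}

Because the parallelism is per column, the benefit scales with table width; a single-column file would still concatenate its chunks on one thread.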