Commit

wip

samster25 committed Dec 8, 2023
1 parent 29cb3a3 commit 6e5ab1b
Showing 3 changed files with 27 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/daft-csv/Cargo.toml
@@ -14,6 +14,7 @@ daft-core = {path = "../daft-core", default-features = false}
 daft-decoding = {path = "../daft-decoding"}
 daft-io = {path = "../daft-io", default-features = false}
 daft-table = {path = "../daft-table", default-features = false}
+daft-dsl = {path = "../daft-dsl", default-features = false}
 futures = {workspace = true}
 lexical-core = {version = "0.8"}
 log = {workspace = true}
@@ -33,7 +34,7 @@ rstest = {workspace = true}
 
 [features]
 default = ["python"]
-python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python"]
+python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-dsl/python"]
 
 [package]
 edition = {workspace = true}
30 changes: 24 additions & 6 deletions src/daft-csv/src/read.rs
@@ -10,7 +10,7 @@ use csv_async::AsyncReader;
 use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series};
 use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef};
 use daft_table::Table;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{Stream, StreamExt, TryStreamExt, future};
 use rayon::prelude::{
     IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
 };
@@ -117,7 +117,7 @@ pub fn read_csv_bulk(
             // num_parallel_tasks redundant files.
             .try_take_while(|result| {
                 match (result, remaining_rows) {
-                    // Limit has been met, early-teriminate.
+                    // Limit has been met, early-terminate.
                     (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
                     // Limit has not yet been met, update remaining limit slack and continue.
                     (Ok(table), Some(rows_left)) => {
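
For context on this predicate (and the near-identical one added to the single-file path below): `try_take_while` expects its closure to return a future, so the synchronous keep/stop decision is wrapped in `futures::future::ready`. A minimal, self-contained sketch of the row-limit pattern, using a stand-in `Batch` type rather than Daft's `Table`:

use futures::{executor::block_on, stream, TryStreamExt};

struct Batch {
    rows: usize,
}

fn main() -> Result<(), std::io::Error> {
    // Three 4-row batches arriving as stream items, and a limit of 5 rows.
    let batches: Vec<Result<Batch, std::io::Error>> =
        vec![Ok(Batch { rows: 4 }), Ok(Batch { rows: 4 }), Ok(Batch { rows: 4 })];
    let mut remaining_rows: Option<i64> = Some(5);
    let collected: Vec<Batch> = block_on(
        stream::iter(batches)
            .try_take_while(|batch| {
                let keep = match remaining_rows {
                    // Limit has been met, early-terminate.
                    Some(rows_left) if rows_left <= 0 => false,
                    // Limit not yet met: deduct this batch and continue.
                    Some(rows_left) => {
                        remaining_rows = Some(rows_left - batch.rows as i64);
                        true
                    }
                    // No limit, never early-terminate.
                    None => true,
                };
                // Nothing to await, so wrap the decision in an immediately-ready future.
                futures::future::ready(Ok(keep))
            })
            .try_collect(),
    )?;
    // The first two batches satisfy the limit (4 + 4 >= 5); the third is dropped.
    assert_eq!(collected.len(), 2);
    Ok(())
}

In the real code the stream is buffered upstream, so a few in-flight reads can still complete after the limit is hit; that is what the "redundant files" comment above refers to.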
@@ -148,6 +148,10 @@ async fn read_csv_single_into_table(
     let include_columns = convert_options
         .as_ref()
         .and_then(|opts| opts.include_columns.clone());
+
+    let mut remaining_rows = convert_options
+        .as_ref()
+        .and_then(|opts| opts.limit.map(|limit| limit as i64));
     let (chunk_stream, fields) = read_csv_single_into_stream(
         uri,
         convert_options.unwrap_or_default(),
@@ -171,6 +175,20 @@
     let chunks = chunk_stream
         // Limit the number of chunks we have in flight at any given time.
        .try_buffered(max_chunks_in_flight)
+        .try_take_while(|result| {
+            match (result, remaining_rows) {
+                // Limit has been met, early-terminate.
+                (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
+                // Limit has not yet been met, update remaining limit slack and continue.
+                (Ok(table), Some(rows_left)) => {
+                    remaining_rows = Some(rows_left - table.len() as i64);
+                    futures::future::ready(Ok(true))
+                }
+                // (1) No limit, never early-terminate.
+                // (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
+                (_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
+            }
+        })
        .try_collect::<Vec<_>>()
        .await?
        .into_iter()
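
On the `(_, None) | (Err(_), _)` arm above: returning `Ok(true)` for an error item declines to end the stream, so the `Err` flows through to `try_collect`, which short-circuits on the first error it sees. A flattened sketch of that short-circuit behavior (in the diff the stream item is itself a `Result`, so the predicate matches on it explicitly; the types here are stand-ins):

use futures::{executor::block_on, stream, TryStreamExt};

fn main() {
    let items: Vec<Result<i64, String>> =
        vec![Ok(1), Err("read failed".to_string()), Ok(2)];
    let result: Result<Vec<i64>, String> = block_on(
        stream::iter(items)
            // Never early-terminate; errors are forwarded, not filtered.
            .try_take_while(|_| futures::future::ready(Ok(true)))
            .try_collect(),
    );
    // try_collect stops at the first Err; Ok(2) is never collected.
    assert_eq!(result, Err("read failed".to_string()));
}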
@@ -288,14 +306,14 @@ async fn read_csv_single_into_stream(
     let projection_indices =
         fields_to_projection_indices(&schema.fields, &convert_options.include_columns);
     let fields = schema.fields;
-    Ok((
+    let stream =
         parse_into_column_array_chunk_stream(
             read_stream,
             Arc::new(fields.clone()),
             projection_indices,
-        ),
-        fields,
-    ))
+        );
+
+    Ok((stream, fields))
 }
 
 fn read_into_byterecord_chunk_stream<R>(
