Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Add a parallel local CSV reader #3055

Merged
merged 39 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c16badc
Prototype
desmondcheongzx Aug 30, 2024
58ad170
Fix rebase
desmondcheongzx Oct 16, 2024
1c4881c
Remove unused deps
desmondcheongzx Oct 16, 2024
6cbd1f8
Remove rebase artifact
desmondcheongzx Oct 16, 2024
3136a1c
Fix tests
desmondcheongzx Oct 16, 2024
61b58c0
Add fallback
desmondcheongzx Oct 16, 2024
ef23acb
Cleanup
desmondcheongzx Oct 16, 2024
0449339
Fix comment
desmondcheongzx Oct 16, 2024
b64a135
wip
samster25 Oct 17, 2024
c5c8cbd
Make it work
desmondcheongzx Oct 18, 2024
423c2f0
Remove unused deps
desmondcheongzx Oct 18, 2024
11850aa
Rework state machine
desmondcheongzx Oct 18, 2024
516596a
Remove option wrapping around file slab buffers
desmondcheongzx Oct 18, 2024
c44871d
Remove Arcs
desmondcheongzx Oct 18, 2024
68d520b
clean up multislice reader
samster25 Oct 18, 2024
f617d45
rwlock example
samster25 Oct 18, 2024
8e86310
Rwlocking
desmondcheongzx Oct 19, 2024
0df4f04
Fix poisoning
desmondcheongzx Oct 22, 2024
024fbd7
Clean up buffer pools
desmondcheongzx Oct 22, 2024
f9a0cb4
Enforce ordering of results with oneshot channels
desmondcheongzx Oct 22, 2024
16c3e0e
Avoid concat
desmondcheongzx Oct 22, 2024
93350f0
Clean up unused deps
desmondcheongzx Oct 22, 2024
b87caba
Clean up
desmondcheongzx Oct 22, 2024
1cac1a0
minor cleanup
samster25 Oct 23, 2024
84e74ad
async works baby
samster25 Oct 23, 2024
554744a
reset validation state
samster25 Oct 23, 2024
105daac
remove box stream
samster25 Oct 23, 2024
2f04c6d
error clean up
samster25 Oct 23, 2024
efb9180
Apply local limit; apply max chunks in flight; cleanup
desmondcheongzx Oct 23, 2024
0559142
Address remaining comments
desmondcheongzx Oct 23, 2024
43c8a6a
Errr didn't git add some things
desmondcheongzx Oct 23, 2024
eac91b2
Cleanup
desmondcheongzx Oct 23, 2024
7006a47
Remove crossbeam dep
desmondcheongzx Oct 23, 2024
9d35c1b
Move pool stuff into their own mod
desmondcheongzx Oct 23, 2024
42ef62d
use smoll vec
desmondcheongzx Oct 24, 2024
2343c06
Apply global limit within streaming path
desmondcheongzx Oct 24, 2024
f5a80b4
Add todo to modify read_csv to take in configurable record terminator
desmondcheongzx Oct 24, 2024
ffd28de
Merge remote-tracking branch 'daft/main' into local-csv-reader
desmondcheongzx Oct 24, 2024
8106072
Fix limit with predicate
desmondcheongzx Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 24 additions & 1 deletion src/arrow2/src/io/csv/read_async/reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use futures::AsyncRead;

use super::{AsyncReader, ByteRecord};
use crate::io::csv::read;

use crate::error::{Error, Result};

/// Asynchronosly read `len` rows from `reader` into `row`, skipping the first `skip`.
/// Asynchronosly read `rows.len` rows from `reader` into `rows`, skipping the first `skip`.
/// This operation has minimal CPU work and is thus the fastest way to read through a CSV
/// without deserializing the contents to Arrow.
pub async fn read_rows<R>(
Expand Down Expand Up @@ -37,3 +38,25 @@ where
}
Ok(row_number)
}

/// Synchronously read `rows.len` rows from `reader` into `rows`. This is used in the local i/o case.
pub fn local_read_rows<R>(
reader: &mut read::Reader<R>,
rows: &mut [read::ByteRecord],
) -> Result<(usize, bool)>
where
R: std::io::Read,
{
let mut row_number = 0;
let mut has_more = true;
for row in rows.iter_mut() {
has_more = reader
.read_byte_record(row)
.map_err(|e| Error::External(format!(" at line {}", row_number), Box::new(e)))?;
if !has_more {
break;
}
row_number += 1;
}
Ok((row_number, has_more))
}
2 changes: 2 additions & 0 deletions src/daft-csv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ async-compat = {workspace = true}
async-stream = {workspace = true}
common-error = {path = "../common/error", default-features = false}
common-py-serde = {path = "../common/py-serde", default-features = false}
crossbeam-channel = "0.5.1"
csv-async = "1.3.0"
daft-compression = {path = "../daft-compression", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
Expand All @@ -12,6 +13,7 @@ daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
memchr = "2.7.2"
pyo3 = {workspace = true, optional = true}
rayon = {workspace = true}
serde = {workspace = true}
Expand Down
6 changes: 6 additions & 0 deletions src/daft-csv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#![feature(async_closure)]
#![feature(let_chains)]
#![feature(new_uninit)]
#![feature(trait_alias)]
#![feature(trait_upcasting)]
#![feature(test)]
extern crate test;
use common_error::DaftError;
use snafu::Snafu;

pub mod local;
pub mod metadata;
pub mod options;
#[cfg(feature = "python")]
Expand All @@ -23,6 +27,8 @@ pub enum Error {
#[snafu(display("{source}"))]
IOError { source: daft_io::Error },
#[snafu(display("{source}"))]
StdIOError { source: std::io::Error },
desmondcheongzx marked this conversation as resolved.
Show resolved Hide resolved
#[snafu(display("{source}"))]
CSVError { source: csv_async::Error },
#[snafu(display("Invalid char: {}", val))]
WrongChar {
Expand Down
Loading
Loading