Skip to content

Commit

Permalink
Fixing tests and review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
subygan committed Nov 17, 2023
1 parent 7c27ded commit de5fe92
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 123 deletions.
4 changes: 3 additions & 1 deletion daft/table/table_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ def __next__(self) -> pa.RecordBatch:
def __iter__(self) -> PACSVStreamHelper:
return self

def skip_comment(comment: str = '#'):
    """Build a pyarrow CSV ``invalid_row_handler`` that skips comment rows.

    Returns a callable taking a ``pyarrow.csv.InvalidRow``; it answers
    'skip' when the row's raw text starts with ``comment`` and 'error'
    otherwise. When ``comment`` is None (no comment char configured), the
    handler never skips — every invalid row is reported as an error —
    instead of crashing on ``startswith(None)``.
    """
    if comment is None:
        # No comment character: treat all invalid rows as hard errors.
        return lambda row: 'error'
    return lambda row: 'skip' if row.text.startswith(comment) else 'error'
def read_csv(
file: FileInput,
schema: Schema,
Expand All @@ -206,7 +208,6 @@ def read_csv(
io_config = None
if storage_config is not None:
config = storage_config.config
print(isinstance(config, NativeStorageConfig))
if isinstance(config, NativeStorageConfig):
assert isinstance(
file, (str, pathlib.Path)
Expand Down Expand Up @@ -240,6 +241,7 @@ def read_csv(
delimiter=csv_options.delimiter,
quote_char=csv_options.quote,
escape_char=csv_options.escape_char,
invalid_row_handler=skip_comment(csv_options.comment),
),
read_options=pacsv.ReadOptions(
# If no header, we use the schema's column names. Otherwise we use the headers in the CSV file.
Expand Down
2 changes: 2 additions & 0 deletions src/daft-csv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub enum Error {
IOError { source: daft_io::Error },
#[snafu(display("{source}"))]
CSVError { source: csv_async::Error },
#[snafu(display("Invalid char: {}",val))]
WrongChar { source: std::char::TryFromCharError, val: char },
#[snafu(display("{source}"))]
ArrowError { source: arrow2::error::Error },
#[snafu(display("Error joining spawned task: {}", source))]
Expand Down
19 changes: 10 additions & 9 deletions src/daft-csv/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@ use tokio_util::io::StreamReader;

use crate::{compression::CompressionCodec, schema::merge_schema};
use daft_decoding::inference::infer;
use crate::read::char_to_byte;

const DEFAULT_COLUMN_PREFIX: &str = "column_";

pub fn read_csv_schema(
uri: &str,
has_header: bool,
delimiter: Option<u8>,
delimiter: Option<char>,
double_quote: bool,
quote: Option<u8>,
escape_char: Option<u8>,
comment: Option<u8>,
quote: Option<char>,
escape_char: Option<char>,
comment: Option<char>,
max_bytes: Option<usize>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
Expand All @@ -35,11 +36,11 @@ pub fn read_csv_schema(
read_csv_schema_single(
uri,
has_header,
delimiter,
char_to_byte(delimiter)?,
double_quote,
quote,
escape_char,
comment,
char_to_byte(quote)?,
char_to_byte(escape_char)?,
char_to_byte(comment)?,
// Default to 1 MiB.
max_bytes.or(Some(1024 * 1024)),
io_client,
Expand Down Expand Up @@ -376,7 +377,7 @@ mod tests {
let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema(
file.as_ref(),
true,
Some(b'|'),
Some('|'),
true,
None,
None,
Expand Down
30 changes: 9 additions & 21 deletions src/daft-csv/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,7 @@ pub mod pylib {
use daft_core::python::schema::PySchema;
use daft_io::{get_io_client, python::IOConfig, IOStatsContext};
use daft_table::python::PyTable;
use pyo3::{exceptions::PyValueError, pyfunction, PyResult, Python};

fn char_to_byte(char_val: Option<char>) -> PyResult<Option<u8>> {

char_val.map(|c| match u8::try_from(c){
Err(_e) => Err(PyValueError::new_err(format!(
"character is not valid : {:?}",
c
))),
Ok(c) => Ok(c),
})
.transpose()
}
use pyo3::{pyfunction, PyResult, Python};

#[pyfunction]
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -53,11 +41,11 @@ pub mod pylib {
include_columns,
num_rows,
has_header.unwrap_or(true),
char_to_byte(delimiter)?,
delimiter,
double_quote.unwrap_or(true),
char_to_byte(quote)?,
char_to_byte(escape_char)?,
char_to_byte(comment)?,
quote,
escape_char,
comment,
io_client,
Some(io_stats),
multithreaded_io.unwrap_or(true),
Expand Down Expand Up @@ -95,11 +83,11 @@ pub mod pylib {
let (schema, _, _, _, _) = crate::metadata::read_csv_schema(
uri,
has_header.unwrap_or(true),
char_to_byte(delimiter)?,
delimiter,
double_quote.unwrap_or(true),
char_to_byte(quote)?,
char_to_byte(escape_char)?,
char_to_byte(comment)?,
quote,
escape_char,
comment,
max_bytes,
io_client,
Some(io_stats),
Expand Down
63 changes: 37 additions & 26 deletions src/daft-csv/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,32 @@ use tokio::{
use tokio_util::io::StreamReader;

use crate::metadata::read_csv_schema_single;
use crate::{compression::CompressionCodec, ArrowSnafu};
use crate::{compression::CompressionCodec, ArrowSnafu, Error};
use daft_decoding::deserialize::deserialize_column;


pub fn char_to_byte(c: Option<char>) -> Result<Option<u8>, Error> {
match c.map(u8::try_from).transpose() {
Ok(b) => Ok(b),
Err(e) => Err(Error::WrongChar {
source: e,
val: c.unwrap_or(' '),
}),
}
}

#[allow(clippy::too_many_arguments)]
pub fn read_csv(
uri: &str,
column_names: Option<Vec<&str>>,
include_columns: Option<Vec<&str>>,
num_rows: Option<usize>,
has_header: bool,
delimiter: Option<u8>,
delimiter: Option<char>,
double_quote: bool,
quote: Option<u8>,
escape_char: Option<u8>,
comment: Option<u8>,
quote: Option<char>,
escape_char: Option<char>,
comment: Option<char>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
multithreaded_io: bool,
Expand All @@ -58,11 +69,11 @@ pub fn read_csv(
include_columns,
num_rows,
has_header,
delimiter,
char_to_byte(delimiter)?,
double_quote,
quote,
escape_char,
comment,
char_to_byte(quote)?,
char_to_byte(escape_char)?,
char_to_byte(comment)?,
io_client,
io_stats,
schema,
Expand Down Expand Up @@ -492,27 +503,27 @@ mod tests {
use daft_table::Table;
use rstest::rstest;

use super::read_csv;
use super::{char_to_byte, read_csv};

fn check_equal_local_arrow2(
path: &str,
out: &Table,
has_header: bool,
delimiter: Option<u8>,
delimiter: Option<char>,
double_quote: bool,
quote: Option<u8>,
escape_char: Option<u8>,
comment: Option<u8>,
quote: Option<char>,
escape_char: Option<char>,
comment: Option<char>,
column_names: Option<Vec<&str>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
) {
let mut reader = ReaderBuilder::new()
.delimiter(delimiter.unwrap_or(b','))
.delimiter(char_to_byte(delimiter).unwrap_or(None).unwrap_or(b','))
.double_quote(double_quote)
.quote(quote.unwrap_or(b'"'))
.escape(escape_char)
.comment(comment)
.quote(char_to_byte(quote).unwrap_or(None).unwrap_or(b'"'))
.escape(char_to_byte(escape_char).unwrap_or(Some(b'\\')))
.comment(char_to_byte(comment).unwrap_or(Some(b'#')))
.from_path(path)
.unwrap();
let (mut fields, _) = infer_schema(&mut reader, None, has_header, &infer).unwrap();
Expand Down Expand Up @@ -704,7 +715,7 @@ mod tests {
None,
Some(5),
true,
Some(b'|'),
Some('|'),
true,
None,
None,
Expand Down Expand Up @@ -733,7 +744,7 @@ mod tests {
file.as_ref(),
&table,
true,
Some(b'|'),
Some('|'),
true,
None,
None,
Expand Down Expand Up @@ -825,7 +836,7 @@ mod tests {
true,
None,
true,
Some(b'\''), // Testing with single quote
Some('\''), // Testing with single quote
None,
None,
io_client,
Expand Down Expand Up @@ -854,7 +865,7 @@ mod tests {
true,
None,
true,
Some(b'\''),
Some('\''),
None,
None,
None,
Expand Down Expand Up @@ -886,7 +897,7 @@ mod tests {
None,
true,
None,
Some(b'\\'), //testing with '\' as escape character
Some('\\'), //testing with '\' as escape character
None,
io_client,
None,
Expand Down Expand Up @@ -915,7 +926,7 @@ mod tests {
None,
true,
None,
Some(b'\\'),
Some('\\'),
None,
None,
None,
Expand Down Expand Up @@ -947,7 +958,7 @@ mod tests {
true,
None,
None,
Some(b'#'),
Some('#'),
io_client,
None,
true,
Expand Down Expand Up @@ -976,7 +987,7 @@ mod tests {
true,
None,
None,
Some(b'#'),
Some('#'),
None,
None,
Some(5),
Expand Down
2 changes: 1 addition & 1 deletion src/daft-csv/test/iris_tiny_comment.csv
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
5,3.6,1.4,.2,"Setosa"
5.4,3.9,1.7,.4,"Setosa"
4.6,3.4,1.4,.3,"Setosa"
#5,3.4,1.5,.2,"Setosa"
#53.41.5.2"Setosa"
4.4,2.9,1.4,.2,"Setosa"
4.9,3.1,1.5,.1,"Setosa"
5.4,3.7,1.5,.2,"Setosa"
Expand Down
23 changes: 21 additions & 2 deletions src/daft-csv/test/iris_tiny_escape.csv
Original file line number Diff line number Diff line change
@@ -1,2 +1,21 @@
"date32","date64","timestamp_s","timestamp_ms","timestamp_us","timestamp_s_utc_tz","timestamp_ms_utc_tz","timestamp_us_utc_tz","timestamp_s_tz","timestamp_ms_tz","timestamp_us_tz"
1970-01-02,1970-01-01,1970-01-01 00:00:01,1970-01-01 00:00:00.001,1970-01-01 00:00:00.000001,1970-01-01 00:00:01Z,1970-01-01 00:00:00.001Z,1970-01-01 00:00:00.000001Z,1970-01-01 07:30:01+0730,1970-01-01 07:30:00.001+0730,1970-01-01 07:30:00.000001+0730
"sepal.\"length\"","sepal.width","petal.length","petal.width","variety"
5.1,3.5,1.4,.2,"Setosa"
4.9,3,1.4,.2,"Setosa"
4.7,3.2,1.3,.2,"Setosa"
4.6,3.1,1.5,.2,"Se\"to\"sa"
5,3.6,1.4,.2,"Seto\"\"sa"
5.4,3.9,1.7,.4,"Setosa"
4.6,3.4,1.4,.3,"Setosa"
5,3.4,1.5,.2,"Setosa"
4.4,2.9,1.4,.2,"Setosa"
4.9,3.1,1.5,.1,"Setosa"
5.4,3.7,1.5,.2,"Setosa"
4.8,3.4,1.6,.2,"Setosa"
4.8,3,1.4,.1,"Setosa"
4.3,3,1.1,.1,"Setosa"
5.8,4,1.2,.2,"Setosa"
5.7,4.4,1.5,.4,"Setosa"
5.4,3.9,1.3,.4,"Setosa"
5.1,3.5,1.4,.3,"Setosa"
5.7,3.8,1.7,.3,"Setosa"
5.1,3.8,1.5,.3,"Setosa"
4 changes: 0 additions & 4 deletions src/daft-micropartition/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ pub enum Error {
#[snafu(display("DaftCoreComputeError: {}", source))]
DaftCoreCompute { source: DaftError },


#[snafu(display("non valid char: {}",val))]
WrongChar { val: char },

#[cfg(feature = "python")]
#[snafu(display("PyIOError: {}", source))]
PyIO { source: PyErr },
Expand Down
26 changes: 9 additions & 17 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use daft_table::Table;

use snafu::ResultExt;

use crate::{DaftCoreComputeSnafu, Error};
use crate::{DaftCoreComputeSnafu};
#[cfg(feature = "python")]
use crate::PyIOSnafu;

Expand All @@ -31,14 +31,6 @@ pub(crate) enum TableState {
Loaded(Arc<Vec<Table>>),
}

pub fn char_to_byte(char_val: Option<char>) -> Result<Option<u8>, Error> {

match u8::try_from(char_val.unwrap()){
Err(_e) => Err(Error::WrongChar{val: char_val.unwrap()}),
Ok(char_val) => Ok(Some(char_val)),
}
}

impl Display for TableState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -158,11 +150,11 @@ fn materialize_scan_task(
column_names.clone(),
scan_task.pushdowns.limit,
cfg.has_headers,
char_to_byte(cfg.delimiter)?,
cfg.delimiter,
cfg.double_quote,
char_to_byte(cfg.quote)?,
char_to_byte(cfg.escape_char)?,
char_to_byte(cfg.comment)?,
cfg.quote,
cfg.escape_char,
cfg.comment,
io_client.clone(),
io_stats.clone(),
native_storage_config.multithreaded_io,
Expand Down Expand Up @@ -550,11 +542,11 @@ pub(crate) fn read_csv_into_micropartition(
include_columns: Option<Vec<&str>>,
num_rows: Option<usize>,
has_header: bool,
delimiter: Option<u8>,
delimiter: Option<char>,
double_quote: bool,
quote: Option<u8>,
escape_char: Option<u8>,
comment: Option<u8>,
quote: Option<char>,
escape_char: Option<char>,
comment: Option<char>,
io_config: Arc<IOConfig>,
multithreaded_io: bool,
io_stats: Option<IOStatsRef>,
Expand Down
Loading

0 comments on commit de5fe92

Please sign in to comment.