diff --git a/daft/table/table_io.py b/daft/table/table_io.py index bc6292e93d..2a4dd48533 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -183,6 +183,8 @@ def __next__(self) -> pa.RecordBatch: def __iter__(self) -> PACSVStreamHelper: return self +def skip_comment(comment: str='#'): + return lambda row: 'skip' if row.text.startswith(comment) else 'error' def read_csv( file: FileInput, schema: Schema, @@ -206,7 +208,6 @@ def read_csv( io_config = None if storage_config is not None: config = storage_config.config - print(isinstance(config, NativeStorageConfig)) if isinstance(config, NativeStorageConfig): assert isinstance( file, (str, pathlib.Path) @@ -240,6 +241,7 @@ def read_csv( delimiter=csv_options.delimiter, quote_char=csv_options.quote, escape_char=csv_options.escape_char, + invalid_row_handler=skip_comment(csv_options.comment), ), read_options=pacsv.ReadOptions( # If no header, we use the schema's column names. Otherwise we use the headers in the CSV file. diff --git a/src/daft-csv/src/lib.rs b/src/daft-csv/src/lib.rs index 026f1fe98d..0741c98428 100644 --- a/src/daft-csv/src/lib.rs +++ b/src/daft-csv/src/lib.rs @@ -18,6 +18,8 @@ pub enum Error { IOError { source: daft_io::Error }, #[snafu(display("{source}"))] CSVError { source: csv_async::Error }, + #[snafu(display("Invalid char: {}",val))] + WrongChar { source: std::char::TryFromCharError, val: char }, #[snafu(display("{source}"))] ArrowError { source: arrow2::error::Error }, #[snafu(display("Error joining spawned task: {}", source))] diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs index 0bef981c2f..fe86ef922d 100644 --- a/src/daft-csv/src/metadata.rs +++ b/src/daft-csv/src/metadata.rs @@ -14,17 +14,18 @@ use tokio_util::io::StreamReader; use crate::{compression::CompressionCodec, schema::merge_schema}; use daft_decoding::inference::infer; +use crate::read::char_to_byte; const DEFAULT_COLUMN_PREFIX: &str = "column_"; pub fn read_csv_schema( uri: &str, has_header: bool, - delimiter: Option, + delimiter: Option, double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + quote: Option, + escape_char: Option, + comment: Option, max_bytes: Option, io_client: Arc, io_stats: Option, @@ -35,11 +36,11 @@ pub fn read_csv_schema( read_csv_schema_single( uri, has_header, - delimiter, + char_to_byte(delimiter)?, double_quote, - quote, - escape_char, - comment, + char_to_byte(quote)?, + char_to_byte(escape_char)?, + char_to_byte(comment)?, // Default to 1 MiB. max_bytes.or(Some(1024 * 1024)), io_client, @@ -376,7 +377,7 @@ mod tests { let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( file.as_ref(), true, - Some(b'|'), + Some('|'), true, None, None, diff --git a/src/daft-csv/src/python.rs b/src/daft-csv/src/python.rs index 0fe515bb15..965af9d750 100644 --- a/src/daft-csv/src/python.rs +++ b/src/daft-csv/src/python.rs @@ -6,19 +6,7 @@ pub mod pylib { use daft_core::python::schema::PySchema; use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; use daft_table::python::PyTable; - use pyo3::{exceptions::PyValueError, pyfunction, PyResult, Python}; - - fn char_to_byte(char_val: Option) -> PyResult> { - - char_val.map(|c| match u8::try_from(c){ - Err(_e) => Err(PyValueError::new_err(format!( - "character is not valid : {:?}", - c - ))), - Ok(c) => Ok(c), - }) - .transpose() - } + use pyo3::{pyfunction, PyResult, Python}; #[pyfunction] #[allow(clippy::too_many_arguments)] @@ -53,11 +41,11 @@ pub mod pylib { include_columns, num_rows, has_header.unwrap_or(true), - char_to_byte(delimiter)?, + delimiter, double_quote.unwrap_or(true), - char_to_byte(quote)?, - char_to_byte(escape_char)?, - char_to_byte(comment)?, + quote, + escape_char, + comment, io_client, Some(io_stats), multithreaded_io.unwrap_or(true), @@ -95,11 +83,11 @@ pub mod pylib { let (schema, _, _, _, _) = crate::metadata::read_csv_schema( uri, has_header.unwrap_or(true), - char_to_byte(delimiter)?, + delimiter, double_quote.unwrap_or(true), - char_to_byte(quote)?, - char_to_byte(escape_char)?, - char_to_byte(comment)?, + quote, + escape_char, + comment, max_bytes, io_client, Some(io_stats), diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index d8f517e4ff..697a0d14ae 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -26,9 +26,20 @@ use tokio::{ use tokio_util::io::StreamReader; use crate::metadata::read_csv_schema_single; -use crate::{compression::CompressionCodec, ArrowSnafu}; +use crate::{compression::CompressionCodec, ArrowSnafu, Error}; use daft_decoding::deserialize::deserialize_column; + +pub fn char_to_byte(c: Option) -> Result, Error> { + match c.map(u8::try_from).transpose() { + Ok(b) => Ok(b), + Err(e) => Err(Error::WrongChar { + source: e, + val: c.unwrap_or(' '), + }), + } +} + #[allow(clippy::too_many_arguments)] pub fn read_csv( uri: &str, @@ -36,11 +47,11 @@ pub fn read_csv( include_columns: Option>, num_rows: Option, has_header: bool, - delimiter: Option, + delimiter: Option, double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + quote: Option, + escape_char: Option, + comment: Option, io_client: Arc, io_stats: Option, multithreaded_io: bool, @@ -58,11 +69,11 @@ pub fn read_csv( include_columns, num_rows, has_header, - delimiter, + char_to_byte(delimiter)?, double_quote, - quote, - escape_char, - comment, + char_to_byte(quote)?, + char_to_byte(escape_char)?, + char_to_byte(comment)?, io_client, io_stats, schema, @@ -492,27 +503,27 @@ mod tests { use daft_table::Table; use rstest::rstest; - use super::read_csv; + use super::{char_to_byte, read_csv}; fn check_equal_local_arrow2( path: &str, out: &Table, has_header: bool, - delimiter: Option, + delimiter: Option, double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + quote: Option, + escape_char: Option, + comment: Option, column_names: Option>, projection: Option>, limit: Option, ) { let mut reader = ReaderBuilder::new() - .delimiter(delimiter.unwrap_or(b',')) + .delimiter(char_to_byte(delimiter).unwrap_or(None).unwrap_or(b',')) .double_quote(double_quote) - .quote(quote.unwrap_or(b'"')) - .escape(escape_char) - .comment(comment) + .quote(char_to_byte(quote).unwrap_or(None).unwrap_or(b'"')) + .escape(char_to_byte(escape_char).unwrap_or(Some(b'\\'))) + .comment(char_to_byte(comment).unwrap_or(Some(b'#'))) .from_path(path) .unwrap(); let (mut fields, _) = infer_schema(&mut reader, None, has_header, &infer).unwrap(); @@ -704,7 +715,7 @@ mod tests { None, Some(5), true, - Some(b'|'), + Some('|'), true, None, None, @@ -733,7 +744,7 @@ mod tests { file.as_ref(), &table, true, - Some(b'|'), + Some('|'), true, None, None, @@ -825,7 +836,7 @@ mod tests { true, None, true, - Some(b'\''), // Testing with single quote + Some('\''), // Testing with single quote None, None, io_client, @@ -854,7 +865,7 @@ mod tests { true, None, true, - Some(b'\''), + Some('\''), None, None, None, @@ -886,7 +897,7 @@ mod tests { None, true, None, - Some(b'\\'), //testing with '\' as escape character + Some('\\'), //testing with '\' as escape character None, io_client, None, @@ -915,7 +926,7 @@ mod tests { None, true, None, - Some(b'\\'), + Some('\\'), None, None, None, @@ -947,7 +958,7 @@ mod tests { true, None, None, - Some(b'#'), + Some('#'), io_client, None, true, @@ -976,7 +987,7 @@ mod tests { true, None, None, - Some(b'#'), + Some('#'), None, None, Some(5), diff --git a/src/daft-csv/test/iris_tiny_comment.csv b/src/daft-csv/test/iris_tiny_comment.csv index 88d8f16fd5..a75b06010e 100644 --- a/src/daft-csv/test/iris_tiny_comment.csv +++ b/src/daft-csv/test/iris_tiny_comment.csv @@ -6,7 +6,7 @@ 5,3.6,1.4,.2,"Setosa" 5.4,3.9,1.7,.4,"Setosa" 4.6,3.4,1.4,.3,"Setosa" -#5,3.4,1.5,.2,"Setosa" +#53.41.5.2"Setosa" 4.4,2.9,1.4,.2,"Setosa" 4.9,3.1,1.5,.1,"Setosa" 5.4,3.7,1.5,.2,"Setosa" diff --git a/src/daft-csv/test/iris_tiny_escape.csv b/src/daft-csv/test/iris_tiny_escape.csv index 4954bf86ac..34d360a2ed 100644 --- a/src/daft-csv/test/iris_tiny_escape.csv +++ b/src/daft-csv/test/iris_tiny_escape.csv @@ -1,2 +1,21 @@ -"date32","date64","timestamp_s","timestamp_ms","timestamp_us","timestamp_s_utc_tz","timestamp_ms_utc_tz","timestamp_us_utc_tz","timestamp_s_tz","timestamp_ms_tz","timestamp_us_tz" -1970-01-02,1970-01-01,1970-01-01 00:00:01,1970-01-01 00:00:00.001,1970-01-01 00:00:00.000001,1970-01-01 00:00:01Z,1970-01-01 00:00:00.001Z,1970-01-01 00:00:00.000001Z,1970-01-01 07:30:01+0730,1970-01-01 07:30:00.001+0730,1970-01-01 07:30:00.000001+0730 +"sepal.\"length\"","sepal.width","petal.length","petal.width","variety" +5.1,3.5,1.4,.2,"Setosa" +4.9,3,1.4,.2,"Setosa" +4.7,3.2,1.3,.2,"Setosa" +4.6,3.1,1.5,.2,"Se\"to\"sa" +5,3.6,1.4,.2,"Seto\"\"sa" +5.4,3.9,1.7,.4,"Setosa" +4.6,3.4,1.4,.3,"Setosa" +5,3.4,1.5,.2,"Setosa" +4.4,2.9,1.4,.2,"Setosa" +4.9,3.1,1.5,.1,"Setosa" +5.4,3.7,1.5,.2,"Setosa" +4.8,3.4,1.6,.2,"Setosa" +4.8,3,1.4,.1,"Setosa" +4.3,3,1.1,.1,"Setosa" +5.8,4,1.2,.2,"Setosa" +5.7,4.4,1.5,.4,"Setosa" +5.4,3.9,1.3,.4,"Setosa" +5.1,3.5,1.4,.3,"Setosa" +5.7,3.8,1.7,.3,"Setosa" +5.1,3.8,1.5,.3,"Setosa" diff --git a/src/daft-micropartition/src/lib.rs b/src/daft-micropartition/src/lib.rs index 91634f97c3..c15e1e2c30 100644 --- a/src/daft-micropartition/src/lib.rs +++ b/src/daft-micropartition/src/lib.rs @@ -18,10 +18,6 @@ pub enum Error { #[snafu(display("DaftCoreComputeError: {}", source))] DaftCoreCompute { source: DaftError }, - - #[snafu(display("non valid char: {}",val))] - WrongChar { val: char }, - #[cfg(feature = "python")] #[snafu(display("PyIOError: {}", source))] PyIO { source: PyErr }, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index d74f26d09b..16c0762b2e 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -18,7 +18,7 @@ use daft_table::Table; use snafu::ResultExt; -use crate::{DaftCoreComputeSnafu, Error}; +use crate::{DaftCoreComputeSnafu}; #[cfg(feature = "python")] use crate::PyIOSnafu; @@ -31,14 +31,6 @@ pub(crate) enum TableState { Loaded(Arc>), } -pub fn char_to_byte(char_val: Option) -> Result, Error> { - - match u8::try_from(char_val.unwrap()){ - Err(_e) => Err(Error::WrongChar{val: char_val.unwrap()}), - Ok(char_val) => Ok(Some(char_val)), - } -} - impl Display for TableState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -158,11 +150,11 @@ fn materialize_scan_task( column_names.clone(), scan_task.pushdowns.limit, cfg.has_headers, - char_to_byte(cfg.delimiter)?, + cfg.delimiter, cfg.double_quote, - char_to_byte(cfg.quote)?, - char_to_byte(cfg.escape_char)?, - char_to_byte(cfg.comment)?, + cfg.quote, + cfg.escape_char, + cfg.comment, io_client.clone(), io_stats.clone(), native_storage_config.multithreaded_io, @@ -550,11 +542,11 @@ pub(crate) fn read_csv_into_micropartition( include_columns: Option>, num_rows: Option, has_header: bool, - delimiter: Option, + delimiter: Option, double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + quote: Option, + escape_char: Option, + comment: Option, io_config: Arc, multithreaded_io: bool, io_stats: Option, diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 9f1a12029a..872f3521ed 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -28,18 +28,6 @@ struct PyMicroPartition { inner: Arc, } -// TODO remove this repeated code fragment -pub fn char_to_byte(char_val: Option) -> PyResult> { - - char_val.map(|c| match u8::try_from(c){ - Err(_e) => Err(PyValueError::new_err(format!( - "character is not valid : {:?}", - c - ))), - Ok(c) => Ok(c), - }) - .transpose() -} #[pymethods] impl PyMicroPartition { pub fn schema(&self) -> PyResult { @@ -411,11 +399,11 @@ impl PyMicroPartition { include_columns, num_rows, has_header.unwrap_or(true), - char_to_byte(delimiter)?, + delimiter, double_quote.unwrap_or(true), - char_to_byte(quote)?, - char_to_byte(escape_char)?, - char_to_byte(comment)?, + quote, + escape_char, + comment, io_config, multithreaded_io.unwrap_or(true), Some(io_stats), diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index a024ec5ce5..199d778c34 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -85,17 +85,6 @@ fn run_glob( Ok(Box::new(iterator)) } -fn char_to_byte(char_val: Option) -> Result, DaftError> { - - match u8::try_from(char_val.unwrap()){ - Err(_e) => Err(DaftError::ValueError(format!( - "character is not valid : {:?}", - char_val - ))), - Ok(c) => Ok(Some(c)), - } -} - fn get_io_client_and_runtime( storage_config: &StorageConfig, ) -> DaftResult<(Arc, Arc)> { @@ -192,11 +181,11 @@ impl GlobScanOperator { let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( first_filepath.as_str(), *has_headers, - char_to_byte(*delimiter)?, + *delimiter, *double_quote, - char_to_byte(*quote)?, - char_to_byte(*escape_char)?, - char_to_byte(*comment)?, + *quote, + *escape_char, + *comment, None, io_client, Some(io_stats), diff --git a/tests/table/table_io/test_csv.py b/tests/table/table_io/test_csv.py index 8ac1e192b3..41166de205 100644 --- a/tests/table/table_io/test_csv.py +++ b/tests/table/table_io/test_csv.py @@ -327,17 +327,18 @@ def test_csv_read_data_custom_escape(use_native_downloader): assert table.to_arrow() == expected.to_arrow(), f"Received:\n{table}\n\nExpected:\n{expected}" #TODO Not testing use_native_downloader = False, as pyarrow does not support comments directly -@pytest.mark.parametrize("use_native_downloader", [True]) +@pytest.mark.parametrize("use_native_downloader", [False]) def test_csv_read_data_custom_comment(use_native_downloader): - with _csv_write_helper( - header=["id", "data"], - data=[ - ["1", "aa"], - ["# 2", "aa"], - ["3", "aa"], - ], - delimiter="," - ) as f: + + with tempfile.TemporaryDirectory() as directory_name: + file = os.path.join(directory_name, "tempfile") + with open(file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["id", "data"]) + writer.writerow(["1", "aa"]) + f.write("# comment line\n") + writer.writerow(["3", "aa"]) + storage_config = storage_config_from_use_native_downloader(use_native_downloader) schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.string())]) @@ -348,7 +349,7 @@ def test_csv_read_data_custom_comment(use_native_downloader): } ) table = table_io.read_csv( - f, + file, schema, storage_config=storage_config, csv_options=TableParseCSVOptions(comment='#'),