diff --git a/daft/daft.pyi b/daft/daft.pyi
index 69667deed1..8cc8f40e6e 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -185,23 +185,23 @@ class CsvSourceConfig:
     Configuration of a CSV data source.
     """
 
-    delimiter: str
+    delimiter: str | None
     has_headers: bool
     double_quote: bool
-    quote: str
-    escape_char: str
-    comment: str
+    quote: str | None
+    escape_char: str | None
+    comment: str | None
    buffer_size: int | None
    chunk_size: int | None
    def __init__(
        self,
-        delimiter: str,
        has_headers: bool,
        double_quote: bool,
-        quote: str,
-        escape_char: str,
-        comment: str,
+        delimiter: str | None,
+        quote: str | None,
+        escape_char: str | None,
+        comment: str | None,
        buffer_size: int | None = None,
        chunk_size: int | None = None,
    ): ...
 
diff --git a/daft/io/_csv.py b/daft/io/_csv.py
index 1cf4472ae1..9dcc561cba 100644
--- a/daft/io/_csv.py
+++ b/daft/io/_csv.py
@@ -18,19 +18,19 @@
 
 @PublicAPI
 def read_csv(
-    path: Union[str, List[str]],
-    schema_hints: Optional[Dict[str, DataType]] = None,
-    has_headers: bool = True,
-    column_names: Optional[List[str]] = None,
-    delimiter: Optional[str] = None,
-    double_quote: bool = True,
-    quote: Optional[str] = None,
-    escape_char: Optional[str] = None,
-    comment: Optional[str] = None,
-    io_config: Optional["IOConfig"] = None,
-    use_native_downloader: bool = True,
-    _buffer_size: Optional[int] = None,
-    _chunk_size: Optional[int] = None,
+    path: Union[str, List[str]],
+    schema_hints: Optional[Dict[str, DataType]] = None,
+    has_headers: bool = True,
+    column_names: Optional[List[str]] = None,
+    delimiter: Optional[str] = None,
+    double_quote: bool = True,
+    quote: Optional[str] = None,
+    escape_char: Optional[str] = None,
+    comment: Optional[str] = None,
+    io_config: Optional["IOConfig"] = None,
+    use_native_downloader: bool = True,
+    _buffer_size: Optional[int] = None,
+    _chunk_size: Optional[int] = None,
 ) -> DataFrame:
     """Creates a DataFrame from CSV file(s)
 
@@ -46,7 +46,9 @@ def read_csv(
            disable all schema inference on data being read, and throw an error if data being read is incompatible.
        has_headers (bool): Whether the CSV has a header or not, defaults to True
        delimiter (Str): Delimiter used in the CSV, defaults to ","
-        doubled_quote_escape (bool): Whether to support double quote escapes, defaults to True
+        double_quote (bool): Whether to support double quote escapes, defaults to True
+        escape_char (str): Character to use as the escape character for quotes, defaults to `"`
+        comment (str): Character to treat as the start of a comment line, or None to not support comments
        io_config (IOConfig): Config to be used with the native downloader
        use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
            is currently experimental.
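For reference, a quick usage sketch of the options documented above, passed through the public `daft.read_csv` API (the path and option values here are illustrative, not taken from this change):

    import daft

    # Hypothetical CSV: ';'-delimited, single-quoted fields, '\' as the quote
    # escape character, and '#' marking comment lines to be skipped.
    df = daft.read_csv(
        "data/example.csv",
        has_headers=True,
        delimiter=";",
        double_quote=True,
        quote="'",
        escape_char="\\",
        comment="#",
    )
    df.show()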
diff --git a/daft/table/table_io.py b/daft/table/table_io.py
index 0b99654528..e2408ae2a3 100644
--- a/daft/table/table_io.py
+++ b/daft/table/table_io.py
@@ -2,7 +2,7 @@
 
 import contextlib
 import pathlib
-from collections.abc import Generator
+from collections.abc import Callable, Generator
 from typing import IO, Union
 from uuid import uuid4
 
@@ -22,7 +22,6 @@
     TableReadOptions,
 )
 from daft.table import Table
-from collections.abc import Callable
 
 FileInput = Union[pathlib.Path, str, IO[bytes]]
 
@@ -184,11 +183,14 @@ def __next__(self) -> pa.RecordBatch:
     def __iter__(self) -> PACSVStreamHelper:
         return self
 
+
 def skip_comment(comment: str | None) -> Callable | None:
     if comment is None:
         return None
     else:
-        return lambda row: 'skip' if row.text.startswith(comment) else 'error'
+        return lambda row: "skip" if row.text.startswith(comment) else "error"
+
+
 def read_csv(
     file: FileInput,
     schema: Schema,
@@ -243,7 +245,9 @@ def read_csv(
     from daft.utils import ARROW_VERSION
 
     if csv_options.comment is not None and ARROW_VERSION < (7, 0, 0):
-        raise ValueError("pyarrow < 7.0.0 doesn't support handling comments in CSVs, please upgrade pyarrow to 7.0.0+.")
+        raise ValueError(
+            "pyarrow < 7.0.0 doesn't support handling comments in CSVs, please upgrade pyarrow to 7.0.0+."
+        )
 
     parse_options = pacsv.ParseOptions(
         delimiter=csv_options.delimiter,
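For context on the `skip_comment` helper in `daft/table/table_io.py` above: pyarrow's CSV reader does not expose a dedicated comment option, so comment lines are dropped via `ParseOptions.invalid_row_handler`, which is consulted for any row whose column count does not match the expected schema. A minimal, self-contained sketch of that mechanism (the sample data and comment character are made up; assumes a pyarrow version that satisfies the check above):

    import io

    import pyarrow.csv as pacsv

    data = b"a,b\n1,2\n# a comment line\n3,4\n"

    # A comment line has the "wrong" number of columns, so pyarrow routes it to
    # invalid_row_handler; returning "skip" drops it, anything else is still an error.
    parse_options = pacsv.ParseOptions(
        invalid_row_handler=lambda row: "skip" if row.text.startswith("#") else "error"
    )
    table = pacsv.read_csv(io.BytesIO(data), parse_options=parse_options)
    print(table.to_pydict())  # {'a': [1, 3], 'b': [2, 4]}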
diff --git a/src/daft-csv/src/lib.rs b/src/daft-csv/src/lib.rs
index 0741c98428..4e9af21c81 100644
--- a/src/daft-csv/src/lib.rs
+++ b/src/daft-csv/src/lib.rs
@@ -18,8 +18,11 @@ pub enum Error {
     IOError { source: daft_io::Error },
     #[snafu(display("{source}"))]
     CSVError { source: csv_async::Error },
-    #[snafu(display("Invalid char: {}",val))]
-    WrongChar { source: std::char::TryFromCharError, val: char },
+    #[snafu(display("Invalid char: {}", val))]
+    WrongChar {
+        source: std::char::TryFromCharError,
+        val: char,
+    },
     #[snafu(display("{source}"))]
     ArrowError { source: arrow2::error::Error },
     #[snafu(display("Error joining spawned task: {}", source))]
diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs
index fe86ef922d..5e04846266 100644
--- a/src/daft-csv/src/metadata.rs
+++ b/src/daft-csv/src/metadata.rs
@@ -12,12 +12,13 @@ use tokio::{
 };
 use tokio_util::io::StreamReader;
 
+use crate::read::char_to_byte;
 use crate::{compression::CompressionCodec, schema::merge_schema};
 use daft_decoding::inference::infer;
-use crate::read::char_to_byte;
 
 const DEFAULT_COLUMN_PREFIX: &str = "column_";
 
+#[allow(clippy::too_many_arguments)]
 pub fn read_csv_schema(
     uri: &str,
     has_header: bool,
@@ -50,6 +51,7 @@ pub fn read_csv_schema(
     })
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn read_csv_schema_single(
     uri: &str,
     has_header: bool,
@@ -99,6 +101,7 @@
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn read_csv_schema_from_compressed_reader<R>(
     reader: R,
     compression_codec: Option<CompressionCodec>,
@@ -143,6 +146,7 @@ where
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn read_csv_schema_from_uncompressed_reader<R>(
     reader: R,
     has_header: bool,
@@ -177,6 +181,7 @@ where
     ))
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn read_csv_arrow_schema_from_uncompressed_reader<R>(
     reader: R,
     has_header: bool,
diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs
index 697a0d14ae..8c4a14b98b 100644
--- a/src/daft-csv/src/read.rs
+++ b/src/daft-csv/src/read.rs
@@ -29,7 +29,6 @@ use crate::metadata::read_csv_schema_single;
 use crate::{compression::CompressionCodec, ArrowSnafu, Error};
 use daft_decoding::deserialize::deserialize_column;
 
-
 pub fn char_to_byte(c: Option<char>) -> Result<Option<u8>, Error> {
     match c.map(u8::try_from).transpose() {
         Ok(b) => Ok(b),
@@ -624,7 +623,19 @@ mod tests {
         .into(),
         );
         if compression.is_none() {
-            check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None,None, None,None, None, None);
+            check_equal_local_arrow2(
+                file.as_ref(),
+                &table,
+                true,
+                None,
+                true,
+                None,
+                None,
+                None,
+                None,
+                None,
+                None,
+            );
         }
 
         Ok(())
@@ -878,10 +889,7 @@ mod tests {
 
     #[test]
     fn test_csv_read_local_escape() -> DaftResult<()> {
-        let file = format!(
-            "{}/test/iris_tiny_escape.csv",
-            env!("CARGO_MANIFEST_DIR"),
-        );
+        let file = format!("{}/test/iris_tiny_escape.csv", env!("CARGO_MANIFEST_DIR"),);
 
         let mut io_config = IOConfig::default();
         io_config.s3.anonymous = true;
@@ -938,10 +946,7 @@ mod tests {
 
     #[test]
     fn test_csv_read_local_comment() -> DaftResult<()> {
-        let file = format!(
-            "{}/test/iris_tiny_comment.csv",
-            env!("CARGO_MANIFEST_DIR"),
-        );
+        let file = format!("{}/test/iris_tiny_comment.csv", env!("CARGO_MANIFEST_DIR"),);
 
         let mut io_config = IOConfig::default();
         io_config.s3.anonymous = true;
@@ -1035,7 +1040,19 @@ mod tests {
         ])?
         .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None,None, None, None, None, Some(5));
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+            None,
+            Some(5),
+        );
 
         Ok(())
     }
@@ -1198,7 +1215,19 @@ mod tests {
         ])?
         .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None,None, None, None, None, None);
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
 
         Ok(())
     }
@@ -1243,7 +1272,19 @@ mod tests {
         ])?
         .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None,None, None, None, None);
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
 
         Ok(())
     }
@@ -1288,7 +1329,19 @@ mod tests {
         ])?
         .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None,None,None, None, None);
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
 
         Ok(())
     }
@@ -1333,7 +1386,19 @@ mod tests {
         ])?
         .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None,None, None, None, None);
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
 
         Ok(())
     }
@@ -1504,7 +1569,19 @@ mod tests {
         ])?
         .into(),
         );
-        check_equal_local_arrow2(file.as_ref(), &table, true, None, true, None, None,None, None, None, None);
+        check_equal_local_arrow2(
+            file.as_ref(),
+            &table,
+            true,
+            None,
+            true,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
 
         Ok(())
     }
diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index 16c0762b2e..f0946748ba 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -18,7 +18,7 @@ use daft_table::Table;
 
 use snafu::ResultExt;
 
-use crate::{DaftCoreComputeSnafu};
+use crate::DaftCoreComputeSnafu;
 
 #[cfg(feature = "python")]
 use crate::PyIOSnafu;
diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs
index 872f3521ed..1a028b79ad 100644
--- a/src/daft-micropartition/src/python.rs
+++ b/src/daft-micropartition/src/python.rs
@@ -389,7 +389,6 @@ impl PyMicroPartition {
         buffer_size: Option<usize>,
         chunk_size: Option<usize>,
     ) -> PyResult<Self> {
-
         let mp = py.allow_threads(|| {
             let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}"));
             let io_config = io_config.unwrap_or_default().config.into();
diff --git a/src/daft-scan/src/file_format.rs b/src/daft-scan/src/file_format.rs
index 72341b0906..5a733bbf7f 100644
--- a/src/daft-scan/src/file_format.rs
+++ b/src/daft-scan/src/file_format.rs
@@ -7,8 +7,8 @@ use std::{str::FromStr, sync::Arc};
 use {
     daft_core::python::datatype::PyTimeUnit,
     pyo3::{
-        pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy,
-        PyObject, PyResult, PyTypeInfo, Python, ToPyObject,
+        pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy, PyObject, PyResult,
+        PyTypeInfo, Python, ToPyObject,
     },
 };
 
@@ -127,11 +127,12 @@ impl CsvSourceConfig {
     /// * `has_headers` - Whether the CSV has a header row; if so, it will be skipped during data parsing.
     /// * `buffer_size` - Size of the buffer (in bytes) used by the streaming reader.
     /// * `chunk_size` - Size of the chunks (in bytes) deserialized in parallel by the streaming reader.
+    #[allow(clippy::too_many_arguments)]
     #[new]
     fn new(
-        delimiter: Option<char>,
         has_headers: bool,
         double_quote: bool,
+        delimiter: Option<char>,
         quote: Option<char>,
         escape_char: Option<char>,
         comment: Option<char>,