diff --git a/Cargo.lock b/Cargo.lock index ddb418c9b7..a013e5abc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1112,6 +1112,7 @@ dependencies = [ "async-compat", "async-compression", "async-stream", + "bincode", "bytes", "chrono", "chrono-tz", @@ -1128,6 +1129,7 @@ dependencies = [ "pyo3-log", "rayon", "rstest", + "serde", "simdutf8", "snafu", "tokio", diff --git a/daft/daft.pyi b/daft/daft.pyi index 8cc8f40e6e..3055883316 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -244,6 +244,60 @@ class FileFormatConfig: def __eq__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] def __ne__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] +class CsvConvertOptions: + """ + Options for converting CSV data to Daft data. + """ + + limit: int | None + include_columns: list[str] | None + column_names: list[str] | None + schema: PySchema | None + + def __init__( + self, + limit: int | None = None, + include_columns: list[str] | None = None, + column_names: list[str] | None = None, + schema: PySchema | None = None, + ): ... + +class CsvParseOptions: + """ + Options for parsing CSV files. + """ + + has_header: bool + delimiter: str | None + double_quote: bool + quote: str | None + escape_char: str | None + comment: str | None + + def __init__( + self, + has_header: bool = True, + delimiter: str | None = None, + double_quote: bool = True, + quote: str | None = None, + escape_char: str | None = None, + comment: str | None = None, + ): ... + +class CsvReadOptions: + """ + Options for reading CSV files. + """ + + buffer_size: int | None + chunk_size: int | None + + def __init__( + self, + buffer_size: int | None = None, + chunk_size: int | None = None, + ): ... + class FileInfo: """ Metadata for a single file. @@ -521,29 +575,15 @@ def read_parquet_schema( ): ... def read_csv( uri: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions | None = None, + parse_options: CsvParseOptions | None = None, + read_options: CsvReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: PySchema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ): ... def read_csv_schema( uri: str, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + parse_options: CsvParseOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, ): ... @@ -877,20 +917,11 @@ class PyMicroPartition: def read_csv( cls, uri: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions | None = None, + parse_options: CsvParseOptions | None = None, + read_options: CsvReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: PySchema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ): ... 
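Example (not part of the patch): a minimal sketch of how the three new option classes above compose with the refactored module-level read_csv binding declared in daft/daft.pyi. The file path and option values are illustrative placeholders; only the keyword structure comes from the new stubs.

from daft.daft import CsvConvertOptions, CsvParseOptions, CsvReadOptions, read_csv

# Placeholder path and values, shown only to illustrate the new grouped-options API.
convert_options = CsvConvertOptions(limit=5, include_columns=["petal.length", "petal.width"])
parse_options = CsvParseOptions(has_header=True, delimiter="|")
read_options = CsvReadOptions(buffer_size=512 * 1024, chunk_size=64 * 1024)

pytable = read_csv(
    "path/to/data.csv",
    convert_options=convert_options,
    parse_options=parse_options,
    read_options=read_options,
)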
class PhysicalPlanScheduler: diff --git a/daft/logical/schema.py b/daft/logical/schema.py index e1a2b30112..fe3cf72fe6 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -3,6 +3,7 @@ import sys from typing import TYPE_CHECKING, Iterator +from daft.daft import CsvParseOptions from daft.daft import PyField as _PyField from daft.daft import PySchema as _PySchema from daft.daft import read_csv_schema as _read_csv_schema @@ -164,24 +165,14 @@ def from_parquet( def from_csv( cls, path: str, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + parse_options: CsvParseOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, ) -> Schema: return Schema._from_pyschema( _read_csv_schema( uri=path, - has_header=has_header, - delimiter=delimiter, - double_quote=double_quote, - quote=quote, - escape_char=escape_char, - comment=comment, + parse_options=parse_options, io_config=io_config, multithreaded_io=multithreaded_io, ) diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index 74e56c67d7..c8bb398cd0 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -5,7 +5,13 @@ import pyarrow as pa -from daft.daft import IOConfig, JoinType +from daft.daft import ( + CsvConvertOptions, + CsvParseOptions, + CsvReadOptions, + IOConfig, + JoinType, +) from daft.daft import PyMicroPartition as _PyMicroPartition from daft.daft import PyTable as _PyTable from daft.daft import ScanTask as _ScanTask @@ -352,37 +358,19 @@ def read_parquet_bulk( def read_csv( cls, path: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions, + parse_options: CsvParseOptions, + read_options: CsvReadOptions, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: Schema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ) -> MicroPartition: return MicroPartition._from_pymicropartition( _PyMicroPartition.read_csv( uri=path, - column_names=column_names, - include_columns=include_columns, - num_rows=num_rows, - has_header=has_header, - delimiter=delimiter, - double_quote=double_quote, - quote=quote, - escape_char=escape_char, - comment=comment, + convert_options=convert_options, + parse_options=parse_options, + read_options=read_options, io_config=io_config, multithreaded_io=multithreaded_io, - schema=schema._schema if schema is not None else None, - buffer_size=buffer_size, - chunk_size=chunk_size, ) ) diff --git a/daft/table/schema_inference.py b/daft/table/schema_inference.py index f76d101bd4..9b8d43094a 100644 --- a/daft/table/schema_inference.py +++ b/daft/table/schema_inference.py @@ -6,7 +6,12 @@ import pyarrow.json as pajson import pyarrow.parquet as papq -from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig +from daft.daft import ( + CsvParseOptions, + NativeStorageConfig, + PythonStorageConfig, + StorageConfig, +) from daft.datatype import DataType from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema @@ -41,8 +46,14 @@ def from_csv( io_config = config.io_config return 
Schema.from_csv( str(file), - has_header=csv_options.header_index is not None, - delimiter=csv_options.delimiter, + parse_options=CsvParseOptions( + has_header=csv_options.header_index is not None, + delimiter=csv_options.delimiter, + double_quote=csv_options.double_quote, + quote=csv_options.quote, + escape_char=csv_options.escape_char, + comment=csv_options.comment, + ), io_config=io_config, ) diff --git a/daft/table/table.py b/daft/table/table.py index 5217f35f21..1669170e23 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -6,7 +6,7 @@ import pyarrow as pa from daft.arrow_utils import ensure_table -from daft.daft import JoinType +from daft.daft import CsvConvertOptions, CsvParseOptions, CsvReadOptions, JoinType from daft.daft import PyTable as _PyTable from daft.daft import ScanTask as _ScanTask from daft.daft import read_csv as _read_csv @@ -448,38 +448,20 @@ def read_parquet_statistics( def read_csv( cls, path: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions | None = None, + parse_options: CsvParseOptions | None = None, + read_options: CsvReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: Schema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ) -> Table: return Table._from_pytable( _read_csv( uri=path, - column_names=column_names, - include_columns=include_columns, - num_rows=num_rows, - has_header=has_header, - delimiter=delimiter, - double_quote=double_quote, - quote=quote, - escape_char=escape_char, - comment=comment, + convert_options=convert_options, + parse_options=parse_options, + read_options=read_options, io_config=io_config, multithreaded_io=multithreaded_io, - schema=schema._schema if schema is not None else None, - buffer_size=buffer_size, - chunk_size=chunk_size, ) ) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index e2408ae2a3..5ce668012b 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -12,7 +12,15 @@ from pyarrow import json as pajson from pyarrow import parquet as papq -from daft.daft import IOConfig, NativeStorageConfig, PythonStorageConfig, StorageConfig +from daft.daft import ( + CsvConvertOptions, + CsvParseOptions, + CsvReadOptions, + IOConfig, + NativeStorageConfig, + PythonStorageConfig, + StorageConfig, +) from daft.expressions import ExpressionsProjection from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema @@ -219,21 +227,27 @@ def read_csv( file, (str, pathlib.Path) ), "Native downloader only works on string inputs to read_parquet" has_header = csv_options.header_index is not None - tbl = Table.read_csv( - str(file), - column_names=schema.column_names() if not has_header else None, + csv_convert_options = CsvConvertOptions( + limit=read_options.num_rows, include_columns=read_options.column_names, - num_rows=read_options.num_rows, + column_names=schema.column_names() if not has_header else None, + schema=schema._schema if schema is not None else None, + ) + csv_parse_options = CsvParseOptions( has_header=has_header, delimiter=csv_options.delimiter, double_quote=csv_options.double_quote, quote=csv_options.quote, escape_char=csv_options.escape_char, comment=csv_options.comment, + 
) + csv_read_options = CsvReadOptions(buffer_size=csv_options.buffer_size, chunk_size=csv_options.chunk_size) + tbl = Table.read_csv( + str(file), + convert_options=csv_convert_options, + parse_options=csv_parse_options, + read_options=csv_read_options, io_config=config.io_config, - schema=schema, - buffer_size=csv_options.buffer_size, - chunk_size=csv_options.chunk_size, ) return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) else: @@ -241,7 +255,6 @@ def read_csv( io_config = config.io_config with _open_stream(file, io_config) as f: - from daft.utils import ARROW_VERSION if csv_options.comment is not None and ARROW_VERSION < (7, 0, 0): diff --git a/src/daft-csv/Cargo.toml b/src/daft-csv/Cargo.toml index 36f97fb765..d0e164b7d9 100644 --- a/src/daft-csv/Cargo.toml +++ b/src/daft-csv/Cargo.toml @@ -3,6 +3,7 @@ arrow2 = {workspace = true, features = ["io_csv", "io_csv_async"]} async-compat = {workspace = true} async-compression = {workspace = true} async-stream = {workspace = true} +bincode = {workspace = true} bytes = {workspace = true} chrono = {workspace = true} chrono-tz = {workspace = true} @@ -18,6 +19,7 @@ log = {workspace = true} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true, optional = true} rayon = {workspace = true} +serde = {workspace = true} simdutf8 = "0.1.3" snafu = {workspace = true} tokio = {workspace = true} diff --git a/src/daft-csv/src/lib.rs b/src/daft-csv/src/lib.rs index 4e9af21c81..7541563578 100644 --- a/src/daft-csv/src/lib.rs +++ b/src/daft-csv/src/lib.rs @@ -1,16 +1,23 @@ #![feature(async_closure)] #![feature(let_chains)] +#![feature(trait_alias)] +#![feature(trait_upcasting)] use common_error::DaftError; use snafu::Snafu; mod compression; pub mod metadata; +pub mod options; #[cfg(feature = "python")] pub mod python; pub mod read; mod schema; + +pub use metadata::read_csv_schema_bulk; +pub use options::{char_to_byte, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; #[cfg(feature = "python")] -pub use python::register_modules; +use pyo3::prelude::*; +pub use read::{read_csv, read_csv_bulk}; #[derive(Debug, Snafu)] pub enum Error { @@ -50,3 +57,23 @@ impl From for Error { Error::IOError { source: err } } } + +#[cfg(feature = "python")] +impl From for pyo3::PyErr { + fn from(value: Error) -> Self { + let daft_error: DaftError = value.into(); + daft_error.into() + } +} + +type Result = std::result::Result; + +#[cfg(feature = "python")] +pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { + parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; + parent.add_wrapped(wrap_pyfunction!(python::pylib::read_csv))?; + parent.add_wrapped(wrap_pyfunction!(python::pylib::read_csv_schema))?; + Ok(()) +} diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs index 5e04846266..75ac0ad43a 100644 --- a/src/daft-csv/src/metadata.rs +++ b/src/daft-csv/src/metadata.rs @@ -6,42 +6,62 @@ use common_error::DaftResult; use csv_async::ByteRecord; use daft_core::schema::Schema; use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef}; +use futures::{StreamExt, TryStreamExt}; +use snafu::ResultExt; use tokio::{ fs::File, io::{AsyncBufRead, AsyncRead, BufReader}, }; use tokio_util::io::StreamReader; -use crate::read::char_to_byte; -use crate::{compression::CompressionCodec, schema::merge_schema}; +use crate::{compression::CompressionCodec, schema::merge_schema, CsvParseOptions}; use daft_decoding::inference::infer; const DEFAULT_COLUMN_PREFIX: &str = "column_"; 
-#[allow(clippy::too_many_arguments)] +#[derive(Debug, Clone)] +pub struct CsvReadStats { + pub total_bytes_read: usize, + pub total_records_read: usize, + pub mean_record_size_bytes: f64, + pub stddev_record_size_bytes: f64, +} + +impl CsvReadStats { + pub fn new( + total_bytes_read: usize, + total_records_read: usize, + mean_record_size_bytes: f64, + stddev_record_size_bytes: f64, + ) -> Self { + Self { + total_bytes_read, + total_records_read, + mean_record_size_bytes, + stddev_record_size_bytes, + } + } +} + +impl Default for CsvReadStats { + fn default() -> Self { + Self::new(0, 0, 0f64, 0f64) + } +} + pub fn read_csv_schema( uri: &str, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: Option, max_bytes: Option, io_client: Arc, io_stats: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> { +) -> DaftResult<(Schema, CsvReadStats)> { let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); runtime_handle.block_on(async { read_csv_schema_single( uri, - has_header, - char_to_byte(delimiter)?, - double_quote, - char_to_byte(quote)?, - char_to_byte(escape_char)?, - char_to_byte(comment)?, + parse_options.unwrap_or_default(), // Default to 1 MiB. max_bytes.or(Some(1024 * 1024)), io_client, @@ -51,19 +71,50 @@ pub fn read_csv_schema( }) } -#[allow(clippy::too_many_arguments)] +pub async fn read_csv_schema_bulk( + uris: &[&str], + parse_options: Option, + max_bytes: Option, + io_client: Arc, + io_stats: Option, + num_parallel_tasks: usize, +) -> DaftResult> { + let runtime_handle = get_runtime(true)?; + let _rt_guard = runtime_handle.enter(); + let result = runtime_handle + .block_on(async { + let task_stream = futures::stream::iter(uris.iter().map(|uri| { + let owned_string = uri.to_string(); + let owned_client = io_client.clone(); + let owned_io_stats = io_stats.clone(); + let owned_parse_options = parse_options.clone(); + tokio::spawn(async move { + read_csv_schema_single( + &owned_string, + owned_parse_options.unwrap_or_default(), + max_bytes, + owned_client, + owned_io_stats, + ) + .await + }) + })); + task_stream + .buffered(num_parallel_tasks) + .try_collect::>() + .await + }) + .context(super::JoinSnafu {})?; + result.into_iter().collect::>>() +} + pub(crate) async fn read_csv_schema_single( uri: &str, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, io_client: Arc, io_stats: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> { +) -> DaftResult<(Schema, CsvReadStats)> { let compression_codec = CompressionCodec::from_uri(uri); match io_client .single_url_get(uri.to_string(), None, io_stats) @@ -73,12 +124,7 @@ pub(crate) async fn read_csv_schema_single( read_csv_schema_from_compressed_reader( BufReader::new(File::open(file.path).await?), compression_codec, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options, max_bytes, ) .await @@ -87,12 +133,7 @@ pub(crate) async fn read_csv_schema_single( read_csv_schema_from_compressed_reader( StreamReader::new(stream), compression_codec, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options, // Truncate max_bytes to size if both are set. 
max_bytes.map(|m| size.map(|s| m.min(s)).unwrap_or(m)), ) @@ -105,14 +146,9 @@ pub(crate) async fn read_csv_schema_single( async fn read_csv_schema_from_compressed_reader( reader: R, compression_codec: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> +) -> DaftResult<(Schema, CsvReadStats)> where R: AsyncBufRead + Unpin + Send + 'static, { @@ -120,99 +156,50 @@ where Some(compression) => { read_csv_schema_from_uncompressed_reader( compression.to_decoder(reader), - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - max_bytes, - ) - .await - } - None => { - read_csv_schema_from_uncompressed_reader( - reader, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options, max_bytes, ) .await } + None => read_csv_schema_from_uncompressed_reader(reader, parse_options, max_bytes).await, } } #[allow(clippy::too_many_arguments)] async fn read_csv_schema_from_uncompressed_reader( reader: R, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> +) -> DaftResult<(Schema, CsvReadStats)> where R: AsyncRead + Unpin + Send, { - let (schema, total_bytes_read, num_records_read, mean_size, std_size) = - read_csv_arrow_schema_from_uncompressed_reader( - reader, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - max_bytes, - ) - .await?; - Ok(( - Schema::try_from(&schema)?, - total_bytes_read, - num_records_read, - mean_size, - std_size, - )) + let (schema, read_stats) = + read_csv_arrow_schema_from_uncompressed_reader(reader, parse_options, max_bytes).await?; + Ok((Schema::try_from(&schema)?, read_stats)) } #[allow(clippy::too_many_arguments)] async fn read_csv_arrow_schema_from_uncompressed_reader( reader: R, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, -) -> DaftResult<(arrow2::datatypes::Schema, usize, usize, f64, f64)> +) -> DaftResult<(arrow2::datatypes::Schema, CsvReadStats)> where R: AsyncRead + Unpin + Send, { let mut reader = AsyncReaderBuilder::new() - .has_headers(has_header) - .delimiter(delimiter.unwrap_or(b',')) - .double_quote(double_quote) - .quote(quote.unwrap_or(b'"')) - .escape(escape_char) - .comment(comment) + .has_headers(parse_options.has_header) + .delimiter(parse_options.delimiter) + .double_quote(parse_options.double_quote) + .quote(parse_options.quote) + .escape(parse_options.escape_char) + .comment(parse_options.comment) .buffer_capacity(max_bytes.unwrap_or(1 << 20).min(1 << 20)) .create_reader(reader.compat()); - let (fields, total_bytes_read, num_records_read, mean_size, std_size) = - infer_schema(&mut reader, None, max_bytes, has_header).await?; - Ok(( - fields.into(), - total_bytes_read, - num_records_read, - mean_size, - std_size, - )) + let (fields, read_stats) = + infer_schema(&mut reader, None, max_bytes, parse_options.has_header).await?; + Ok((fields.into(), read_stats)) } async fn infer_schema( @@ -220,7 +207,7 @@ async fn infer_schema( max_rows: Option, max_bytes: Option, has_header: bool, -) -> arrow2::error::Result<(Vec, usize, usize, f64, f64)> +) -> arrow2::error::Result<(Vec, CsvReadStats)> where R: 
futures::AsyncRead + Unpin + Send, { @@ -240,7 +227,7 @@ where } else { // Save the csv reader position before reading headers if !reader.read_byte_record(&mut record).await? { - return Ok((vec![], 0, 0, 0f64, 0f64)); + return Ok((vec![], Default::default())); } let first_record_count = record.len(); ( @@ -292,7 +279,10 @@ where } let fields = merge_schema(&headers, &mut column_types); let std = (m2 / ((records_count - 1) as f64)).sqrt(); - Ok((fields, total_bytes, records_count, mean, std)) + Ok(( + fields, + CsvReadStats::new(total_bytes, records_count, mean, std), + )) } #[cfg(test)] @@ -304,6 +294,8 @@ mod tests { use daft_io::{IOClient, IOConfig}; use rstest::rstest; + use crate::CsvParseOptions; + use super::read_csv_schema; #[rstest] @@ -340,18 +332,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -362,8 +344,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -379,14 +361,9 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( + let (schema, read_stats) = read_csv_schema( file.as_ref(), - true, - Some('|'), - true, - None, - None, - None, + Some(CsvParseOptions::default().with_delimiter(b'|')), None, io_client.clone(), None, @@ -401,8 +378,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -415,20 +392,9 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (_, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + let (_, read_stats) = read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -444,14 +410,9 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( + let (schema, read_stats) = read_csv_schema( file.as_ref(), - false, - None, - true, - None, - None, - None, + Some(CsvParseOptions::default().with_has_header(false)), None, io_client.clone(), None, @@ -466,8 +427,8 @@ mod tests { Field::new("column_5", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -483,18 +444,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - 
file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -505,8 +456,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 49); - assert_eq!(num_records_read, 3); + assert_eq!(read_stats.total_bytes_read, 49); + assert_eq!(read_stats.total_records_read, 3); Ok(()) } @@ -519,18 +470,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -541,8 +482,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 82); - assert_eq!(num_records_read, 6); + assert_eq!(read_stats.total_bytes_read, 82); + assert_eq!(read_stats.total_records_read, 6); Ok(()) } @@ -558,18 +499,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -581,8 +512,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 33); - assert_eq!(num_records_read, 2); + assert_eq!(read_stats.total_bytes_read, 33); + assert_eq!(read_stats.total_records_read, 2); Ok(()) } @@ -595,18 +526,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - Some(100), - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, Some(100), io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -618,8 +539,16 @@ mod tests { ])?, ); // Max bytes doesn't include header, so add 15 bytes to upper bound. 
- assert!(total_bytes_read <= 100 + 15, "{}", total_bytes_read); - assert!(num_records_read <= 10, "{}", num_records_read); + assert!( + read_stats.total_bytes_read <= 100 + 15, + "{}", + read_stats.total_bytes_read + ); + assert!( + read_stats.total_records_read <= 10, + "{}", + read_stats.total_records_read + ); Ok(()) } @@ -635,18 +564,7 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let err = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - ); + let err = read_csv_schema(file.as_ref(), None, None, io_client.clone(), None); assert!(err.is_err()); let err = err.unwrap_err(); assert!(matches!(err, DaftError::ArrowError(_)), "{}", err); @@ -673,12 +591,7 @@ mod tests { let err = read_csv_schema( file.as_ref(), - true, - None, - true, - None, - None, - None, + Some(CsvParseOptions::default().with_has_header(false)), None, io_client.clone(), None, @@ -729,18 +642,7 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, _, _, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, _) = read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ diff --git a/src/daft-csv/src/options.rs b/src/daft-csv/src/options.rs new file mode 100644 index 0000000000..eceebadfd7 --- /dev/null +++ b/src/daft-csv/src/options.rs @@ -0,0 +1,352 @@ +use daft_core::{impl_bincode_py_state_serialization, schema::SchemaRef}; +use serde::{Deserialize, Serialize}; +#[cfg(feature = "python")] +use { + daft_core::python::schema::PySchema, + pyo3::{ + pyclass, pyclass::CompareOp, pymethods, types::PyBytes, PyObject, PyResult, PyTypeInfo, + Python, ToPyObject, + }, +}; + +/// Options for converting CSV data to Daft data. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +pub struct CsvConvertOptions { + pub limit: Option, + pub include_columns: Option>, + pub column_names: Option>, + pub schema: Option, +} + +impl CsvConvertOptions { + pub fn new_internal( + limit: Option, + include_columns: Option>, + column_names: Option>, + schema: Option, + ) -> Self { + Self { + limit, + include_columns, + column_names, + schema, + } + } + + pub fn with_limit(self, limit: Option) -> Self { + Self { + limit, + include_columns: self.include_columns, + column_names: self.column_names, + schema: self.schema, + } + } + + pub fn with_include_columns(self, include_columns: Option>) -> Self { + Self { + limit: self.limit, + include_columns, + column_names: self.column_names, + schema: self.schema, + } + } + + pub fn with_column_names(self, column_names: Option>) -> Self { + Self { + limit: self.limit, + include_columns: self.include_columns, + column_names, + schema: self.schema, + } + } + + pub fn with_schema(self, schema: Option) -> Self { + Self { + limit: self.limit, + include_columns: self.include_columns, + column_names: self.column_names, + schema, + } + } +} + +impl Default for CsvConvertOptions { + fn default() -> Self { + Self::new_internal(None, None, None, None) + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl CsvConvertOptions { + /// Create conversion options for the CSV reader. + /// + /// # Arguments: + /// + /// * `limit` - Only read this many rows. 
+ /// * `include_columns` - The names of the columns that should be kept, e.g. via a projection. + /// * `column_names` - The names for the CSV columns. + /// * `schema` - The names and dtypes for the CSV columns. + #[new] + #[pyo3(signature = (limit=None, include_columns=None, column_names=None, schema=None))] + pub fn new( + limit: Option, + include_columns: Option>, + column_names: Option>, + schema: Option, + ) -> Self { + Self::new_internal( + limit, + include_columns, + column_names, + schema.map(|s| s.into()), + ) + } + + #[getter] + pub fn get_limit(&self) -> PyResult> { + Ok(self.limit) + } + + #[getter] + pub fn get_include_columns(&self) -> PyResult>> { + Ok(self.include_columns.clone()) + } + + #[getter] + pub fn get_column_names(&self) -> PyResult>> { + Ok(self.column_names.clone()) + } + + #[getter] + pub fn get_schema(&self) -> PyResult> { + Ok(self.schema.as_ref().map(|s| s.clone().into())) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self)) + } +} + +impl_bincode_py_state_serialization!(CsvConvertOptions); + +/// Options for parsing CSV files. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] +pub struct CsvParseOptions { + pub has_header: bool, + pub delimiter: u8, + pub double_quote: bool, + pub quote: u8, + pub escape_char: Option, + pub comment: Option, +} + +impl CsvParseOptions { + pub fn new_internal( + has_header: bool, + delimiter: u8, + double_quote: bool, + quote: u8, + escape_char: Option, + comment: Option, + ) -> Self { + Self { + has_header, + delimiter, + double_quote, + quote, + escape_char, + comment, + } + } + + pub fn new_with_defaults( + has_header: bool, + delimiter: Option, + double_quote: bool, + quote: Option, + escape_char: Option, + comment: Option, + ) -> super::Result { + Ok(Self::new_internal( + has_header, + char_to_byte(delimiter)?.unwrap_or(b','), + double_quote, + char_to_byte(quote)?.unwrap_or(b'"'), + char_to_byte(escape_char)?, + char_to_byte(comment)?, + )) + } + + pub fn with_has_header(self, has_header: bool) -> Self { + Self { has_header, ..self } + } + + pub fn with_delimiter(self, delimiter: u8) -> Self { + Self { delimiter, ..self } + } + + pub fn with_double_quote(self, double_quote: bool) -> Self { + Self { + double_quote, + ..self + } + } + + pub fn with_quote(self, quote: u8) -> Self { + Self { quote, ..self } + } + + pub fn with_escape_char(self, escape_char: Option) -> Self { + Self { + escape_char, + ..self + } + } + + pub fn with_comment(self, comment: Option) -> Self { + Self { comment, ..self } + } +} + +impl Default for CsvParseOptions { + fn default() -> Self { + Self::new_with_defaults(true, None, true, None, None, None).unwrap() + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl CsvParseOptions { + /// Create parsing options for the CSV reader. + /// + /// # Arguments: + /// + /// * `has_headers` - Whether the CSV has a header row; if so, it will be skipped during data parsing. + /// * `delimiter` - The character delmiting individual cells in the CSV data. + /// * `double_quote` - Whether double-quote escapes are enabled. + /// * `quote` - The character to use for quoting strings. + /// * `escape_char` - The character to use as an escape character. 
+ /// * `comment` - The character at the start of a line that indicates that the rest of the line is a comment, + /// which should be ignored while parsing. + #[new] + #[pyo3(signature = (has_header=true, delimiter=None, double_quote=false, quote=None, escape_char=None, comment=None))] + pub fn new( + has_header: bool, + delimiter: Option, + double_quote: bool, + quote: Option, + escape_char: Option, + comment: Option, + ) -> PyResult { + Ok(Self::new_with_defaults( + has_header, + delimiter, + double_quote, + quote, + escape_char, + comment, + )?) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self)) + } +} + +pub fn char_to_byte(c: Option) -> Result, super::Error> { + match c.map(u8::try_from).transpose() { + Ok(b) => Ok(b), + Err(e) => Err(super::Error::WrongChar { + source: e, + val: c.unwrap_or(' '), + }), + } +} + +impl_bincode_py_state_serialization!(CsvParseOptions); + +/// Options for reading CSV files. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] +pub struct CsvReadOptions { + pub buffer_size: Option, + pub chunk_size: Option, +} + +impl CsvReadOptions { + pub fn new_internal(buffer_size: Option, chunk_size: Option) -> Self { + Self { + buffer_size, + chunk_size, + } + } + + pub fn with_buffer_size(self, buffer_size: Option) -> Self { + Self { + buffer_size, + chunk_size: self.chunk_size, + } + } + + pub fn with_chunk_size(self, chunk_size: Option) -> Self { + Self { + buffer_size: self.buffer_size, + chunk_size, + } + } +} + +impl Default for CsvReadOptions { + fn default() -> Self { + Self::new_internal(None, None) + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl CsvReadOptions { + /// Create reading options for the CSV reader. + /// + /// # Arguments: + /// + /// * `buffer_size` - Size of the buffer (in bytes) used by the streaming reader. + /// * `chunk_size` - Size of the chunks (in bytes) deserialized in parallel by the streaming reader. 
+ #[new] + #[pyo3(signature = (buffer_size=None, chunk_size=None))] + pub fn new(buffer_size: Option, chunk_size: Option) -> Self { + Self::new_internal(buffer_size, chunk_size) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self)) + } +} + +impl_bincode_py_state_serialization!(CsvReadOptions); diff --git a/src/daft-csv/src/python.rs b/src/daft-csv/src/python.rs index 965af9d750..229f4ada65 100644 --- a/src/daft-csv/src/python.rs +++ b/src/daft-csv/src/python.rs @@ -1,5 +1,3 @@ -use pyo3::prelude::*; - pub mod pylib { use std::sync::Arc; @@ -8,25 +6,17 @@ pub mod pylib { use daft_table::python::PyTable; use pyo3::{pyfunction, PyResult, Python}; + use crate::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; + #[pyfunction] - #[allow(clippy::too_many_arguments)] pub fn read_csv( py: Python, uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: Option, - delimiter: Option, - double_quote: Option, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_config: Option, multithreaded_io: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, ) -> PyResult { py.allow_threads(|| { let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}")); @@ -37,21 +27,12 @@ pub mod pylib { )?; Ok(crate::read::read_csv( uri, - column_names, - include_columns, - num_rows, - has_header.unwrap_or(true), - delimiter, - double_quote.unwrap_or(true), - quote, - escape_char, - comment, + convert_options, + parse_options, + read_options, io_client, Some(io_stats), multithreaded_io.unwrap_or(true), - schema.map(|s| s.schema), - buffer_size, - chunk_size, None, )? 
.into()) @@ -59,16 +40,10 @@ pub mod pylib { } #[pyfunction] - #[allow(clippy::too_many_arguments)] pub fn read_csv_schema( py: Python, uri: &str, - has_header: Option, - delimiter: Option, - double_quote: Option, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: Option, max_bytes: Option, io_config: Option, multithreaded_io: Option, @@ -80,14 +55,9 @@ pub mod pylib { multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), )?; - let (schema, _, _, _, _) = crate::metadata::read_csv_schema( + let (schema, _) = crate::metadata::read_csv_schema( uri, - has_header.unwrap_or(true), - delimiter, - double_quote.unwrap_or(true), - quote, - escape_char, - comment, + parse_options, max_bytes, io_client, Some(io_stats), @@ -96,9 +66,3 @@ pub mod pylib { }) } } - -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_wrapped(wrap_pyfunction!(pylib::read_csv))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_csv_schema))?; - Ok(()) -} diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 8c4a14b98b..a51e56ef60 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -7,77 +7,60 @@ use arrow2::{ use async_compat::{Compat, CompatExt}; use common_error::DaftResult; use csv_async::AsyncReader; -use daft_core::{ - schema::{Schema, SchemaRef}, - utils::arrow::cast_array_for_daft_if_needed, - Series, -}; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef}; use daft_table::Table; -use futures::TryStreamExt; +use futures::{Stream, StreamExt, TryStreamExt}; use rayon::prelude::{ IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator, }; -use snafu::{futures::TryFutureExt, ResultExt}; +use snafu::{ + futures::{try_future::Context, TryFutureExt}, + ResultExt, +}; use tokio::{ fs::File, io::{AsyncBufRead, AsyncRead, BufReader}, + task::JoinHandle, }; use tokio_util::io::StreamReader; -use crate::metadata::read_csv_schema_single; -use crate::{compression::CompressionCodec, ArrowSnafu, Error}; +use crate::{compression::CompressionCodec, ArrowSnafu}; +use crate::{metadata::read_csv_schema_single, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_decoding::deserialize::deserialize_column; -pub fn char_to_byte(c: Option) -> Result, Error> { - match c.map(u8::try_from).transpose() { - Ok(b) => Ok(b), - Err(e) => Err(Error::WrongChar { - source: e, - val: c.unwrap_or(' '), - }), - } -} +trait ByteRecordChunkStream = Stream>>; +trait ColumnArrayChunkStream = Stream< + Item = super::Result< + Context< + JoinHandle>>>, + super::JoinSnafu, + super::Error, + >, + >, +>; #[allow(clippy::too_many_arguments)] pub fn read_csv( uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_client: Arc, io_stats: Option, multithreaded_io: bool, - schema: Option, - buffer_size: Option, - chunk_size: Option, max_chunks_in_flight: Option, ) -> DaftResult { let runtime_handle = get_runtime(multithreaded_io)?; let _rt_guard = runtime_handle.enter(); runtime_handle.block_on(async { - read_csv_single( + read_csv_single_into_table( uri, - column_names, - include_columns, - num_rows, - has_header, - char_to_byte(delimiter)?, - double_quote, - char_to_byte(quote)?, - 
char_to_byte(escape_char)?, - char_to_byte(comment)?, + convert_options, + parse_options, + read_options, io_client, io_stats, - schema, - buffer_size, - chunk_size, max_chunks_in_flight, ) .await @@ -85,295 +68,257 @@ pub fn read_csv( } #[allow(clippy::too_many_arguments)] -async fn read_csv_single( +pub fn read_csv_bulk( + uris: &[&str], + convert_options: Option, + parse_options: Option, + read_options: Option, + io_client: Arc, + io_stats: Option, + multithreaded_io: bool, + max_chunks_in_flight: Option, + num_parallel_tasks: usize, +) -> DaftResult> { + let runtime_handle = get_runtime(multithreaded_io)?; + let _rt_guard = runtime_handle.enter(); + let tables = runtime_handle + .block_on(async move { + // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. + let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( + uri.to_string(), + convert_options.clone(), + parse_options.clone(), + read_options.clone(), + io_client.clone(), + io_stats.clone(), + ); + tokio::task::spawn(async move { + let table = read_csv_single_into_table( + uri.as_str(), + convert_options, + parse_options, + read_options, + io_client, + io_stats, + max_chunks_in_flight, + ) + .await?; + Ok((i, table)) + }) + })); + let mut remaining_rows = convert_options + .as_ref() + .and_then(|opts| opts.limit.map(|limit| limit as i64)); + task_stream + // Each task is annotated with its position in the output, so we can use unordered buffering to help mitigate stragglers + // and sort the task results at the end. + .buffer_unordered(num_parallel_tasks) + // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to + // num_parallel_tasks redundant files. + .try_take_while(|result| { + match (result, remaining_rows) { + // Limit has been met, early-teriminate. + (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + (Ok((_, table)), Some(rows_left)) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) + } + // (1) No limit, never early-terminate. + // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. + (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + } + }) + .try_collect::>() + .await + }) + .context(super::JoinSnafu {})?; + + // Sort the task results by task index, yielding tables whose order matches the input URI order. + let mut collected = tables.into_iter().collect::>>()?; + collected.sort_by_key(|(idx, _)| *idx); + Ok(collected.into_iter().map(|(_, v)| v).collect()) +} + +async fn read_csv_single_into_table( uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_client: Arc, io_stats: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, max_chunks_in_flight: Option, ) -> DaftResult
{ - let (schema, estimated_mean_row_size, estimated_std_row_size) = match schema { + let include_columns = convert_options + .as_ref() + .and_then(|opts| opts.include_columns.clone()); + let (chunk_stream, fields) = read_csv_single_into_stream( + uri, + convert_options.unwrap_or_default(), + parse_options.unwrap_or_default(), + read_options, + io_client, + io_stats, + ) + .await?; + // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks + // with the parsing of chunks on the rayon threadpool. + let max_chunks_in_flight = max_chunks_in_flight.unwrap_or_else(|| { + std::thread::available_parallelism() + .unwrap_or(NonZeroUsize::new(2).unwrap()) + .checked_mul(2.try_into().unwrap()) + .unwrap() + .try_into() + .unwrap() + }); + // Collect all chunks in chunk x column form. + let chunks = chunk_stream + // Limit the number of chunks we have in flight at any given time. + .try_buffered(max_chunks_in_flight) + .try_collect::>() + .await? + .into_iter() + .collect::>>()?; + // Handle empty table case. + if chunks.is_empty() { + let schema: arrow2::datatypes::Schema = fields.into(); + let daft_schema = Arc::new(Schema::try_from(&schema)?); + return Table::empty(Some(daft_schema)); + } + // Transpose chunk x column into column x chunk. + let mut column_arrays = vec![Vec::with_capacity(chunks.len()); chunks[0].len()]; + for chunk in chunks.into_iter() { + for (idx, col) in chunk.into_iter().enumerate() { + column_arrays[idx].push(col); + } + } + // Build table from chunks. + // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. + chunks_to_table(column_arrays, include_columns, fields) +} + +async fn read_csv_single_into_stream( + uri: &str, + convert_options: CsvConvertOptions, + parse_options: CsvParseOptions, + read_options: Option, + io_client: Arc, + io_stats: Option, +) -> DaftResult<(impl ColumnArrayChunkStream + Send, Vec)> { + let (mut schema, estimated_mean_row_size, estimated_std_row_size) = match convert_options.schema + { Some(schema) => (schema.to_arrow()?, None, None), None => { - let (schema, _, _, mean, std) = read_csv_schema_single( + let (schema, read_stats) = read_csv_schema_single( uri, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options.clone(), // Read at most 1 MiB when doing schema inference. Some(1024 * 1024), io_client.clone(), io_stats.clone(), ) .await?; - (schema.to_arrow()?, Some(mean), Some(std)) - } - }; - let compression_codec = CompressionCodec::from_uri(uri); - match io_client - .single_url_get(uri.to_string(), None, io_stats) - .await? - { - GetResult::File(file) => { - read_csv_from_compressed_reader( - BufReader::new(File::open(file.path).await?), - compression_codec, - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - // Default buffer size of 512 KiB. - buffer_size.unwrap_or(512 * 1024), - // Default chunk size of 64 KiB. - chunk_size.unwrap_or(64 * 1024), - // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks - // with the parsing of chunks on the rayon threadpool. 
- max_chunks_in_flight.unwrap_or( - std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(2).unwrap()) - .checked_mul(2.try_into().unwrap()) - .unwrap() - .try_into() - .unwrap(), - ), - estimated_mean_row_size, - estimated_std_row_size, - ) - .await - } - GetResult::Stream(stream, _, _) => { - read_csv_from_compressed_reader( - StreamReader::new(stream), - compression_codec, - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - // Default buffer size of 512 KiB. - buffer_size.unwrap_or(512 * 1024), - // Default chunk size of 64 KiB. - chunk_size.unwrap_or(64 * 1024), - // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks - // with the parsing of chunks on the rayon threadpool. - max_chunks_in_flight.unwrap_or( - std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(2).unwrap()) - .checked_mul(2.try_into().unwrap()) - .unwrap() - .try_into() - .unwrap(), - ), - estimated_mean_row_size, - estimated_std_row_size, - ) - .await - } - } -} - -#[allow(clippy::too_many_arguments)] -async fn read_csv_from_compressed_reader( - reader: R, - compression_codec: Option, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, - schema: arrow2::datatypes::Schema, - buffer_size: usize, - chunk_size: usize, - max_chunks_in_flight: usize, - estimated_mean_row_size: Option, - estimated_std_row_size: Option, -) -> DaftResult
-where - R: AsyncBufRead + Unpin + Send + 'static, -{ - match compression_codec { - Some(compression) => { - read_csv_from_uncompressed_reader( - compression.to_decoder(reader), - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - buffer_size, - chunk_size, - max_chunks_in_flight, - estimated_mean_row_size, - estimated_std_row_size, - ) - .await - } - None => { - read_csv_from_uncompressed_reader( - reader, - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - buffer_size, - chunk_size, - max_chunks_in_flight, - estimated_mean_row_size, - estimated_std_row_size, + ( + schema.to_arrow()?, + Some(read_stats.mean_record_size_bytes), + Some(read_stats.stddev_record_size_bytes), ) - .await } - } -} - -#[allow(clippy::too_many_arguments)] -async fn read_csv_from_uncompressed_reader( - stream_reader: R, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, - schema: arrow2::datatypes::Schema, - buffer_size: usize, - chunk_size: usize, - max_chunks_in_flight: usize, - estimated_mean_row_size: Option, - estimated_std_row_size: Option, -) -> DaftResult
-where - R: AsyncRead + Unpin + Send, -{ - let reader = AsyncReaderBuilder::new() - .has_headers(has_header) - .delimiter(delimiter.unwrap_or(b',')) - .double_quote(double_quote) - .quote(quote.unwrap_or(b'"')) - .escape(escape_char) - .comment(comment) - .buffer_capacity(buffer_size) - .create_reader(stream_reader.compat()); - let mut fields = schema.fields; + }; // Rename fields, if necessary. - if let Some(column_names) = column_names { - fields = fields + if let Some(column_names) = convert_options.column_names { + schema = schema + .fields .into_iter() .zip(column_names.iter()) .map(|(field, name)| { - Field::new(*name, field.data_type, field.is_nullable).with_metadata(field.metadata) + Field::new(name, field.data_type, field.is_nullable).with_metadata(field.metadata) }) - .collect(); + .collect::>() + .into(); } - // Read CSV into Arrow2 column chunks. - let column_chunks = read_into_column_chunks( + let (reader, buffer_size, chunk_size): (Box, usize, usize) = + match io_client + .single_url_get(uri.to_string(), None, io_stats) + .await? + { + GetResult::File(file) => { + ( + Box::new(BufReader::new(File::open(file.path).await?)), + // Use user-provided buffer size, falling back to 8 * the user-provided chunk size if that exists, otherwise falling back to 512 KiB as the default. + read_options + .as_ref() + .and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs))) + .unwrap_or(512 * 1024), + read_options + .as_ref() + .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) + .unwrap_or(64 * 1024), + ) + } + GetResult::Stream(stream, _, _) => ( + Box::new(StreamReader::new(stream)), + read_options + .as_ref() + .and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs))) + .unwrap_or(512 * 1024), + read_options + .as_ref() + .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) + .unwrap_or(64 * 1024), + ), + }; + let reader: Box = match CompressionCodec::from_uri(uri) { + Some(compression) => Box::new(compression.to_decoder(reader)), + None => reader, + }; + let reader = AsyncReaderBuilder::new() + .has_headers(parse_options.has_header) + .delimiter(parse_options.delimiter) + .double_quote(parse_options.double_quote) + .quote(parse_options.quote) + .escape(parse_options.escape_char) + .comment(parse_options.comment) + .buffer_capacity(buffer_size) + .create_reader(reader.compat()); + let read_stream = read_into_byterecord_chunk_stream( reader, - fields.clone().into(), - fields_to_projection_indices(&fields, &include_columns), - num_rows, + schema.fields.len(), + convert_options.limit, chunk_size, - max_chunks_in_flight, estimated_mean_row_size, estimated_std_row_size, - ) - .await?; - // Truncate fields to only contain projected columns. - if let Some(include_columns) = include_columns { - let field_map = fields - .into_iter() - .map(|field| (field.name.clone(), field)) - .collect::>(); - fields = include_columns - .into_iter() - .map(|col| field_map[col].clone()) - .collect::>(); - } - // Concatenate column chunks and convert into Daft Series. - // Note that this concatenation is done in parallel on the rayon threadpool. - let columns_series = column_chunks - .into_par_iter() - .zip(&fields) - .map(|(mut arrays, field)| { - let array = if arrays.len() > 1 { - // Concatenate all array chunks. - let unboxed_arrays = arrays.iter().map(Box::as_ref).collect::>(); - arrow2::compute::concatenate::concatenate(unboxed_arrays.as_slice())? - } else { - // Return single array chunk directly. 
- arrays.pop().unwrap() - }; - Series::try_from((field.name.as_ref(), cast_array_for_daft_if_needed(array))) - }) - .collect::>>()?; - // Build Daft Table. - let schema: arrow2::datatypes::Schema = fields.into(); - let daft_schema = Schema::try_from(&schema)?; - Table::new(daft_schema, columns_series) + ); + let projection_indices = + fields_to_projection_indices(&schema.fields, &convert_options.include_columns); + let fields = schema.fields; + Ok(( + parse_into_column_array_chunk_stream( + read_stream, + Arc::new(fields.clone()), + projection_indices, + ), + fields, + )) } -#[allow(clippy::too_many_arguments)] -async fn read_into_column_chunks( +fn read_into_byterecord_chunk_stream( mut reader: AsyncReader>, - fields: Arc>, - projection_indices: Arc>, + num_fields: usize, num_rows: Option, chunk_size: usize, - max_chunks_in_flight: usize, estimated_mean_row_size: Option, estimated_std_row_size: Option, -) -> DaftResult>>> +) -> impl ByteRecordChunkStream + Send where - R: AsyncRead + Unpin + Send, + R: AsyncRead + Unpin + Send + 'static, { - let num_fields = fields.len(); let num_rows = num_rows.unwrap_or(usize::MAX); let mut estimated_mean_row_size = estimated_mean_row_size.unwrap_or(200f64); let mut estimated_std_row_size = estimated_std_row_size.unwrap_or(20f64); // Stream of unparsed CSV byte record chunks. - let read_stream = async_stream::try_stream! { + async_stream::try_stream! { // Number of rows read in last read. let mut rows_read = 1; // Total number of rows read across all reads. @@ -413,12 +358,18 @@ where chunk_buffer.truncate(rows_read); yield chunk_buffer } - }; + } +} + +fn parse_into_column_array_chunk_stream( + stream: impl ByteRecordChunkStream + Send, + fields: Arc>, + projection_indices: Arc>, +) -> impl ColumnArrayChunkStream + Send { // Parsing stream: we spawn background tokio + rayon tasks so we can pipeline chunk parsing with chunk reading, and // we further parse each chunk column in parallel on the rayon threadpool. - let parse_stream = read_stream.map_ok(|record| { - let fields = fields.clone(); - let projection_indices = projection_indices.clone(); + stream.map_ok(move |record| { + let (fields, projection_indices) = (fields.clone(), projection_indices.clone()); tokio::spawn(async move { let (send, recv) = tokio::sync::oneshot::channel(); rayon::spawn(move || { @@ -433,36 +384,60 @@ where 0, ) }) - .collect::>>>()?; - DaftResult::Ok(chunk) + .collect::>>>() + .context(ArrowSnafu)?; + Ok(chunk) })(); let _ = send.send(result); }); recv.await.context(super::OneShotRecvSnafu {})? }) .context(super::JoinSnafu {}) - }); - // Collect all chunks in chunk x column form. - let chunks = parse_stream - // Limit the number of chunks we have in flight at any given time. - .try_buffered(max_chunks_in_flight) - .try_collect::>() - .await? - .into_iter() - .collect::>>()?; - // Transpose chunk x column into column x chunk. - let mut column_arrays = vec![Vec::with_capacity(chunks.len()); projection_indices.len()]; - for chunk in chunks.into_iter() { - for (idx, col) in chunk.into_iter().enumerate() { - column_arrays[idx].push(col); - } + }) +} + +fn chunks_to_table( + chunks: Vec>>, + include_columns: Option>, + mut fields: Vec, +) -> DaftResult
{ + // Truncate fields to only contain projected columns. + if let Some(include_columns) = include_columns { + let field_map = fields + .into_iter() + .map(|field| (field.name.clone(), field)) + .collect::>(); + fields = include_columns + .into_iter() + .map(|col| field_map[&col].clone()) + .collect::>(); } - Ok(column_arrays) + // Concatenate column chunks and convert into Daft Series. + // Note that this concatenation is done in parallel on the rayon threadpool. + let columns_series = chunks + .into_par_iter() + .zip(&fields) + .map(|(mut arrays, field)| { + let array = if arrays.len() > 1 { + // Concatenate all array chunks. + let unboxed_arrays = arrays.iter().map(Box::as_ref).collect::>(); + arrow2::compute::concatenate::concatenate(unboxed_arrays.as_slice())? + } else { + // Return single array chunk directly. + arrays.pop().unwrap() + }; + Series::try_from((field.name.as_ref(), cast_array_for_daft_if_needed(array))) + }) + .collect::>>()?; + // Build Daft Table. + let schema: arrow2::datatypes::Schema = fields.into(); + let daft_schema = Schema::try_from(&schema)?; + Table::new(daft_schema, columns_series) } fn fields_to_projection_indices( fields: &Vec, - include_columns: &Option>, + include_columns: &Option>, ) -> Arc> { let field_name_to_idx = fields .iter() @@ -475,7 +450,7 @@ fn fields_to_projection_indices( || (0..fields.len()).collect(), |cols| { cols.iter() - .map(|c| field_name_to_idx[c]) + .map(|c| field_name_to_idx[c.as_str()]) .collect::>() }, ) @@ -502,7 +477,9 @@ mod tests { use daft_table::Table; use rstest::rstest; - use super::{char_to_byte, read_csv}; + use crate::{char_to_byte, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; + + use super::read_csv; fn check_equal_local_arrow2( path: &str, @@ -591,25 +568,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 20); assert_eq!( table.schema, @@ -662,22 +621,16 @@ mod tests { ]; let table = read_csv( file.as_ref(), - Some(column_names.clone()), - None, - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -723,23 +676,14 @@ mod tests { let table = read_csv( file.as_ref(), None, - None, - Some(5), - true, - Some('|'), - true, - None, - None, + Some(CsvParseOptions::default().with_delimiter(b'|')), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 20); assert_eq!( table.schema, Schema::new(vec![ @@ -762,7 +706,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -783,23 +727,14 @@ mod tests { let table = read_csv( file.as_ref(), None, - None, - Some(5), - true, - None, - false, - None, - None, + Some(CsvParseOptions::default().with_double_quote(false)), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 19); assert_eq!( table.schema, Schema::new(vec![ @@ -822,7 +757,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -842,23 +777,14 @@ mod tests { let table = 
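chunks_to_table then stitches each projected column back together on the rayon threadpool. The per-column step in isolation looks roughly like this, assuming the arrow2 (with its concatenate kernel) and common_error crates already used in this file, and at least one chunk per column as in the code above:

use arrow2::array::Array;
use common_error::DaftResult;

// Concatenate a column's chunked arrays into one array, passing a lone chunk through as-is.
fn concat_column_chunks(mut arrays: Vec<Box<dyn Array>>) -> DaftResult<Box<dyn Array>> {
    Ok(if arrays.len() > 1 {
        let unboxed: Vec<&dyn Array> = arrays.iter().map(Box::as_ref).collect();
        arrow2::compute::concatenate::concatenate(unboxed.as_slice())?
    } else {
        arrays.pop().unwrap()
    })
}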
read_csv( file.as_ref(), None, - None, - Some(5), - true, - None, - true, - Some('\''), // Testing with single quote - None, + Some(CsvParseOptions::default().with_quote(b'\'')), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 20); assert_eq!( table.schema, Schema::new(vec![ @@ -881,7 +807,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -899,23 +825,14 @@ mod tests { let table = read_csv( file.as_ref(), None, - None, - Some(5), - true, - None, - true, - None, - Some('\\'), //testing with '\' as escape character + Some(CsvParseOptions::default().with_escape_char(Some(b'\\'))), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 20); assert_eq!( table.schema, Schema::new(vec![ @@ -938,7 +855,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -956,23 +873,14 @@ mod tests { let table = read_csv( file.as_ref(), None, + Some(CsvParseOptions::default().with_comment(Some(b'#'))), None, - Some(5), - true, - None, - true, - None, - None, - Some('#'), io_client, None, - true, - None, - None, - None, + true, None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 19); assert_eq!( table.schema, Schema::new(vec![ @@ -995,7 +903,7 @@ mod tests { Some('#'), None, None, - Some(5), + None, ); Ok(()) @@ -1011,22 +919,13 @@ mod tests { let table = read_csv( file.as_ref(), - None, - None, - Some(5), - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_limit(Some(5))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 5); assert_eq!( @@ -1068,22 +967,16 @@ mod tests { let table = read_csv( file.as_ref(), - None, - Some(vec!["petal.length", "petal.width"]), - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_include_columns(Some(vec![ + "petal.length".to_string(), + "petal.width".to_string(), + ]))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1132,22 +1025,20 @@ mod tests { ]; let table = read_csv( file.as_ref(), - Some(column_names.clone()), - Some(vec!["petal.length", "petal.width"]), - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())) + .with_include_columns(Some(vec![ + "petal.length".to_string(), + "petal.width".to_string(), + ])), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1188,20 +1079,11 @@ mod tests { file.as_ref(), None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_buffer_size(Some(128))), io_client, None, true, None, - Some(128), - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1245,20 +1127,11 @@ mod tests { file.as_ref(), None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_chunk_size(Some(100))), io_client, None, true, None, - None, - Some(100), - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1303,18 +1176,9 @@ mod tests { None, None, None, - true, - None, - true, - None, - None, - None, io_client, None, true, - None, - None, - None, Some(5), )?; assert_eq!(table.len(), 20); @@ -1355,25 +1219,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = 
read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 6); assert_eq!( table.schema, @@ -1415,25 +1261,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 6); assert_eq!( table.schema, @@ -1482,21 +1310,12 @@ mod tests { let table = read_csv( file.as_ref(), - None, - None, - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_schema(Some(schema.into()))), None, None, io_client, None, true, - Some(schema.into()), - None, - None, None, )?; assert_eq!(table.len(), 6); @@ -1538,25 +1357,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 3); assert_eq!( table.schema, @@ -1605,21 +1406,12 @@ mod tests { ])?; let table = read_csv( file.as_ref(), - None, - None, - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_schema(Some(schema.into()))), None, None, io_client, None, true, - Some(schema.into()), - None, - None, None, )?; let num_rows = table.len(); @@ -1645,25 +1437,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let err = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - ); + let err = read_csv(file.as_ref(), None, None, None, io_client, None, true, None); assert!(err.is_err()); let err = err.unwrap_err(); assert!(matches!(err, DaftError::ArrowError(_)), "{}", err); @@ -1692,21 +1466,12 @@ mod tests { let err = read_csv( file.as_ref(), None, - None, - None, - false, - None, - true, - None, - None, + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, ); assert!(err.is_err()); let err = err.unwrap_err(); @@ -1755,25 +1520,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 100); assert_eq!( table.schema, @@ -1799,22 +1546,16 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( file, - Some(column_names.clone()), - None, - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 100); assert_eq!( @@ -1841,22 +1582,17 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( file, - 
Some(column_names.clone()), - Some(vec!["b"]), - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())) + .with_include_columns(Some(vec!["b".to_string()])), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 100); assert_eq!( @@ -1878,22 +1614,13 @@ mod tests { let table = read_csv( file, - None, - None, - Some(10), - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_limit(Some(10))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 10); assert_eq!( @@ -1919,22 +1646,13 @@ mod tests { let table = read_csv( file, - None, - Some(vec!["b"]), - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_include_columns(Some(vec!["b".to_string()]))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 100); assert_eq!( @@ -1958,20 +1676,11 @@ mod tests { file, None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_buffer_size(Some(100))), io_client, None, true, None, - Some(100), - None, - None, )?; assert_eq!(table.len(), 5000); @@ -1991,20 +1700,11 @@ mod tests { file, None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_chunk_size(Some(100))), io_client, None, true, None, - None, - Some(100), - None, )?; assert_eq!(table.len(), 5000); @@ -2020,25 +1720,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file, - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - Some(5), - )?; + let table = read_csv(file, None, None, None, io_client, None, true, Some(5))?; assert_eq!(table.len(), 5000); Ok(()) diff --git a/src/daft-decoding/src/deserialize.rs b/src/daft-decoding/src/deserialize.rs index 878b93064e..e8e79156a6 100644 --- a/src/daft-decoding/src/deserialize.rs +++ b/src/daft-decoding/src/deserialize.rs @@ -176,10 +176,6 @@ fn deserialize_datetime( for i in 0..ALL_TIMESTAMP_FMTS.len() { let idx = (i + *fmt_idx) % ALL_TIMESTAMP_FMTS.len(); let fmt = ALL_TIMESTAMP_FMTS[idx]; - println!( - "Deserializing dt: {string} with {fmt} as {:?}", - chrono::DateTime::parse_from_str(string, fmt) - ); if let Ok(dt) = chrono::DateTime::parse_from_str(string, fmt) { *fmt_idx = idx; return Some(dt.with_timezone(tz)); diff --git a/src/daft-micropartition/src/lib.rs b/src/daft-micropartition/src/lib.rs index c15e1e2c30..d0d305d13b 100644 --- a/src/daft-micropartition/src/lib.rs +++ b/src/daft-micropartition/src/lib.rs @@ -25,6 +25,9 @@ pub enum Error { #[snafu(display("Duplicate name found when evaluating expressions: {}", name))] DuplicatedField { name: String }, + #[snafu(display("CSV error: {}", source))] + DaftCSV { source: daft_csv::Error }, + #[snafu(display( "Field: {} not found in Parquet File: Available Fields: {:?}", field, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index f0946748ba..b53af50908 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -7,7 +7,7 @@ use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::schema::{Schema, SchemaRef}; -use daft_csv::read::read_csv; 
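The tests above exercise the new option structs one knob at a time; putting them together, a hedged sketch of the new read_csv call shape (inside a function returning a DaftResult, with io_client: Arc<IOClient> already built as in the tests; the path and sizes are placeholders, while the builder methods and argument order are taken from the tests above):

let convert_options = CsvConvertOptions::default()
    .with_limit(Some(5))
    .with_include_columns(Some(vec![
        "petal.length".to_string(),
        "petal.width".to_string(),
    ]));
let parse_options = CsvParseOptions::default()
    .with_has_header(true)
    .with_delimiter(b'|');
let read_options = CsvReadOptions::default()
    .with_buffer_size(Some(128 * 1024))
    .with_chunk_size(Some(64 * 1024));
let table = read_csv(
    "s3://bucket/file.csv", // placeholder URI
    Some(convert_options),
    Some(parse_options),
    Some(read_options),
    io_client,
    None, // io_stats
    true, // multithreaded_io
    None, // max_chunks_in_flight
)?;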
+use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; @@ -18,9 +18,9 @@ use daft_table::Table; use snafu::ResultExt; -use crate::DaftCoreComputeSnafu; #[cfg(feature = "python")] use crate::PyIOSnafu; +use crate::{DaftCSVSnafu, DaftCoreComputeSnafu}; use daft_io::{IOConfig, IOStatsRef}; use daft_stats::TableMetadata; @@ -143,29 +143,40 @@ fn materialize_scan_task( } else { None }; - urls.map(|url| { - daft_csv::read::read_csv( - url, - col_names.clone(), - column_names.clone(), - scan_task.pushdowns.limit, - cfg.has_headers, - cfg.delimiter, - cfg.double_quote, - cfg.quote, - cfg.escape_char, - cfg.comment, - io_client.clone(), - io_stats.clone(), - native_storage_config.multithreaded_io, - None, // Allow read_csv to perform its own schema inference - cfg.buffer_size, - cfg.chunk_size, - None, // max_chunks_in_flight - ) - .context(DaftCoreComputeSnafu) - }) - .collect::>>()? + let convert_options = CsvConvertOptions::new_internal( + scan_task.pushdowns.limit, + column_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + col_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + None, + ); + let parse_options = CsvParseOptions::new_with_defaults( + cfg.has_headers, + cfg.delimiter, + cfg.double_quote, + cfg.quote, + cfg.escape_char, + cfg.comment, + ) + .context(DaftCSVSnafu)?; + let read_options = + CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); + let uris = urls.collect::>(); + daft_csv::read_csv_bulk( + uris.as_slice(), + Some(convert_options), + Some(parse_options), + Some(read_options), + io_client, + io_stats, + native_storage_config.multithreaded_io, + None, + 8, + ) + .context(DaftCoreComputeSnafu)? } // **************** @@ -535,60 +546,33 @@ fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_config: Arc, multithreaded_io: bool, io_stats: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, ) -> DaftResult { let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?; - let mut remaining_rows = num_rows; match uris { [] => Ok(MicroPartition::empty(None)), uris => { - // Naively load CSVs from URIs - let mut tables = vec![]; - for uri in uris { - // Terminate early if we have read enough rows already - if remaining_rows.map(|rr| rr == 0).unwrap_or(false) { - break; - } - let table = read_csv( - uri, - column_names.clone(), - include_columns.clone(), - remaining_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - io_client.clone(), - io_stats.clone(), - multithreaded_io, - schema.clone(), - buffer_size, - chunk_size, - None, - )?; - remaining_rows = remaining_rows.map(|rr| rr - table.len()); - tables.push(table); - } + // Perform a bulk read of URIs, materializing a table per URI. 
+ let tables = daft_csv::read_csv_bulk( + uris, + convert_options, + parse_options, + read_options, + io_client, + io_stats, + multithreaded_io, + None, + 8, + ) + .context(DaftCoreComputeSnafu)?; // Union all schemas and cast all tables to the same schema let unioned_schema = tables @@ -802,5 +786,6 @@ impl Display for MicroPartition { Ok(()) } } + #[cfg(test)] mod test {} diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 1a028b79ad..f2a6796ffc 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -9,6 +9,7 @@ use daft_core::{ schema::Schema, Series, }; +use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_dsl::python::PyExpr; use daft_io::{python::IOConfig, IOStatsContext}; use daft_parquet::read::ParquetSchemaInferenceOptions; @@ -369,46 +370,27 @@ impl PyMicroPartition { Ok(mp.into()) } - #[allow(clippy::too_many_arguments)] #[staticmethod] pub fn read_csv( py: Python, uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: Option, - delimiter: Option, - double_quote: Option, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_config: Option, multithreaded_io: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, ) -> PyResult { let mp = py.allow_threads(|| { let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}")); let io_config = io_config.unwrap_or_default().config.into(); crate::micropartition::read_csv_into_micropartition( [uri].as_ref(), - column_names, - include_columns, - num_rows, - has_header.unwrap_or(true), - delimiter, - double_quote.unwrap_or(true), - quote, - escape_char, - comment, + convert_options, + parse_options, + read_options, io_config, multithreaded_io.unwrap_or(true), Some(io_stats), - schema.map(|s| s.schema), - buffer_size, - chunk_size, ) })?; Ok(mp.into()) @@ -648,7 +630,6 @@ pub(crate) fn read_csv_into_py_table( .extract() } -#[allow(clippy::too_many_arguments)] pub(crate) fn read_parquet_into_py_table( py: Python, uri: &str, diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 5227c8fa42..226f0c91c0 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,6 +2,7 @@ use std::{fmt::Display, sync::Arc}; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; +use daft_csv::CsvParseOptions; use daft_io::{ get_io_client, get_runtime, parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef, }; @@ -178,14 +179,16 @@ impl GlobScanOperator { comment, .. }) => { - let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( + let (schema, _) = daft_csv::metadata::read_csv_schema( first_filepath.as_str(), - *has_headers, - *delimiter, - *double_quote, - *quote, - *escape_char, - *comment, + Some(CsvParseOptions::new_with_defaults( + *has_headers, + *delimiter, + *double_quote, + *quote, + *escape_char, + *comment, + )?), None, io_client, Some(io_stats),
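Schema inference follows the same pattern: glob.rs above now hands read_csv_schema a single CsvParseOptions built via new_with_defaults (which returns a Result, hence the trailing ?). A hedged sketch of that call, assuming a synchronous, DaftResult-returning shape like the readers used in the tests and an io_client/io_stats already in scope; the URI is a placeholder and the extra None argument is passed through exactly as in the hunk above, without interpreting it:

let parse_options = CsvParseOptions::new_with_defaults(
    true,  // has_header
    None,  // delimiter (default ',')
    true,  // double_quote
    None,  // quote
    None,  // escape_char
    None,  // comment
)?;
let (schema, _) = daft_csv::metadata::read_csv_schema(
    "s3://bucket/file.csv", // placeholder URI
    Some(parse_options),
    None, // left as None, as in the diff above
    io_client,
    Some(io_stats),
)?;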