From f289da17114a642306e12efa07e38e725ef27535 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 21 Nov 2023 14:33:14 -0800 Subject: [PATCH] [FEAT] [CSV Reader] Bulk CSV reader + general CSV reader refactor (#1614) This PR adds support for bulk CSV reading to the native CSV reader, and integrates bulk CSV reading with `MicroPartition` as the default reading path. ## Driveby Refactors - Consolidated the execution-side configuration of CSV reading into 3 config classes: `CsvConvertOptions`, `CsvParseOptions`, and `CsvReadOptions`. This reduces the bloat of our execution-side code (and tests) by a good bit, and providing config objects that are transparently passed through the execution layer should make it easier to add more CSV configuration options in the future (less code to change). Note that these are currently _not_ exposed to the query plan (logical or physical), although these might be moved into the `FileFormatConfig` enum once the old `Table`-based I/O path is removed. - CSV reading has been refactored into a pipeline of stream transformations. - A lot of repeated intermediate CSV reading code has been eliminated by making the Arrow2 CSV reader a trait object. - Stats gathered during schema inference are now bundled into a `CsvReadStats` struct. --- Cargo.lock | 2 + daft/daft.pyi | 91 +- daft/logical/schema.py | 15 +- daft/table/micropartition.py | 38 +- daft/table/schema_inference.py | 17 +- daft/table/table.py | 32 +- daft/table/table_io.py | 31 +- src/daft-csv/Cargo.toml | 2 + src/daft-csv/src/lib.rs | 29 +- src/daft-csv/src/metadata.rs | 392 +++---- src/daft-csv/src/options.rs | 352 ++++++ src/daft-csv/src/python.rs | 58 +- src/daft-csv/src/read.rs | 1032 ++++++----------- src/daft-decoding/src/deserialize.rs | 4 - src/daft-micropartition/src/lib.rs | 3 + src/daft-micropartition/src/micropartition.rs | 121 +- src/daft-micropartition/src/python.rs | 33 +- src/daft-scan/src/glob.rs | 17 +- 18 files changed, 1092 insertions(+), 1177 deletions(-) create mode 100644 src/daft-csv/src/options.rs diff --git a/Cargo.lock b/Cargo.lock index ddb418c9b7..a013e5abc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1112,6 +1112,7 @@ dependencies = [ "async-compat", "async-compression", "async-stream", + "bincode", "bytes", "chrono", "chrono-tz", @@ -1128,6 +1129,7 @@ dependencies = [ "pyo3-log", "rayon", "rstest", + "serde", "simdutf8", "snafu", "tokio", diff --git a/daft/daft.pyi b/daft/daft.pyi index 8cc8f40e6e..3055883316 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -244,6 +244,60 @@ class FileFormatConfig: def __eq__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] def __ne__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] +class CsvConvertOptions: + """ + Options for converting CSV data to Daft data. + """ + + limit: int | None + include_columns: list[str] | None + column_names: list[str] | None + schema: PySchema | None + + def __init__( + self, + limit: int | None = None, + include_columns: list[str] | None = None, + column_names: list[str] | None = None, + schema: PySchema | None = None, + ): ... + +class CsvParseOptions: + """ + Options for parsing CSV files. + """ + + has_header: bool + delimiter: str | None + double_quote: bool + quote: str | None + escape_char: str | None + comment: str | None + + def __init__( + self, + has_header: bool = True, + delimiter: str | None = None, + double_quote: bool = True, + quote: str | None = None, + escape_char: str | None = None, + comment: str | None = None, + ): ... 
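As a usage sketch (not part of this diff), the two option classes stubbed above can be constructed from Python roughly as follows; the column names and file path are hypothetical:

```python
from daft.daft import CsvConvertOptions, CsvParseOptions

# Read at most 100 rows and keep only two (hypothetical) columns from a
# pipe-delimited CSV file that has a header row.
convert_options = CsvConvertOptions(
    limit=100,
    include_columns=["sepal_length", "variety"],
)
parse_options = CsvParseOptions(
    has_header=True,
    delimiter="|",
)
```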
+ +class CsvReadOptions: + """ + Options for reading CSV files. + """ + + buffer_size: int | None + chunk_size: int | None + + def __init__( + self, + buffer_size: int | None = None, + chunk_size: int | None = None, + ): ... + class FileInfo: """ Metadata for a single file. @@ -521,29 +575,15 @@ def read_parquet_schema( ): ... def read_csv( uri: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions | None = None, + parse_options: CsvParseOptions | None = None, + read_options: CsvReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: PySchema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ): ... def read_csv_schema( uri: str, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + parse_options: CsvParseOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, ): ... @@ -877,20 +917,11 @@ class PyMicroPartition: def read_csv( cls, uri: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions | None = None, + parse_options: CsvParseOptions | None = None, + read_options: CsvReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: PySchema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ): ... 
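With the flat keyword arguments replaced by option objects, a minimal sketch of calling the reworked `read_csv` binding directly through `daft.daft` (assuming a hypothetical local `iris.csv` file and column name):

```python
from daft.daft import (
    CsvConvertOptions,
    CsvParseOptions,
    CsvReadOptions,
    read_csv,
)

# The buffer/chunk sizes shown here match the reader's documented defaults
# (512 KiB buffer, 64 KiB chunks); passing them explicitly is optional.
pytable = read_csv(
    "iris.csv",
    convert_options=CsvConvertOptions(limit=10, include_columns=["variety"]),
    parse_options=CsvParseOptions(has_header=True),
    read_options=CsvReadOptions(buffer_size=512 * 1024, chunk_size=64 * 1024),
    multithreaded_io=True,
)
```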
class PhysicalPlanScheduler: diff --git a/daft/logical/schema.py b/daft/logical/schema.py index e1a2b30112..fe3cf72fe6 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -3,6 +3,7 @@ import sys from typing import TYPE_CHECKING, Iterator +from daft.daft import CsvParseOptions from daft.daft import PyField as _PyField from daft.daft import PySchema as _PySchema from daft.daft import read_csv_schema as _read_csv_schema @@ -164,24 +165,14 @@ def from_parquet( def from_csv( cls, path: str, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + parse_options: CsvParseOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, ) -> Schema: return Schema._from_pyschema( _read_csv_schema( uri=path, - has_header=has_header, - delimiter=delimiter, - double_quote=double_quote, - quote=quote, - escape_char=escape_char, - comment=comment, + parse_options=parse_options, io_config=io_config, multithreaded_io=multithreaded_io, ) diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index 74e56c67d7..c8bb398cd0 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -5,7 +5,13 @@ import pyarrow as pa -from daft.daft import IOConfig, JoinType +from daft.daft import ( + CsvConvertOptions, + CsvParseOptions, + CsvReadOptions, + IOConfig, + JoinType, +) from daft.daft import PyMicroPartition as _PyMicroPartition from daft.daft import PyTable as _PyTable from daft.daft import ScanTask as _ScanTask @@ -352,37 +358,19 @@ def read_parquet_bulk( def read_csv( cls, path: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions, + parse_options: CsvParseOptions, + read_options: CsvReadOptions, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: Schema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ) -> MicroPartition: return MicroPartition._from_pymicropartition( _PyMicroPartition.read_csv( uri=path, - column_names=column_names, - include_columns=include_columns, - num_rows=num_rows, - has_header=has_header, - delimiter=delimiter, - double_quote=double_quote, - quote=quote, - escape_char=escape_char, - comment=comment, + convert_options=convert_options, + parse_options=parse_options, + read_options=read_options, io_config=io_config, multithreaded_io=multithreaded_io, - schema=schema._schema if schema is not None else None, - buffer_size=buffer_size, - chunk_size=chunk_size, ) ) diff --git a/daft/table/schema_inference.py b/daft/table/schema_inference.py index f76d101bd4..9b8d43094a 100644 --- a/daft/table/schema_inference.py +++ b/daft/table/schema_inference.py @@ -6,7 +6,12 @@ import pyarrow.json as pajson import pyarrow.parquet as papq -from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig +from daft.daft import ( + CsvParseOptions, + NativeStorageConfig, + PythonStorageConfig, + StorageConfig, +) from daft.datatype import DataType from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema @@ -41,8 +46,14 @@ def from_csv( io_config = config.io_config return 
Schema.from_csv( str(file), - has_header=csv_options.header_index is not None, - delimiter=csv_options.delimiter, + parse_options=CsvParseOptions( + has_header=csv_options.header_index is not None, + delimiter=csv_options.delimiter, + double_quote=csv_options.double_quote, + quote=csv_options.quote, + escape_char=csv_options.escape_char, + comment=csv_options.comment, + ), io_config=io_config, ) diff --git a/daft/table/table.py b/daft/table/table.py index 5217f35f21..1669170e23 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -6,7 +6,7 @@ import pyarrow as pa from daft.arrow_utils import ensure_table -from daft.daft import JoinType +from daft.daft import CsvConvertOptions, CsvParseOptions, CsvReadOptions, JoinType from daft.daft import PyTable as _PyTable from daft.daft import ScanTask as _ScanTask from daft.daft import read_csv as _read_csv @@ -448,38 +448,20 @@ def read_parquet_statistics( def read_csv( cls, path: str, - column_names: list[str] | None = None, - include_columns: list[str] | None = None, - num_rows: int | None = None, - has_header: bool | None = None, - delimiter: str | None = None, - double_quote: bool | None = None, - quote: str | None = None, - escape_char: str | None = None, - comment: str | None = None, + convert_options: CsvConvertOptions | None = None, + parse_options: CsvParseOptions | None = None, + read_options: CsvReadOptions | None = None, io_config: IOConfig | None = None, multithreaded_io: bool | None = None, - schema: Schema | None = None, - buffer_size: int | None = None, - chunk_size: int | None = None, ) -> Table: return Table._from_pytable( _read_csv( uri=path, - column_names=column_names, - include_columns=include_columns, - num_rows=num_rows, - has_header=has_header, - delimiter=delimiter, - double_quote=double_quote, - quote=quote, - escape_char=escape_char, - comment=comment, + convert_options=convert_options, + parse_options=parse_options, + read_options=read_options, io_config=io_config, multithreaded_io=multithreaded_io, - schema=schema._schema if schema is not None else None, - buffer_size=buffer_size, - chunk_size=chunk_size, ) ) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index e2408ae2a3..5ce668012b 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -12,7 +12,15 @@ from pyarrow import json as pajson from pyarrow import parquet as papq -from daft.daft import IOConfig, NativeStorageConfig, PythonStorageConfig, StorageConfig +from daft.daft import ( + CsvConvertOptions, + CsvParseOptions, + CsvReadOptions, + IOConfig, + NativeStorageConfig, + PythonStorageConfig, + StorageConfig, +) from daft.expressions import ExpressionsProjection from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema @@ -219,21 +227,27 @@ def read_csv( file, (str, pathlib.Path) ), "Native downloader only works on string inputs to read_parquet" has_header = csv_options.header_index is not None - tbl = Table.read_csv( - str(file), - column_names=schema.column_names() if not has_header else None, + csv_convert_options = CsvConvertOptions( + limit=read_options.num_rows, include_columns=read_options.column_names, - num_rows=read_options.num_rows, + column_names=schema.column_names() if not has_header else None, + schema=schema._schema if schema is not None else None, + ) + csv_parse_options = CsvParseOptions( has_header=has_header, delimiter=csv_options.delimiter, double_quote=csv_options.double_quote, quote=csv_options.quote, escape_char=csv_options.escape_char, comment=csv_options.comment, + 
) + csv_read_options = CsvReadOptions(buffer_size=csv_options.buffer_size, chunk_size=csv_options.chunk_size) + tbl = Table.read_csv( + str(file), + convert_options=csv_convert_options, + parse_options=csv_parse_options, + read_options=csv_read_options, io_config=config.io_config, - schema=schema, - buffer_size=csv_options.buffer_size, - chunk_size=csv_options.chunk_size, ) return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) else: @@ -241,7 +255,6 @@ def read_csv( io_config = config.io_config with _open_stream(file, io_config) as f: - from daft.utils import ARROW_VERSION if csv_options.comment is not None and ARROW_VERSION < (7, 0, 0): diff --git a/src/daft-csv/Cargo.toml b/src/daft-csv/Cargo.toml index 36f97fb765..d0e164b7d9 100644 --- a/src/daft-csv/Cargo.toml +++ b/src/daft-csv/Cargo.toml @@ -3,6 +3,7 @@ arrow2 = {workspace = true, features = ["io_csv", "io_csv_async"]} async-compat = {workspace = true} async-compression = {workspace = true} async-stream = {workspace = true} +bincode = {workspace = true} bytes = {workspace = true} chrono = {workspace = true} chrono-tz = {workspace = true} @@ -18,6 +19,7 @@ log = {workspace = true} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true, optional = true} rayon = {workspace = true} +serde = {workspace = true} simdutf8 = "0.1.3" snafu = {workspace = true} tokio = {workspace = true} diff --git a/src/daft-csv/src/lib.rs b/src/daft-csv/src/lib.rs index 4e9af21c81..7541563578 100644 --- a/src/daft-csv/src/lib.rs +++ b/src/daft-csv/src/lib.rs @@ -1,16 +1,23 @@ #![feature(async_closure)] #![feature(let_chains)] +#![feature(trait_alias)] +#![feature(trait_upcasting)] use common_error::DaftError; use snafu::Snafu; mod compression; pub mod metadata; +pub mod options; #[cfg(feature = "python")] pub mod python; pub mod read; mod schema; + +pub use metadata::read_csv_schema_bulk; +pub use options::{char_to_byte, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; #[cfg(feature = "python")] -pub use python::register_modules; +use pyo3::prelude::*; +pub use read::{read_csv, read_csv_bulk}; #[derive(Debug, Snafu)] pub enum Error { @@ -50,3 +57,23 @@ impl From for Error { Error::IOError { source: err } } } + +#[cfg(feature = "python")] +impl From for pyo3::PyErr { + fn from(value: Error) -> Self { + let daft_error: DaftError = value.into(); + daft_error.into() + } +} + +type Result = std::result::Result; + +#[cfg(feature = "python")] +pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { + parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; + parent.add_wrapped(wrap_pyfunction!(python::pylib::read_csv))?; + parent.add_wrapped(wrap_pyfunction!(python::pylib::read_csv_schema))?; + Ok(()) +} diff --git a/src/daft-csv/src/metadata.rs b/src/daft-csv/src/metadata.rs index 5e04846266..75ac0ad43a 100644 --- a/src/daft-csv/src/metadata.rs +++ b/src/daft-csv/src/metadata.rs @@ -6,42 +6,62 @@ use common_error::DaftResult; use csv_async::ByteRecord; use daft_core::schema::Schema; use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef}; +use futures::{StreamExt, TryStreamExt}; +use snafu::ResultExt; use tokio::{ fs::File, io::{AsyncBufRead, AsyncRead, BufReader}, }; use tokio_util::io::StreamReader; -use crate::read::char_to_byte; -use crate::{compression::CompressionCodec, schema::merge_schema}; +use crate::{compression::CompressionCodec, schema::merge_schema, CsvParseOptions}; use daft_decoding::inference::infer; const DEFAULT_COLUMN_PREFIX: &str = "column_"; 
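The higher-level Python wrappers refactored above (`Schema.from_csv`, `Table.read_csv`) thread the same option objects through, as `table_io.read_csv` now does. A rough sketch with a hypothetical local file:

```python
from daft.daft import CsvConvertOptions, CsvParseOptions, CsvReadOptions
from daft.logical.schema import Schema
from daft.table.table import Table

path = "data.csv"  # hypothetical local CSV file

# Infer the schema first, then read with a row limit, mirroring table_io.read_csv.
parse_options = CsvParseOptions(has_header=True)
schema = Schema.from_csv(path, parse_options=parse_options)
table = Table.read_csv(
    path,
    convert_options=CsvConvertOptions(limit=1000, schema=schema._schema),
    parse_options=parse_options,
    read_options=CsvReadOptions(),
)
```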
-#[allow(clippy::too_many_arguments)] +#[derive(Debug, Clone)] +pub struct CsvReadStats { + pub total_bytes_read: usize, + pub total_records_read: usize, + pub mean_record_size_bytes: f64, + pub stddev_record_size_bytes: f64, +} + +impl CsvReadStats { + pub fn new( + total_bytes_read: usize, + total_records_read: usize, + mean_record_size_bytes: f64, + stddev_record_size_bytes: f64, + ) -> Self { + Self { + total_bytes_read, + total_records_read, + mean_record_size_bytes, + stddev_record_size_bytes, + } + } +} + +impl Default for CsvReadStats { + fn default() -> Self { + Self::new(0, 0, 0f64, 0f64) + } +} + pub fn read_csv_schema( uri: &str, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: Option, max_bytes: Option, io_client: Arc, io_stats: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> { +) -> DaftResult<(Schema, CsvReadStats)> { let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); runtime_handle.block_on(async { read_csv_schema_single( uri, - has_header, - char_to_byte(delimiter)?, - double_quote, - char_to_byte(quote)?, - char_to_byte(escape_char)?, - char_to_byte(comment)?, + parse_options.unwrap_or_default(), // Default to 1 MiB. max_bytes.or(Some(1024 * 1024)), io_client, @@ -51,19 +71,50 @@ pub fn read_csv_schema( }) } -#[allow(clippy::too_many_arguments)] +pub async fn read_csv_schema_bulk( + uris: &[&str], + parse_options: Option, + max_bytes: Option, + io_client: Arc, + io_stats: Option, + num_parallel_tasks: usize, +) -> DaftResult> { + let runtime_handle = get_runtime(true)?; + let _rt_guard = runtime_handle.enter(); + let result = runtime_handle + .block_on(async { + let task_stream = futures::stream::iter(uris.iter().map(|uri| { + let owned_string = uri.to_string(); + let owned_client = io_client.clone(); + let owned_io_stats = io_stats.clone(); + let owned_parse_options = parse_options.clone(); + tokio::spawn(async move { + read_csv_schema_single( + &owned_string, + owned_parse_options.unwrap_or_default(), + max_bytes, + owned_client, + owned_io_stats, + ) + .await + }) + })); + task_stream + .buffered(num_parallel_tasks) + .try_collect::>() + .await + }) + .context(super::JoinSnafu {})?; + result.into_iter().collect::>>() +} + pub(crate) async fn read_csv_schema_single( uri: &str, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, io_client: Arc, io_stats: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> { +) -> DaftResult<(Schema, CsvReadStats)> { let compression_codec = CompressionCodec::from_uri(uri); match io_client .single_url_get(uri.to_string(), None, io_stats) @@ -73,12 +124,7 @@ pub(crate) async fn read_csv_schema_single( read_csv_schema_from_compressed_reader( BufReader::new(File::open(file.path).await?), compression_codec, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options, max_bytes, ) .await @@ -87,12 +133,7 @@ pub(crate) async fn read_csv_schema_single( read_csv_schema_from_compressed_reader( StreamReader::new(stream), compression_codec, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options, // Truncate max_bytes to size if both are set. 
max_bytes.map(|m| size.map(|s| m.min(s)).unwrap_or(m)), ) @@ -105,14 +146,9 @@ pub(crate) async fn read_csv_schema_single( async fn read_csv_schema_from_compressed_reader( reader: R, compression_codec: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> +) -> DaftResult<(Schema, CsvReadStats)> where R: AsyncBufRead + Unpin + Send + 'static, { @@ -120,99 +156,50 @@ where Some(compression) => { read_csv_schema_from_uncompressed_reader( compression.to_decoder(reader), - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - max_bytes, - ) - .await - } - None => { - read_csv_schema_from_uncompressed_reader( - reader, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options, max_bytes, ) .await } + None => read_csv_schema_from_uncompressed_reader(reader, parse_options, max_bytes).await, } } #[allow(clippy::too_many_arguments)] async fn read_csv_schema_from_uncompressed_reader( reader: R, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, -) -> DaftResult<(Schema, usize, usize, f64, f64)> +) -> DaftResult<(Schema, CsvReadStats)> where R: AsyncRead + Unpin + Send, { - let (schema, total_bytes_read, num_records_read, mean_size, std_size) = - read_csv_arrow_schema_from_uncompressed_reader( - reader, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - max_bytes, - ) - .await?; - Ok(( - Schema::try_from(&schema)?, - total_bytes_read, - num_records_read, - mean_size, - std_size, - )) + let (schema, read_stats) = + read_csv_arrow_schema_from_uncompressed_reader(reader, parse_options, max_bytes).await?; + Ok((Schema::try_from(&schema)?, read_stats)) } #[allow(clippy::too_many_arguments)] async fn read_csv_arrow_schema_from_uncompressed_reader( reader: R, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: CsvParseOptions, max_bytes: Option, -) -> DaftResult<(arrow2::datatypes::Schema, usize, usize, f64, f64)> +) -> DaftResult<(arrow2::datatypes::Schema, CsvReadStats)> where R: AsyncRead + Unpin + Send, { let mut reader = AsyncReaderBuilder::new() - .has_headers(has_header) - .delimiter(delimiter.unwrap_or(b',')) - .double_quote(double_quote) - .quote(quote.unwrap_or(b'"')) - .escape(escape_char) - .comment(comment) + .has_headers(parse_options.has_header) + .delimiter(parse_options.delimiter) + .double_quote(parse_options.double_quote) + .quote(parse_options.quote) + .escape(parse_options.escape_char) + .comment(parse_options.comment) .buffer_capacity(max_bytes.unwrap_or(1 << 20).min(1 << 20)) .create_reader(reader.compat()); - let (fields, total_bytes_read, num_records_read, mean_size, std_size) = - infer_schema(&mut reader, None, max_bytes, has_header).await?; - Ok(( - fields.into(), - total_bytes_read, - num_records_read, - mean_size, - std_size, - )) + let (fields, read_stats) = + infer_schema(&mut reader, None, max_bytes, parse_options.has_header).await?; + Ok((fields.into(), read_stats)) } async fn infer_schema( @@ -220,7 +207,7 @@ async fn infer_schema( max_rows: Option, max_bytes: Option, has_header: bool, -) -> arrow2::error::Result<(Vec, usize, usize, f64, f64)> +) -> arrow2::error::Result<(Vec, CsvReadStats)> where R: 
futures::AsyncRead + Unpin + Send, { @@ -240,7 +227,7 @@ where } else { // Save the csv reader position before reading headers if !reader.read_byte_record(&mut record).await? { - return Ok((vec![], 0, 0, 0f64, 0f64)); + return Ok((vec![], Default::default())); } let first_record_count = record.len(); ( @@ -292,7 +279,10 @@ where } let fields = merge_schema(&headers, &mut column_types); let std = (m2 / ((records_count - 1) as f64)).sqrt(); - Ok((fields, total_bytes, records_count, mean, std)) + Ok(( + fields, + CsvReadStats::new(total_bytes, records_count, mean, std), + )) } #[cfg(test)] @@ -304,6 +294,8 @@ mod tests { use daft_io::{IOClient, IOConfig}; use rstest::rstest; + use crate::CsvParseOptions; + use super::read_csv_schema; #[rstest] @@ -340,18 +332,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -362,8 +344,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -379,14 +361,9 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( + let (schema, read_stats) = read_csv_schema( file.as_ref(), - true, - Some('|'), - true, - None, - None, - None, + Some(CsvParseOptions::default().with_delimiter(b'|')), None, io_client.clone(), None, @@ -401,8 +378,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -415,20 +392,9 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (_, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + let (_, read_stats) = read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -444,14 +410,9 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( + let (schema, read_stats) = read_csv_schema( file.as_ref(), - false, - None, - true, - None, - None, - None, + Some(CsvParseOptions::default().with_has_header(false)), None, io_client.clone(), None, @@ -466,8 +427,8 @@ mod tests { Field::new("column_5", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 328); - assert_eq!(num_records_read, 20); + assert_eq!(read_stats.total_bytes_read, 328); + assert_eq!(read_stats.total_records_read, 20); Ok(()) } @@ -483,18 +444,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - 
file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -505,8 +456,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 49); - assert_eq!(num_records_read, 3); + assert_eq!(read_stats.total_bytes_read, 49); + assert_eq!(read_stats.total_records_read, 3); Ok(()) } @@ -519,18 +470,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -541,8 +482,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 82); - assert_eq!(num_records_read, 6); + assert_eq!(read_stats.total_bytes_read, 82); + assert_eq!(read_stats.total_records_read, 6); Ok(()) } @@ -558,18 +499,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -581,8 +512,8 @@ mod tests { Field::new("variety", DataType::Utf8), ])?, ); - assert_eq!(total_bytes_read, 33); - assert_eq!(num_records_read, 2); + assert_eq!(read_stats.total_bytes_read, 33); + assert_eq!(read_stats.total_records_read, 2); Ok(()) } @@ -595,18 +526,8 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, total_bytes_read, num_records_read, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - Some(100), - io_client.clone(), - None, - )?; + let (schema, read_stats) = + read_csv_schema(file.as_ref(), None, Some(100), io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ @@ -618,8 +539,16 @@ mod tests { ])?, ); // Max bytes doesn't include header, so add 15 bytes to upper bound. 
- assert!(total_bytes_read <= 100 + 15, "{}", total_bytes_read); - assert!(num_records_read <= 10, "{}", num_records_read); + assert!( + read_stats.total_bytes_read <= 100 + 15, + "{}", + read_stats.total_bytes_read + ); + assert!( + read_stats.total_records_read <= 10, + "{}", + read_stats.total_records_read + ); Ok(()) } @@ -635,18 +564,7 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let err = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - ); + let err = read_csv_schema(file.as_ref(), None, None, io_client.clone(), None); assert!(err.is_err()); let err = err.unwrap_err(); assert!(matches!(err, DaftError::ArrowError(_)), "{}", err); @@ -673,12 +591,7 @@ mod tests { let err = read_csv_schema( file.as_ref(), - true, - None, - true, - None, - None, - None, + Some(CsvParseOptions::default().with_has_header(false)), None, io_client.clone(), None, @@ -729,18 +642,7 @@ mod tests { io_config.s3.anonymous = true; let io_client = Arc::new(IOClient::new(io_config.into())?); - let (schema, _, _, _, _) = read_csv_schema( - file.as_ref(), - true, - None, - true, - None, - None, - None, - None, - io_client.clone(), - None, - )?; + let (schema, _) = read_csv_schema(file.as_ref(), None, None, io_client.clone(), None)?; assert_eq!( schema, Schema::new(vec![ diff --git a/src/daft-csv/src/options.rs b/src/daft-csv/src/options.rs new file mode 100644 index 0000000000..eceebadfd7 --- /dev/null +++ b/src/daft-csv/src/options.rs @@ -0,0 +1,352 @@ +use daft_core::{impl_bincode_py_state_serialization, schema::SchemaRef}; +use serde::{Deserialize, Serialize}; +#[cfg(feature = "python")] +use { + daft_core::python::schema::PySchema, + pyo3::{ + pyclass, pyclass::CompareOp, pymethods, types::PyBytes, PyObject, PyResult, PyTypeInfo, + Python, ToPyObject, + }, +}; + +/// Options for converting CSV data to Daft data. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +pub struct CsvConvertOptions { + pub limit: Option, + pub include_columns: Option>, + pub column_names: Option>, + pub schema: Option, +} + +impl CsvConvertOptions { + pub fn new_internal( + limit: Option, + include_columns: Option>, + column_names: Option>, + schema: Option, + ) -> Self { + Self { + limit, + include_columns, + column_names, + schema, + } + } + + pub fn with_limit(self, limit: Option) -> Self { + Self { + limit, + include_columns: self.include_columns, + column_names: self.column_names, + schema: self.schema, + } + } + + pub fn with_include_columns(self, include_columns: Option>) -> Self { + Self { + limit: self.limit, + include_columns, + column_names: self.column_names, + schema: self.schema, + } + } + + pub fn with_column_names(self, column_names: Option>) -> Self { + Self { + limit: self.limit, + include_columns: self.include_columns, + column_names, + schema: self.schema, + } + } + + pub fn with_schema(self, schema: Option) -> Self { + Self { + limit: self.limit, + include_columns: self.include_columns, + column_names: self.column_names, + schema, + } + } +} + +impl Default for CsvConvertOptions { + fn default() -> Self { + Self::new_internal(None, None, None, None) + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl CsvConvertOptions { + /// Create conversion options for the CSV reader. + /// + /// # Arguments: + /// + /// * `limit` - Only read this many rows. 
+ /// * `include_columns` - The names of the columns that should be kept, e.g. via a projection. + /// * `column_names` - The names for the CSV columns. + /// * `schema` - The names and dtypes for the CSV columns. + #[new] + #[pyo3(signature = (limit=None, include_columns=None, column_names=None, schema=None))] + pub fn new( + limit: Option, + include_columns: Option>, + column_names: Option>, + schema: Option, + ) -> Self { + Self::new_internal( + limit, + include_columns, + column_names, + schema.map(|s| s.into()), + ) + } + + #[getter] + pub fn get_limit(&self) -> PyResult> { + Ok(self.limit) + } + + #[getter] + pub fn get_include_columns(&self) -> PyResult>> { + Ok(self.include_columns.clone()) + } + + #[getter] + pub fn get_column_names(&self) -> PyResult>> { + Ok(self.column_names.clone()) + } + + #[getter] + pub fn get_schema(&self) -> PyResult> { + Ok(self.schema.as_ref().map(|s| s.clone().into())) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self)) + } +} + +impl_bincode_py_state_serialization!(CsvConvertOptions); + +/// Options for parsing CSV files. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] +pub struct CsvParseOptions { + pub has_header: bool, + pub delimiter: u8, + pub double_quote: bool, + pub quote: u8, + pub escape_char: Option, + pub comment: Option, +} + +impl CsvParseOptions { + pub fn new_internal( + has_header: bool, + delimiter: u8, + double_quote: bool, + quote: u8, + escape_char: Option, + comment: Option, + ) -> Self { + Self { + has_header, + delimiter, + double_quote, + quote, + escape_char, + comment, + } + } + + pub fn new_with_defaults( + has_header: bool, + delimiter: Option, + double_quote: bool, + quote: Option, + escape_char: Option, + comment: Option, + ) -> super::Result { + Ok(Self::new_internal( + has_header, + char_to_byte(delimiter)?.unwrap_or(b','), + double_quote, + char_to_byte(quote)?.unwrap_or(b'"'), + char_to_byte(escape_char)?, + char_to_byte(comment)?, + )) + } + + pub fn with_has_header(self, has_header: bool) -> Self { + Self { has_header, ..self } + } + + pub fn with_delimiter(self, delimiter: u8) -> Self { + Self { delimiter, ..self } + } + + pub fn with_double_quote(self, double_quote: bool) -> Self { + Self { + double_quote, + ..self + } + } + + pub fn with_quote(self, quote: u8) -> Self { + Self { quote, ..self } + } + + pub fn with_escape_char(self, escape_char: Option) -> Self { + Self { + escape_char, + ..self + } + } + + pub fn with_comment(self, comment: Option) -> Self { + Self { comment, ..self } + } +} + +impl Default for CsvParseOptions { + fn default() -> Self { + Self::new_with_defaults(true, None, true, None, None, None).unwrap() + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl CsvParseOptions { + /// Create parsing options for the CSV reader. + /// + /// # Arguments: + /// + /// * `has_headers` - Whether the CSV has a header row; if so, it will be skipped during data parsing. + /// * `delimiter` - The character delmiting individual cells in the CSV data. + /// * `double_quote` - Whether double-quote escapes are enabled. + /// * `quote` - The character to use for quoting strings. + /// * `escape_char` - The character to use as an escape character. 
+ /// * `comment` - The character at the start of a line that indicates that the rest of the line is a comment, + /// which should be ignored while parsing. + #[new] + #[pyo3(signature = (has_header=true, delimiter=None, double_quote=false, quote=None, escape_char=None, comment=None))] + pub fn new( + has_header: bool, + delimiter: Option, + double_quote: bool, + quote: Option, + escape_char: Option, + comment: Option, + ) -> PyResult { + Ok(Self::new_with_defaults( + has_header, + delimiter, + double_quote, + quote, + escape_char, + comment, + )?) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self)) + } +} + +pub fn char_to_byte(c: Option) -> Result, super::Error> { + match c.map(u8::try_from).transpose() { + Ok(b) => Ok(b), + Err(e) => Err(super::Error::WrongChar { + source: e, + val: c.unwrap_or(' '), + }), + } +} + +impl_bincode_py_state_serialization!(CsvParseOptions); + +/// Options for reading CSV files. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] +pub struct CsvReadOptions { + pub buffer_size: Option, + pub chunk_size: Option, +} + +impl CsvReadOptions { + pub fn new_internal(buffer_size: Option, chunk_size: Option) -> Self { + Self { + buffer_size, + chunk_size, + } + } + + pub fn with_buffer_size(self, buffer_size: Option) -> Self { + Self { + buffer_size, + chunk_size: self.chunk_size, + } + } + + pub fn with_chunk_size(self, chunk_size: Option) -> Self { + Self { + buffer_size: self.buffer_size, + chunk_size, + } + } +} + +impl Default for CsvReadOptions { + fn default() -> Self { + Self::new_internal(None, None) + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl CsvReadOptions { + /// Create reading options for the CSV reader. + /// + /// # Arguments: + /// + /// * `buffer_size` - Size of the buffer (in bytes) used by the streaming reader. + /// * `chunk_size` - Size of the chunks (in bytes) deserialized in parallel by the streaming reader. 
+ #[new] + #[pyo3(signature = (buffer_size=None, chunk_size=None))] + pub fn new(buffer_size: Option, chunk_size: Option) -> Self { + Self::new_internal(buffer_size, chunk_size) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } + + pub fn __str__(&self) -> PyResult { + Ok(format!("{:?}", self)) + } +} + +impl_bincode_py_state_serialization!(CsvReadOptions); diff --git a/src/daft-csv/src/python.rs b/src/daft-csv/src/python.rs index 965af9d750..229f4ada65 100644 --- a/src/daft-csv/src/python.rs +++ b/src/daft-csv/src/python.rs @@ -1,5 +1,3 @@ -use pyo3::prelude::*; - pub mod pylib { use std::sync::Arc; @@ -8,25 +6,17 @@ pub mod pylib { use daft_table::python::PyTable; use pyo3::{pyfunction, PyResult, Python}; + use crate::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; + #[pyfunction] - #[allow(clippy::too_many_arguments)] pub fn read_csv( py: Python, uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: Option, - delimiter: Option, - double_quote: Option, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_config: Option, multithreaded_io: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, ) -> PyResult { py.allow_threads(|| { let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}")); @@ -37,21 +27,12 @@ pub mod pylib { )?; Ok(crate::read::read_csv( uri, - column_names, - include_columns, - num_rows, - has_header.unwrap_or(true), - delimiter, - double_quote.unwrap_or(true), - quote, - escape_char, - comment, + convert_options, + parse_options, + read_options, io_client, Some(io_stats), multithreaded_io.unwrap_or(true), - schema.map(|s| s.schema), - buffer_size, - chunk_size, None, )? 
.into()) @@ -59,16 +40,10 @@ pub mod pylib { } #[pyfunction] - #[allow(clippy::too_many_arguments)] pub fn read_csv_schema( py: Python, uri: &str, - has_header: Option, - delimiter: Option, - double_quote: Option, - quote: Option, - escape_char: Option, - comment: Option, + parse_options: Option, max_bytes: Option, io_config: Option, multithreaded_io: Option, @@ -80,14 +55,9 @@ pub mod pylib { multithreaded_io.unwrap_or(true), io_config.unwrap_or_default().config.into(), )?; - let (schema, _, _, _, _) = crate::metadata::read_csv_schema( + let (schema, _) = crate::metadata::read_csv_schema( uri, - has_header.unwrap_or(true), - delimiter, - double_quote.unwrap_or(true), - quote, - escape_char, - comment, + parse_options, max_bytes, io_client, Some(io_stats), @@ -96,9 +66,3 @@ pub mod pylib { }) } } - -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_wrapped(wrap_pyfunction!(pylib::read_csv))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_csv_schema))?; - Ok(()) -} diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 8c4a14b98b..a51e56ef60 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -7,77 +7,60 @@ use arrow2::{ use async_compat::{Compat, CompatExt}; use common_error::DaftResult; use csv_async::AsyncReader; -use daft_core::{ - schema::{Schema, SchemaRef}, - utils::arrow::cast_array_for_daft_if_needed, - Series, -}; +use daft_core::{schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_io::{get_runtime, GetResult, IOClient, IOStatsRef}; use daft_table::Table; -use futures::TryStreamExt; +use futures::{Stream, StreamExt, TryStreamExt}; use rayon::prelude::{ IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator, }; -use snafu::{futures::TryFutureExt, ResultExt}; +use snafu::{ + futures::{try_future::Context, TryFutureExt}, + ResultExt, +}; use tokio::{ fs::File, io::{AsyncBufRead, AsyncRead, BufReader}, + task::JoinHandle, }; use tokio_util::io::StreamReader; -use crate::metadata::read_csv_schema_single; -use crate::{compression::CompressionCodec, ArrowSnafu, Error}; +use crate::{compression::CompressionCodec, ArrowSnafu}; +use crate::{metadata::read_csv_schema_single, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_decoding::deserialize::deserialize_column; -pub fn char_to_byte(c: Option) -> Result, Error> { - match c.map(u8::try_from).transpose() { - Ok(b) => Ok(b), - Err(e) => Err(Error::WrongChar { - source: e, - val: c.unwrap_or(' '), - }), - } -} +trait ByteRecordChunkStream = Stream>>; +trait ColumnArrayChunkStream = Stream< + Item = super::Result< + Context< + JoinHandle>>>, + super::JoinSnafu, + super::Error, + >, + >, +>; #[allow(clippy::too_many_arguments)] pub fn read_csv( uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_client: Arc, io_stats: Option, multithreaded_io: bool, - schema: Option, - buffer_size: Option, - chunk_size: Option, max_chunks_in_flight: Option, ) -> DaftResult { let runtime_handle = get_runtime(multithreaded_io)?; let _rt_guard = runtime_handle.enter(); runtime_handle.block_on(async { - read_csv_single( + read_csv_single_into_table( uri, - column_names, - include_columns, - num_rows, - has_header, - char_to_byte(delimiter)?, - double_quote, - char_to_byte(quote)?, - 
char_to_byte(escape_char)?, - char_to_byte(comment)?, + convert_options, + parse_options, + read_options, io_client, io_stats, - schema, - buffer_size, - chunk_size, max_chunks_in_flight, ) .await @@ -85,295 +68,257 @@ pub fn read_csv( } #[allow(clippy::too_many_arguments)] -async fn read_csv_single( +pub fn read_csv_bulk( + uris: &[&str], + convert_options: Option, + parse_options: Option, + read_options: Option, + io_client: Arc, + io_stats: Option, + multithreaded_io: bool, + max_chunks_in_flight: Option, + num_parallel_tasks: usize, +) -> DaftResult> { + let runtime_handle = get_runtime(multithreaded_io)?; + let _rt_guard = runtime_handle.enter(); + let tables = runtime_handle + .block_on(async move { + // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. + let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( + uri.to_string(), + convert_options.clone(), + parse_options.clone(), + read_options.clone(), + io_client.clone(), + io_stats.clone(), + ); + tokio::task::spawn(async move { + let table = read_csv_single_into_table( + uri.as_str(), + convert_options, + parse_options, + read_options, + io_client, + io_stats, + max_chunks_in_flight, + ) + .await?; + Ok((i, table)) + }) + })); + let mut remaining_rows = convert_options + .as_ref() + .and_then(|opts| opts.limit.map(|limit| limit as i64)); + task_stream + // Each task is annotated with its position in the output, so we can use unordered buffering to help mitigate stragglers + // and sort the task results at the end. + .buffer_unordered(num_parallel_tasks) + // Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to + // num_parallel_tasks redundant files. + .try_take_while(|result| { + match (result, remaining_rows) { + // Limit has been met, early-teriminate. + (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)), + // Limit has not yet been met, update remaining limit slack and continue. + (Ok((_, table)), Some(rows_left)) => { + remaining_rows = Some(rows_left - table.len() as i64); + futures::future::ready(Ok(true)) + } + // (1) No limit, never early-terminate. + // (2) Encountered error, propagate error to try_collect to allow it to short-circuit. + (_, None) | (Err(_), _) => futures::future::ready(Ok(true)), + } + }) + .try_collect::>() + .await + }) + .context(super::JoinSnafu {})?; + + // Sort the task results by task index, yielding tables whose order matches the input URI order. + let mut collected = tables.into_iter().collect::>>()?; + collected.sort_by_key(|(idx, _)| *idx); + Ok(collected.into_iter().map(|(_, v)| v).collect()) +} + +async fn read_csv_single_into_table( uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_client: Arc, io_stats: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, max_chunks_in_flight: Option, ) -> DaftResult
{ - let (schema, estimated_mean_row_size, estimated_std_row_size) = match schema { + let include_columns = convert_options + .as_ref() + .and_then(|opts| opts.include_columns.clone()); + let (chunk_stream, fields) = read_csv_single_into_stream( + uri, + convert_options.unwrap_or_default(), + parse_options.unwrap_or_default(), + read_options, + io_client, + io_stats, + ) + .await?; + // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks + // with the parsing of chunks on the rayon threadpool. + let max_chunks_in_flight = max_chunks_in_flight.unwrap_or_else(|| { + std::thread::available_parallelism() + .unwrap_or(NonZeroUsize::new(2).unwrap()) + .checked_mul(2.try_into().unwrap()) + .unwrap() + .try_into() + .unwrap() + }); + // Collect all chunks in chunk x column form. + let chunks = chunk_stream + // Limit the number of chunks we have in flight at any given time. + .try_buffered(max_chunks_in_flight) + .try_collect::>() + .await? + .into_iter() + .collect::>>()?; + // Handle empty table case. + if chunks.is_empty() { + let schema: arrow2::datatypes::Schema = fields.into(); + let daft_schema = Arc::new(Schema::try_from(&schema)?); + return Table::empty(Some(daft_schema)); + } + // Transpose chunk x column into column x chunk. + let mut column_arrays = vec![Vec::with_capacity(chunks.len()); chunks[0].len()]; + for chunk in chunks.into_iter() { + for (idx, col) in chunk.into_iter().enumerate() { + column_arrays[idx].push(col); + } + } + // Build table from chunks. + // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked. + chunks_to_table(column_arrays, include_columns, fields) +} + +async fn read_csv_single_into_stream( + uri: &str, + convert_options: CsvConvertOptions, + parse_options: CsvParseOptions, + read_options: Option, + io_client: Arc, + io_stats: Option, +) -> DaftResult<(impl ColumnArrayChunkStream + Send, Vec)> { + let (mut schema, estimated_mean_row_size, estimated_std_row_size) = match convert_options.schema + { Some(schema) => (schema.to_arrow()?, None, None), None => { - let (schema, _, _, mean, std) = read_csv_schema_single( + let (schema, read_stats) = read_csv_schema_single( uri, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, + parse_options.clone(), // Read at most 1 MiB when doing schema inference. Some(1024 * 1024), io_client.clone(), io_stats.clone(), ) .await?; - (schema.to_arrow()?, Some(mean), Some(std)) - } - }; - let compression_codec = CompressionCodec::from_uri(uri); - match io_client - .single_url_get(uri.to_string(), None, io_stats) - .await? - { - GetResult::File(file) => { - read_csv_from_compressed_reader( - BufReader::new(File::open(file.path).await?), - compression_codec, - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - // Default buffer size of 512 KiB. - buffer_size.unwrap_or(512 * 1024), - // Default chunk size of 64 KiB. - chunk_size.unwrap_or(64 * 1024), - // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks - // with the parsing of chunks on the rayon threadpool. 
- max_chunks_in_flight.unwrap_or( - std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(2).unwrap()) - .checked_mul(2.try_into().unwrap()) - .unwrap() - .try_into() - .unwrap(), - ), - estimated_mean_row_size, - estimated_std_row_size, - ) - .await - } - GetResult::Stream(stream, _, _) => { - read_csv_from_compressed_reader( - StreamReader::new(stream), - compression_codec, - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - // Default buffer size of 512 KiB. - buffer_size.unwrap_or(512 * 1024), - // Default chunk size of 64 KiB. - chunk_size.unwrap_or(64 * 1024), - // Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks - // with the parsing of chunks on the rayon threadpool. - max_chunks_in_flight.unwrap_or( - std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(2).unwrap()) - .checked_mul(2.try_into().unwrap()) - .unwrap() - .try_into() - .unwrap(), - ), - estimated_mean_row_size, - estimated_std_row_size, - ) - .await - } - } -} - -#[allow(clippy::too_many_arguments)] -async fn read_csv_from_compressed_reader( - reader: R, - compression_codec: Option, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, - schema: arrow2::datatypes::Schema, - buffer_size: usize, - chunk_size: usize, - max_chunks_in_flight: usize, - estimated_mean_row_size: Option, - estimated_std_row_size: Option, -) -> DaftResult
-where - R: AsyncBufRead + Unpin + Send + 'static, -{ - match compression_codec { - Some(compression) => { - read_csv_from_uncompressed_reader( - compression.to_decoder(reader), - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - buffer_size, - chunk_size, - max_chunks_in_flight, - estimated_mean_row_size, - estimated_std_row_size, - ) - .await - } - None => { - read_csv_from_uncompressed_reader( - reader, - column_names, - include_columns, - num_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - schema, - buffer_size, - chunk_size, - max_chunks_in_flight, - estimated_mean_row_size, - estimated_std_row_size, + ( + schema.to_arrow()?, + Some(read_stats.mean_record_size_bytes), + Some(read_stats.stddev_record_size_bytes), ) - .await } - } -} - -#[allow(clippy::too_many_arguments)] -async fn read_csv_from_uncompressed_reader( - stream_reader: R, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, - schema: arrow2::datatypes::Schema, - buffer_size: usize, - chunk_size: usize, - max_chunks_in_flight: usize, - estimated_mean_row_size: Option, - estimated_std_row_size: Option, -) -> DaftResult
-where - R: AsyncRead + Unpin + Send, -{ - let reader = AsyncReaderBuilder::new() - .has_headers(has_header) - .delimiter(delimiter.unwrap_or(b',')) - .double_quote(double_quote) - .quote(quote.unwrap_or(b'"')) - .escape(escape_char) - .comment(comment) - .buffer_capacity(buffer_size) - .create_reader(stream_reader.compat()); - let mut fields = schema.fields; + }; // Rename fields, if necessary. - if let Some(column_names) = column_names { - fields = fields + if let Some(column_names) = convert_options.column_names { + schema = schema + .fields .into_iter() .zip(column_names.iter()) .map(|(field, name)| { - Field::new(*name, field.data_type, field.is_nullable).with_metadata(field.metadata) + Field::new(name, field.data_type, field.is_nullable).with_metadata(field.metadata) }) - .collect(); + .collect::>() + .into(); } - // Read CSV into Arrow2 column chunks. - let column_chunks = read_into_column_chunks( + let (reader, buffer_size, chunk_size): (Box, usize, usize) = + match io_client + .single_url_get(uri.to_string(), None, io_stats) + .await? + { + GetResult::File(file) => { + ( + Box::new(BufReader::new(File::open(file.path).await?)), + // Use user-provided buffer size, falling back to 8 * the user-provided chunk size if that exists, otherwise falling back to 512 KiB as the default. + read_options + .as_ref() + .and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs))) + .unwrap_or(512 * 1024), + read_options + .as_ref() + .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) + .unwrap_or(64 * 1024), + ) + } + GetResult::Stream(stream, _, _) => ( + Box::new(StreamReader::new(stream)), + read_options + .as_ref() + .and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs))) + .unwrap_or(512 * 1024), + read_options + .as_ref() + .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) + .unwrap_or(64 * 1024), + ), + }; + let reader: Box = match CompressionCodec::from_uri(uri) { + Some(compression) => Box::new(compression.to_decoder(reader)), + None => reader, + }; + let reader = AsyncReaderBuilder::new() + .has_headers(parse_options.has_header) + .delimiter(parse_options.delimiter) + .double_quote(parse_options.double_quote) + .quote(parse_options.quote) + .escape(parse_options.escape_char) + .comment(parse_options.comment) + .buffer_capacity(buffer_size) + .create_reader(reader.compat()); + let read_stream = read_into_byterecord_chunk_stream( reader, - fields.clone().into(), - fields_to_projection_indices(&fields, &include_columns), - num_rows, + schema.fields.len(), + convert_options.limit, chunk_size, - max_chunks_in_flight, estimated_mean_row_size, estimated_std_row_size, - ) - .await?; - // Truncate fields to only contain projected columns. - if let Some(include_columns) = include_columns { - let field_map = fields - .into_iter() - .map(|field| (field.name.clone(), field)) - .collect::>(); - fields = include_columns - .into_iter() - .map(|col| field_map[col].clone()) - .collect::>(); - } - // Concatenate column chunks and convert into Daft Series. - // Note that this concatenation is done in parallel on the rayon threadpool. - let columns_series = column_chunks - .into_par_iter() - .zip(&fields) - .map(|(mut arrays, field)| { - let array = if arrays.len() > 1 { - // Concatenate all array chunks. - let unboxed_arrays = arrays.iter().map(Box::as_ref).collect::>(); - arrow2::compute::concatenate::concatenate(unboxed_arrays.as_slice())? - } else { - // Return single array chunk directly. 
- arrays.pop().unwrap() - }; - Series::try_from((field.name.as_ref(), cast_array_for_daft_if_needed(array))) - }) - .collect::>>()?; - // Build Daft Table. - let schema: arrow2::datatypes::Schema = fields.into(); - let daft_schema = Schema::try_from(&schema)?; - Table::new(daft_schema, columns_series) + ); + let projection_indices = + fields_to_projection_indices(&schema.fields, &convert_options.include_columns); + let fields = schema.fields; + Ok(( + parse_into_column_array_chunk_stream( + read_stream, + Arc::new(fields.clone()), + projection_indices, + ), + fields, + )) } -#[allow(clippy::too_many_arguments)] -async fn read_into_column_chunks( +fn read_into_byterecord_chunk_stream( mut reader: AsyncReader>, - fields: Arc>, - projection_indices: Arc>, + num_fields: usize, num_rows: Option, chunk_size: usize, - max_chunks_in_flight: usize, estimated_mean_row_size: Option, estimated_std_row_size: Option, -) -> DaftResult>>> +) -> impl ByteRecordChunkStream + Send where - R: AsyncRead + Unpin + Send, + R: AsyncRead + Unpin + Send + 'static, { - let num_fields = fields.len(); let num_rows = num_rows.unwrap_or(usize::MAX); let mut estimated_mean_row_size = estimated_mean_row_size.unwrap_or(200f64); let mut estimated_std_row_size = estimated_std_row_size.unwrap_or(20f64); // Stream of unparsed CSV byte record chunks. - let read_stream = async_stream::try_stream! { + async_stream::try_stream! { // Number of rows read in last read. let mut rows_read = 1; // Total number of rows read across all reads. @@ -413,12 +358,18 @@ where chunk_buffer.truncate(rows_read); yield chunk_buffer } - }; + } +} + +fn parse_into_column_array_chunk_stream( + stream: impl ByteRecordChunkStream + Send, + fields: Arc>, + projection_indices: Arc>, +) -> impl ColumnArrayChunkStream + Send { // Parsing stream: we spawn background tokio + rayon tasks so we can pipeline chunk parsing with chunk reading, and // we further parse each chunk column in parallel on the rayon threadpool. - let parse_stream = read_stream.map_ok(|record| { - let fields = fields.clone(); - let projection_indices = projection_indices.clone(); + stream.map_ok(move |record| { + let (fields, projection_indices) = (fields.clone(), projection_indices.clone()); tokio::spawn(async move { let (send, recv) = tokio::sync::oneshot::channel(); rayon::spawn(move || { @@ -433,36 +384,60 @@ where 0, ) }) - .collect::>>>()?; - DaftResult::Ok(chunk) + .collect::>>>() + .context(ArrowSnafu)?; + Ok(chunk) })(); let _ = send.send(result); }); recv.await.context(super::OneShotRecvSnafu {})? }) .context(super::JoinSnafu {}) - }); - // Collect all chunks in chunk x column form. - let chunks = parse_stream - // Limit the number of chunks we have in flight at any given time. - .try_buffered(max_chunks_in_flight) - .try_collect::>() - .await? - .into_iter() - .collect::>>()?; - // Transpose chunk x column into column x chunk. - let mut column_arrays = vec![Vec::with_capacity(chunks.len()); projection_indices.len()]; - for chunk in chunks.into_iter() { - for (idx, col) in chunk.into_iter().enumerate() { - column_arrays[idx].push(col); - } + }) +} + +fn chunks_to_table( + chunks: Vec>>, + include_columns: Option>, + mut fields: Vec, +) -> DaftResult
{ + // Truncate fields to only contain projected columns. + if let Some(include_columns) = include_columns { + let field_map = fields + .into_iter() + .map(|field| (field.name.clone(), field)) + .collect::>(); + fields = include_columns + .into_iter() + .map(|col| field_map[&col].clone()) + .collect::>(); } - Ok(column_arrays) + // Concatenate column chunks and convert into Daft Series. + // Note that this concatenation is done in parallel on the rayon threadpool. + let columns_series = chunks + .into_par_iter() + .zip(&fields) + .map(|(mut arrays, field)| { + let array = if arrays.len() > 1 { + // Concatenate all array chunks. + let unboxed_arrays = arrays.iter().map(Box::as_ref).collect::>(); + arrow2::compute::concatenate::concatenate(unboxed_arrays.as_slice())? + } else { + // Return single array chunk directly. + arrays.pop().unwrap() + }; + Series::try_from((field.name.as_ref(), cast_array_for_daft_if_needed(array))) + }) + .collect::>>()?; + // Build Daft Table. + let schema: arrow2::datatypes::Schema = fields.into(); + let daft_schema = Schema::try_from(&schema)?; + Table::new(daft_schema, columns_series) } fn fields_to_projection_indices( fields: &Vec, - include_columns: &Option>, + include_columns: &Option>, ) -> Arc> { let field_name_to_idx = fields .iter() @@ -475,7 +450,7 @@ fn fields_to_projection_indices( || (0..fields.len()).collect(), |cols| { cols.iter() - .map(|c| field_name_to_idx[c]) + .map(|c| field_name_to_idx[c.as_str()]) .collect::>() }, ) @@ -502,7 +477,9 @@ mod tests { use daft_table::Table; use rstest::rstest; - use super::{char_to_byte, read_csv}; + use crate::{char_to_byte, CsvConvertOptions, CsvParseOptions, CsvReadOptions}; + + use super::read_csv; fn check_equal_local_arrow2( path: &str, @@ -591,25 +568,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 20); assert_eq!( table.schema, @@ -662,22 +621,16 @@ mod tests { ]; let table = read_csv( file.as_ref(), - Some(column_names.clone()), - None, - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -723,23 +676,14 @@ mod tests { let table = read_csv( file.as_ref(), None, - None, - Some(5), - true, - Some('|'), - true, - None, - None, + Some(CsvParseOptions::default().with_delimiter(b'|')), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 20); assert_eq!( table.schema, Schema::new(vec![ @@ -762,7 +706,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -783,23 +727,14 @@ mod tests { let table = read_csv( file.as_ref(), None, - None, - Some(5), - true, - None, - false, - None, - None, + Some(CsvParseOptions::default().with_double_quote(false)), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 19); assert_eq!( table.schema, Schema::new(vec![ @@ -822,7 +757,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -842,23 +777,14 @@ mod tests { let table = 
read_csv( file.as_ref(), None, - None, - Some(5), - true, - None, - true, - Some('\''), // Testing with single quote - None, + Some(CsvParseOptions::default().with_quote(b'\'')), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 20); assert_eq!( table.schema, Schema::new(vec![ @@ -881,7 +807,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -899,23 +825,14 @@ mod tests { let table = read_csv( file.as_ref(), None, - None, - Some(5), - true, - None, - true, - None, - Some('\\'), //testing with '\' as escape character + Some(CsvParseOptions::default().with_escape_char(Some(b'\\'))), None, io_client, None, true, None, - None, - None, - None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 20); assert_eq!( table.schema, Schema::new(vec![ @@ -938,7 +855,7 @@ mod tests { None, None, None, - Some(5), + None, ); Ok(()) @@ -956,23 +873,14 @@ mod tests { let table = read_csv( file.as_ref(), None, + Some(CsvParseOptions::default().with_comment(Some(b'#'))), None, - Some(5), - true, - None, - true, - None, - None, - Some('#'), io_client, None, - true, - None, - None, - None, + true, None, )?; - assert_eq!(table.len(), 5); + assert_eq!(table.len(), 19); assert_eq!( table.schema, Schema::new(vec![ @@ -995,7 +903,7 @@ mod tests { Some('#'), None, None, - Some(5), + None, ); Ok(()) @@ -1011,22 +919,13 @@ mod tests { let table = read_csv( file.as_ref(), - None, - None, - Some(5), - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_limit(Some(5))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 5); assert_eq!( @@ -1068,22 +967,16 @@ mod tests { let table = read_csv( file.as_ref(), - None, - Some(vec!["petal.length", "petal.width"]), - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_include_columns(Some(vec![ + "petal.length".to_string(), + "petal.width".to_string(), + ]))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1132,22 +1025,20 @@ mod tests { ]; let table = read_csv( file.as_ref(), - Some(column_names.clone()), - Some(vec!["petal.length", "petal.width"]), - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())) + .with_include_columns(Some(vec![ + "petal.length".to_string(), + "petal.width".to_string(), + ])), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1188,20 +1079,11 @@ mod tests { file.as_ref(), None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_buffer_size(Some(128))), io_client, None, true, None, - Some(128), - None, - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1245,20 +1127,11 @@ mod tests { file.as_ref(), None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_chunk_size(Some(100))), io_client, None, true, None, - None, - Some(100), - None, )?; assert_eq!(table.len(), 20); assert_eq!( @@ -1303,18 +1176,9 @@ mod tests { None, None, None, - true, - None, - true, - None, - None, - None, io_client, None, true, - None, - None, - None, Some(5), )?; assert_eq!(table.len(), 20); @@ -1355,25 +1219,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = 
read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 6); assert_eq!( table.schema, @@ -1415,25 +1261,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 6); assert_eq!( table.schema, @@ -1482,21 +1310,12 @@ mod tests { let table = read_csv( file.as_ref(), - None, - None, - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_schema(Some(schema.into()))), None, None, io_client, None, true, - Some(schema.into()), - None, - None, None, )?; assert_eq!(table.len(), 6); @@ -1538,25 +1357,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 3); assert_eq!( table.schema, @@ -1605,21 +1406,12 @@ mod tests { ])?; let table = read_csv( file.as_ref(), - None, - None, - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_schema(Some(schema.into()))), None, None, io_client, None, true, - Some(schema.into()), - None, - None, None, )?; let num_rows = table.len(); @@ -1645,25 +1437,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let err = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - ); + let err = read_csv(file.as_ref(), None, None, None, io_client, None, true, None); assert!(err.is_err()); let err = err.unwrap_err(); assert!(matches!(err, DaftError::ArrowError(_)), "{}", err); @@ -1692,21 +1466,12 @@ mod tests { let err = read_csv( file.as_ref(), None, - None, - None, - false, - None, - true, - None, - None, + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, ); assert!(err.is_err()); let err = err.unwrap_err(); @@ -1755,25 +1520,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file.as_ref(), - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - None, - )?; + let table = read_csv(file.as_ref(), None, None, None, io_client, None, true, None)?; assert_eq!(table.len(), 100); assert_eq!( table.schema, @@ -1799,22 +1546,16 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( file, - Some(column_names.clone()), - None, - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 100); assert_eq!( @@ -1841,22 +1582,17 @@ mod tests { let column_names = vec!["a", "b"]; let table = read_csv( file, - 
Some(column_names.clone()), - Some(vec!["b"]), - None, - false, - None, - true, - None, - None, + Some( + CsvConvertOptions::default() + .with_column_names(Some(column_names.iter().map(|s| s.to_string()).collect())) + .with_include_columns(Some(vec!["b".to_string()])), + ), + Some(CsvParseOptions::default().with_has_header(false)), None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 100); assert_eq!( @@ -1878,22 +1614,13 @@ mod tests { let table = read_csv( file, - None, - None, - Some(10), - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_limit(Some(10))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 10); assert_eq!( @@ -1919,22 +1646,13 @@ mod tests { let table = read_csv( file, - None, - Some(vec!["b"]), - None, - true, - None, - true, - None, + Some(CsvConvertOptions::default().with_include_columns(Some(vec!["b".to_string()]))), None, None, io_client, None, true, None, - None, - None, - None, )?; assert_eq!(table.len(), 100); assert_eq!( @@ -1958,20 +1676,11 @@ mod tests { file, None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_buffer_size(Some(100))), io_client, None, true, None, - Some(100), - None, - None, )?; assert_eq!(table.len(), 5000); @@ -1991,20 +1700,11 @@ mod tests { file, None, None, - None, - true, - None, - true, - None, - None, - None, + Some(CsvReadOptions::default().with_chunk_size(Some(100))), io_client, None, true, None, - None, - Some(100), - None, )?; assert_eq!(table.len(), 5000); @@ -2020,25 +1720,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_csv( - file, - None, - None, - None, - true, - None, - true, - None, - None, - None, - io_client, - None, - true, - None, - None, - None, - Some(5), - )?; + let table = read_csv(file, None, None, None, io_client, None, true, Some(5))?; assert_eq!(table.len(), 5000); Ok(()) diff --git a/src/daft-decoding/src/deserialize.rs b/src/daft-decoding/src/deserialize.rs index 878b93064e..e8e79156a6 100644 --- a/src/daft-decoding/src/deserialize.rs +++ b/src/daft-decoding/src/deserialize.rs @@ -176,10 +176,6 @@ fn deserialize_datetime( for i in 0..ALL_TIMESTAMP_FMTS.len() { let idx = (i + *fmt_idx) % ALL_TIMESTAMP_FMTS.len(); let fmt = ALL_TIMESTAMP_FMTS[idx]; - println!( - "Deserializing dt: {string} with {fmt} as {:?}", - chrono::DateTime::parse_from_str(string, fmt) - ); if let Ok(dt) = chrono::DateTime::parse_from_str(string, fmt) { *fmt_idx = idx; return Some(dt.with_timezone(tz)); diff --git a/src/daft-micropartition/src/lib.rs b/src/daft-micropartition/src/lib.rs index c15e1e2c30..d0d305d13b 100644 --- a/src/daft-micropartition/src/lib.rs +++ b/src/daft-micropartition/src/lib.rs @@ -25,6 +25,9 @@ pub enum Error { #[snafu(display("Duplicate name found when evaluating expressions: {}", name))] DuplicatedField { name: String }, + #[snafu(display("CSV error: {}", source))] + DaftCSV { source: daft_csv::Error }, + #[snafu(display( "Field: {} not found in Parquet File: Available Fields: {:?}", field, diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index f0946748ba..b53af50908 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -7,7 +7,7 @@ use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::schema::{Schema, SchemaRef}; -use daft_csv::read::read_csv; 
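+// CSV reads are configured through three option structs exposed by `daft_csv`:
+// `CsvConvertOptions` (row limit, column renaming, projected columns, optional schema),
+// `CsvParseOptions` (header flag, delimiter, quoting, escape and comment characters),
+// and `CsvReadOptions` (buffer and chunk sizes). For example (illustrative only; the
+// local names `convert`/`parse`/`read` are not taken from this patch):
+//     let convert = CsvConvertOptions::default().with_limit(Some(5));
+//     let parse = CsvParseOptions::default().with_delimiter(b'|');
+//     let read = CsvReadOptions::default().with_buffer_size(Some(128));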
+use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; @@ -18,9 +18,9 @@ use daft_table::Table; use snafu::ResultExt; -use crate::DaftCoreComputeSnafu; #[cfg(feature = "python")] use crate::PyIOSnafu; +use crate::{DaftCSVSnafu, DaftCoreComputeSnafu}; use daft_io::{IOConfig, IOStatsRef}; use daft_stats::TableMetadata; @@ -143,29 +143,40 @@ fn materialize_scan_task( } else { None }; - urls.map(|url| { - daft_csv::read::read_csv( - url, - col_names.clone(), - column_names.clone(), - scan_task.pushdowns.limit, - cfg.has_headers, - cfg.delimiter, - cfg.double_quote, - cfg.quote, - cfg.escape_char, - cfg.comment, - io_client.clone(), - io_stats.clone(), - native_storage_config.multithreaded_io, - None, // Allow read_csv to perform its own schema inference - cfg.buffer_size, - cfg.chunk_size, - None, // max_chunks_in_flight - ) - .context(DaftCoreComputeSnafu) - }) - .collect::>>()? + let convert_options = CsvConvertOptions::new_internal( + scan_task.pushdowns.limit, + column_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + col_names + .as_ref() + .map(|cols| cols.iter().map(|col| col.to_string()).collect()), + None, + ); + let parse_options = CsvParseOptions::new_with_defaults( + cfg.has_headers, + cfg.delimiter, + cfg.double_quote, + cfg.quote, + cfg.escape_char, + cfg.comment, + ) + .context(DaftCSVSnafu)?; + let read_options = + CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size); + let uris = urls.collect::>(); + daft_csv::read_csv_bulk( + uris.as_slice(), + Some(convert_options), + Some(parse_options), + Some(read_options), + io_client, + io_stats, + native_storage_config.multithreaded_io, + None, + 8, + ) + .context(DaftCoreComputeSnafu)? } // **************** @@ -535,60 +546,33 @@ fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option>, - include_columns: Option>, - num_rows: Option, - has_header: bool, - delimiter: Option, - double_quote: bool, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_config: Arc, multithreaded_io: bool, io_stats: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, ) -> DaftResult { let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?; - let mut remaining_rows = num_rows; match uris { [] => Ok(MicroPartition::empty(None)), uris => { - // Naively load CSVs from URIs - let mut tables = vec![]; - for uri in uris { - // Terminate early if we have read enough rows already - if remaining_rows.map(|rr| rr == 0).unwrap_or(false) { - break; - } - let table = read_csv( - uri, - column_names.clone(), - include_columns.clone(), - remaining_rows, - has_header, - delimiter, - double_quote, - quote, - escape_char, - comment, - io_client.clone(), - io_stats.clone(), - multithreaded_io, - schema.clone(), - buffer_size, - chunk_size, - None, - )?; - remaining_rows = remaining_rows.map(|rr| rr - table.len()); - tables.push(table); - } + // Perform a bulk read of URIs, materializing a table per URI. 
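+            // `read_csv_bulk` takes the same three option structs as the single-URI
+            // `read_csv`, followed by the IO client, optional IO stats, and the
+            // multithreaded-IO flag; the two trailing arguments appear to be a
+            // max-chunks-in-flight override (`None` here) and the number of CSV files
+            // read in parallel (`8`).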
+ let tables = daft_csv::read_csv_bulk( + uris, + convert_options, + parse_options, + read_options, + io_client, + io_stats, + multithreaded_io, + None, + 8, + ) + .context(DaftCoreComputeSnafu)?; // Union all schemas and cast all tables to the same schema let unioned_schema = tables @@ -802,5 +786,6 @@ impl Display for MicroPartition { Ok(()) } } + #[cfg(test)] mod test {} diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 1a028b79ad..f2a6796ffc 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -9,6 +9,7 @@ use daft_core::{ schema::Schema, Series, }; +use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_dsl::python::PyExpr; use daft_io::{python::IOConfig, IOStatsContext}; use daft_parquet::read::ParquetSchemaInferenceOptions; @@ -369,46 +370,27 @@ impl PyMicroPartition { Ok(mp.into()) } - #[allow(clippy::too_many_arguments)] #[staticmethod] pub fn read_csv( py: Python, uri: &str, - column_names: Option>, - include_columns: Option>, - num_rows: Option, - has_header: Option, - delimiter: Option, - double_quote: Option, - quote: Option, - escape_char: Option, - comment: Option, + convert_options: Option, + parse_options: Option, + read_options: Option, io_config: Option, multithreaded_io: Option, - schema: Option, - buffer_size: Option, - chunk_size: Option, ) -> PyResult { let mp = py.allow_threads(|| { let io_stats = IOStatsContext::new(format!("read_csv: for uri {uri}")); let io_config = io_config.unwrap_or_default().config.into(); crate::micropartition::read_csv_into_micropartition( [uri].as_ref(), - column_names, - include_columns, - num_rows, - has_header.unwrap_or(true), - delimiter, - double_quote.unwrap_or(true), - quote, - escape_char, - comment, + convert_options, + parse_options, + read_options, io_config, multithreaded_io.unwrap_or(true), Some(io_stats), - schema.map(|s| s.schema), - buffer_size, - chunk_size, ) })?; Ok(mp.into()) @@ -648,7 +630,6 @@ pub(crate) fn read_csv_into_py_table( .extract() } -#[allow(clippy::too_many_arguments)] pub(crate) fn read_parquet_into_py_table( py: Python, uri: &str, diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 5227c8fa42..226f0c91c0 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,6 +2,7 @@ use std::{fmt::Display, sync::Arc}; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; +use daft_csv::CsvParseOptions; use daft_io::{ get_io_client, get_runtime, parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef, }; @@ -178,14 +179,16 @@ impl GlobScanOperator { comment, .. }) => { - let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( + let (schema, _) = daft_csv::metadata::read_csv_schema( first_filepath.as_str(), - *has_headers, - *delimiter, - *double_quote, - *quote, - *escape_char, - *comment, + Some(CsvParseOptions::new_with_defaults( + *has_headers, + *delimiter, + *double_quote, + *quote, + *escape_char, + *comment, + )?), None, io_client, Some(io_stats),