[FEAT] [Native I/O] Add a native CSV reader. (#1475)
This PR adds a simple native CSV reader. Local reads are fully async,
while remote object store reads currently bulk-download each file into a
byte buffer; streaming remote read support will be added in a follow-up
PR.

**NOTE:** This PR required some changes to Arrow2's async CSV reading
machinery, namely:
1. Schema inference was broken for headerless CSV files
([commit](Eventual-Inc/arrow2@a14e8c7)).
2. Type inference was broken for CSV columns that contain nulls
([commit](Eventual-Inc/arrow2@065a31d)).

The requisite changes are contained on this branch:
https://github.com/jorgecarleitao/arrow2/compare/main...Eventual-Inc:arrow2:clark/async-csv-fixes?expand=1

## TODOs (follow-up PRs)

- [ ] Add streaming remote reads
- [ ] Parallelize column/chunk deserialization

Closes #1462
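
As a quick orientation, here is a minimal sketch of exercising the new reader from the Python API via the `use_native_downloader` flag added in `daft/io/_csv.py` below (the file path is hypothetical):

```python
import daft

# Opt in to the native CSV reader added by this PR; local paths take the
# fully-async read path, while object-store URLs are bulk-downloaded for now.
df = daft.read_csv("data/iris.csv", use_native_downloader=True)
df.show()
```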
clarkzinzow authored Oct 9, 2023
1 parent 278e7cd commit 553a911
Showing 20 changed files with 879 additions and 146 deletions.
46 changes: 45 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

13 changes: 9 additions & 4 deletions Cargo.toml
@@ -1,5 +1,6 @@
[dependencies]
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-io = {path = "src/daft-io", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
@@ -18,7 +19,8 @@ python = [
"daft-dsl/python",
"daft-io/python",
"daft-plan/python",
"daft-parquet/python"
"daft-parquet/python",
"daft-csv/python"
]

[lib]
@@ -63,12 +65,15 @@ members = [
"src/daft-core",
"src/daft-io",
"src/daft-parquet",
"src/daft-csv",
"src/daft-dsl",
"src/daft-table",
"src/daft-plan"
]

[workspace.dependencies]
async-compat = "0.2.1"
async-stream = "0.3.5"
bytes = "1.4.0"
futures = "0.3.28"
html-escape = "0.2.13"
@@ -78,17 +83,17 @@ num-derive = "0.3.3"
num-traits = "0.2"
prettytable-rs = "0.10"
rand = "^0.8"
rayon = "1.7.0"
serde_json = "1.0.104"
snafu = "0.7.4"
tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
tokio-stream = {version = "0.1.14", features = ["fs"]}
tokio-util = "0.7.8"

[workspace.dependencies.arrow2]
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
# branch = "jay/logical-type-null"
rev = "3ffe9226"
version = "0.17.1"
rev = "065a31da8fd8a75cbece5f99295a4068713a71ed"

[workspace.dependencies.bincode]
version = "1.3.3"
17 changes: 17 additions & 0 deletions daft/daft.pyi
@@ -411,6 +411,23 @@ def read_parquet_schema(
    multithreaded_io: bool | None = None,
    coerce_int96_timestamp_unit: PyTimeUnit | None = None,
): ...
def read_csv(
    uri: str,
    column_names: list[str] | None = None,
    include_columns: list[str] | None = None,
    num_rows: int | None = None,
    has_header: bool | None = None,
    delimiter: str | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
): ...
def read_csv_schema(
    uri: str,
    has_header: bool | None = None,
    delimiter: str | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
): ...

class PyTimeUnit:
    @staticmethod
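These stubs are the raw PyO3 bindings; `read_csv` returns a `PyTable` that `Table.read_csv` (later in this diff) wraps. Judging from how the parameters are used in `daft/table/table_io.py` below, `column_names` assigns names to a headerless file while `include_columns` projects a subset of columns. A hedged sketch of calling the binding directly, with a hypothetical file path and column names:

```python
from daft.daft import read_csv as _read_csv

# Normally you would go through Table.read_csv rather than the raw binding.
pytable = _read_csv(
    uri="data.csv",
    include_columns=["id", "score"],  # materialize only these columns
    num_rows=100,                     # stop after the first 100 rows
    has_header=True,
)
```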
2 changes: 1 addition & 1 deletion daft/execution/rust_physical_plan_shim.py
@@ -23,7 +23,7 @@

def tabular_scan(
    schema: PySchema,
    columns_to_read: list[str] | None,  # was: list[str]
    file_info_table: PyTable,
    file_format_config: FileFormatConfig,
    storage_config: StorageConfig,
12 changes: 11 additions & 1 deletion daft/io/_csv.py
@@ -8,6 +8,8 @@
from daft.daft import (
    CsvSourceConfig,
    FileFormatConfig,
    IOConfig,
    NativeStorageConfig,
    PythonStorageConfig,
    StorageConfig,
)
@@ -24,6 +26,8 @@ def read_csv(
    has_headers: bool = True,
    column_names: Optional[List[str]] = None,
    delimiter: str = ",",
    io_config: Optional["IOConfig"] = None,
    use_native_downloader: bool = False,
) -> DataFrame:
    """Creates a DataFrame from CSV file(s)
@@ -41,6 +45,9 @@
            By default, Daft will automatically construct a FileSystem instance internally.
        has_headers (bool): Whether the CSV has a header or not, defaults to True
        delimiter (str): Delimiter used in the CSV, defaults to ","
        io_config (IOConfig): Config to be used with the native downloader
        use_native_downloader: Whether to use the native downloader instead of PyArrow for reading CSV. This
            is currently experimental.

    Returns:
        DataFrame: parsed DataFrame
@@ -57,6 +64,9 @@

    csv_config = CsvSourceConfig(delimiter=delimiter, has_headers=has_headers)
    file_format_config = FileFormatConfig.from_csv_config(csv_config)
    if use_native_downloader:
        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
    else:
        storage_config = StorageConfig.python(PythonStorageConfig(fs))
    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
    return DataFrame(builder)
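
Remote object-store reads also route through this path (bulk-downloaded for now, per the PR description). A sketch of a remote read, assuming `IOConfig` and `S3Config` are exposed under `daft.io` as for the native Parquet reader, with a hypothetical bucket:

```python
import daft
from daft.io import IOConfig, S3Config

# Anonymous S3 access shown for brevity; real credentials would go here.
io_config = IOConfig(s3=S3Config(anonymous=True))

df = daft.read_csv(
    "s3://my-bucket/data.csv",
    io_config=io_config,
    use_native_downloader=True,
)
```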
20 changes: 20 additions & 0 deletions daft/logical/schema.py
@@ -5,6 +5,7 @@

from daft.daft import PyField as _PyField
from daft.daft import PySchema as _PySchema
from daft.daft import read_csv_schema as _read_csv_schema
from daft.daft import read_parquet_schema as _read_parquet_schema
from daft.datatype import DataType, TimeUnit

@@ -153,3 +154,22 @@ def from_parquet(
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
            )
        )

    @classmethod
    def from_csv(
        cls,
        path: str,
        has_header: bool | None = None,
        delimiter: str | None = None,
        io_config: IOConfig | None = None,
        multithreaded_io: bool | None = None,
    ) -> Schema:
        return Schema._from_pyschema(
            _read_csv_schema(
                uri=path,
                has_header=has_header,
                delimiter=delimiter,
                io_config=io_config,
                multithreaded_io=multithreaded_io,
            )
        )
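
A short sketch of the new classmethod: it infers column names and types natively without materializing any rows (file path hypothetical):

```python
from daft.logical.schema import Schema

# Schema inference goes through the native read_csv_schema binding.
schema = Schema.from_csv("data.csv", has_header=True, delimiter=",")
print(schema.column_names())
```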
10 changes: 10 additions & 0 deletions daft/table/schema_inference.py
@@ -35,6 +35,16 @@ def from_csv(

    if storage_config is not None:
        config = storage_config.config
        if isinstance(config, NativeStorageConfig):
            assert isinstance(file, (str, pathlib.Path)), "Native downloader only works on string inputs to read_csv"
            io_config = config.io_config
            return Schema.from_csv(
                str(file),
                has_header=csv_options.header_index is not None,
                delimiter=csv_options.delimiter,
                io_config=io_config,
            )

        assert isinstance(config, PythonStorageConfig)
        fs = config.fs
    else:
26 changes: 26 additions & 0 deletions daft/table/table.py
@@ -9,6 +9,7 @@
from daft.arrow_utils import ensure_table
from daft.daft import JoinType
from daft.daft import PyTable as _PyTable
from daft.daft import read_csv as _read_csv
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
from daft.daft import read_parquet_into_pyarrow as _read_parquet_into_pyarrow
@@ -431,6 +432,31 @@ def read_parquet_statistics(
            )
        )

    @classmethod
    def read_csv(
        cls,
        path: str,
        column_names: list[str] | None = None,
        include_columns: list[str] | None = None,
        num_rows: int | None = None,
        has_header: bool | None = None,
        delimiter: str | None = None,
        io_config: IOConfig | None = None,
        multithreaded_io: bool | None = None,
    ) -> Table:
        return Table._from_pytable(
            _read_csv(
                uri=path,
                column_names=column_names,
                include_columns=include_columns,
                num_rows=num_rows,
                has_header=has_header,
                delimiter=delimiter,
                io_config=io_config,
                multithreaded_io=multithreaded_io,
            )
        )


def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray:
    if pa.types.is_large_binary(arr.type) or pa.types.is_large_string(arr.type):
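A hedged usage sketch of `Table.read_csv`, assuming a local `data.csv` with a header row (path and column names are hypothetical); note it returns an in-memory `Table`, not a `DataFrame`:

```python
from daft.table import Table

tbl = Table.read_csv(
    "data.csv",
    include_columns=["id", "score"],  # projection pushed into the reader
    num_rows=1000,                    # partial read
)
print(tbl)
```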
16 changes: 16 additions & 0 deletions daft/table/table_io.py
@@ -197,6 +197,22 @@ def read_csv(
"""
if storage_config is not None:
config = storage_config.config
if isinstance(config, NativeStorageConfig):
assert isinstance(
file, (str, pathlib.Path)
), "Native downloader only works on string inputs to read_parquet"
has_header = csv_options.header_index is not None
tbl = Table.read_csv(
str(file),
column_names=schema.column_names() if not has_header else None,
include_columns=read_options.column_names,
num_rows=read_options.num_rows,
has_header=has_header,
delimiter=csv_options.delimiter,
io_config=config.io_config,
)
return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)

assert isinstance(config, PythonStorageConfig)
fs = config.fs
else:
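The headerless-file handling above (passing the known schema's names as `column_names` when there is no header, then casting to the target schema) corresponds to this user-facing call; a sketch with hypothetical names:

```python
import daft

# With has_headers=False, the supplied column_names are assigned to the file's
# columns instead of auto-generated placeholders.
df = daft.read_csv(
    "data/no_header.csv",
    has_headers=False,
    column_names=["id", "name", "score"],
    use_native_downloader=True,
)
```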
28 changes: 28 additions & 0 deletions src/daft-csv/Cargo.toml
@@ -0,0 +1,28 @@
[dependencies]
arrow2 = {workspace = true, features = ["io_csv", "io_csv_async"]}
async-compat = {workspace = true}
async-stream = {workspace = true}
bytes = {workspace = true}
common-error = {path = "../common/error", default-features = false}
csv-async = "1.2.6"
daft-core = {path = "../daft-core", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
rayon = {workspace = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tokio-util = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python"]

[package]
edition = {workspace = true}
name = "daft-csv"
version = {workspace = true}
34 changes: 34 additions & 0 deletions src/daft-csv/src/lib.rs
@@ -0,0 +1,34 @@
#![feature(async_closure)]
#![feature(let_chains)]
use common_error::DaftError;
use snafu::Snafu;

pub mod metadata;
#[cfg(feature = "python")]
pub mod python;
pub mod read;
#[cfg(feature = "python")]
pub use python::register_modules;

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("{source}"))]
    IOError { source: daft_io::Error },
    #[snafu(display("{source}"))]
    CSVError { source: csv_async::Error },
}

impl From<Error> for DaftError {
    fn from(err: Error) -> DaftError {
        match err {
            Error::IOError { source } => source.into(),
            _ => DaftError::External(err.into()),
        }
    }
}

impl From<daft_io::Error> for Error {
    fn from(err: daft_io::Error) -> Self {
        Error::IOError { source: err }
    }
}
