[FEAT] Add streaming + parallel CSV reader, with decompression support. #1501

Merged · 12 commits · Oct 20, 2023
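For orientation before the file-by-file diff: the user-facing surface of this change is small. `daft.read_csv` gains two private tuning knobs, `_buffer_size` and `_chunk_size`, which flow through `CsvSourceConfig` into the new native streaming reader (see `daft/io/_csv.py` below). A minimal sketch of exercising them; the byte sizes are illustrative, not recommended defaults:

```python
import daft

# _buffer_size / _chunk_size are the private knobs added in this PR;
# the values below are illustrative only.
df = daft.read_csv(
    "data/large.csv",
    has_headers=True,
    use_native_downloader=True,    # route through the new native reader
    _buffer_size=4 * 1024 * 1024,  # streaming read buffer, in bytes
    _chunk_size=64 * 1024,         # unit of parallel deserialization, in bytes
)
```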
142 changes: 139 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
@@ -76,6 +76,7 @@ members = [
 
 [workspace.dependencies]
 async-compat = "0.2.1"
+async-compression = {version = "0.4.4", features = ["tokio", "all-algorithms"]}
 async-stream = "0.3.5"
 bytes = "1.4.0"
 futures = "0.3.28"
@@ -88,15 +89,16 @@ prettytable-rs = "0.10"
 rand = "^0.8"
 rayon = "1.7.0"
 serde_json = "1.0.104"
-snafu = "0.7.4"
+snafu = {version = "0.7.4", features = ["futures"]}
 tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
+tokio-stream = {version = "0.1.14", features = ["fs"]}
 tokio-util = "0.7.8"
 url = "2.4.0"
 
 [workspace.dependencies.arrow2]
 git = "https://github.com/Eventual-Inc/arrow2"
 package = "arrow2"
-rev = "065a31da8fd8a75cbece5f99295a4068713a71ed"
+rev = "0a6f79e0da7e32cc30381f4cc8cf5a8483909f78"
 
 [workspace.dependencies.bincode]
 version = "1.3.3"
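The new `async-compression` dependency (built with `all-algorithms`) backs the decompression half of the PR title. A hypothetical end-to-end use, assuming the reader infers the codec from the file extension (the detection logic is not shown in this diff):

```python
import daft

# Hypothetical: a gzip-compressed CSV read through the native reader.
# Extension-based codec detection is an assumption, not confirmed here.
df = daft.read_csv("s3://bucket/logs.csv.gz", use_native_downloader=True)
```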
13 changes: 12 additions & 1 deletion daft/daft.pyi
@@ -190,8 +190,16 @@ class CsvSourceConfig:
 
     delimiter: str
     has_headers: bool
+    buffer_size: int | None
+    chunk_size: int | None
 
-    def __init__(self, delimiter: str, has_headers: bool): ...
+    def __init__(
+        self,
+        delimiter: str,
+        has_headers: bool,
+        buffer_size: int | None = None,
+        chunk_size: int | None = None,
+    ): ...
 
 class JsonSourceConfig:
     """
@@ -425,6 +433,9 @@ def read_csv(
     delimiter: str | None = None,
     io_config: IOConfig | None = None,
     multithreaded_io: bool | None = None,
+    schema: PySchema | None = None,
+    buffer_size: int | None = None,
+    chunk_size: int | None = None,
 ): ...
 def read_csv_schema(
     uri: str,
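Both new `CsvSourceConfig` fields default to `None`, so existing call sites keep working unchanged. Direct construction (normally done for you inside `daft.read_csv`) would look roughly like this sketch:

```python
from daft.daft import CsvSourceConfig

# None (the default) defers buffer/chunk sizing to the native reader.
config = CsvSourceConfig(
    delimiter=",",
    has_headers=True,
    buffer_size=None,
    chunk_size=None,
)
```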
2 changes: 2 additions & 0 deletions daft/execution/execution_step.py
@@ -373,6 +373,8 @@ def _handle_tabular_files_scan(
             csv_options=TableParseCSVOptions(
                 delimiter=format_config.delimiter,
                 header_index=0 if format_config.has_headers else None,
+                buffer_size=format_config.buffer_size,
+                chunk_size=format_config.chunk_size,
             ),
             read_options=read_options,
         )
9 changes: 8 additions & 1 deletion daft/io/_csv.py
@@ -28,6 +28,8 @@ def read_csv(
     delimiter: str = ",",
     io_config: Optional["IOConfig"] = None,
     use_native_downloader: bool = False,
+    _buffer_size: Optional[int] = None,
+    _chunk_size: Optional[int] = None,
 ) -> DataFrame:
     """Creates a DataFrame from CSV file(s)
 
@@ -62,7 +64,12 @@ def read_csv(
     if isinstance(path, list) and len(path) == 0:
         raise ValueError(f"Cannot read DataFrame from from empty list of CSV filepaths")
 
-    csv_config = CsvSourceConfig(delimiter=delimiter, has_headers=has_headers)
+    csv_config = CsvSourceConfig(
+        delimiter=delimiter,
+        has_headers=has_headers,
+        buffer_size=_buffer_size,
+        chunk_size=_chunk_size,
+    )
     file_format_config = FileFormatConfig.from_csv_config(csv_config)
     if use_native_downloader:
         storage_config = StorageConfig.native(NativeStorageConfig(io_config))
4 changes: 4 additions & 0 deletions daft/runners/partitioning.py
@@ -44,10 +44,14 @@ class TableParseCSVOptions:
     Args:
         delimiter: The delimiter to use when parsing CSVs, defaults to ","
         header_index: Index of the header row, or None if no header
+        buffer_size: Size of the buffer (in bytes) used by the streaming reader.
+        chunk_size: Size of the chunks (in bytes) deserialized in parallel by the streaming reader.
     """
 
     delimiter: str = ","
     header_index: int | None = 0
+    buffer_size: int | None = None
+    chunk_size: int | None = None
 
 
 @dataclass(frozen=True)
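Since `TableParseCSVOptions` is a plain dataclass, the new knobs compose directly with the existing fields; a sketch with illustrative values:

```python
from daft.runners.partitioning import TableParseCSVOptions

opts = TableParseCSVOptions(
    delimiter=",",
    header_index=0,           # 0 = first row is the header; None = no header
    buffer_size=1024 * 1024,  # streaming buffer, in bytes (illustrative)
    chunk_size=128 * 1024,    # parallel-deserialization chunk, in bytes (illustrative)
)
```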
6 changes: 6 additions & 0 deletions daft/table/table.py
@@ -446,6 +446,9 @@ def read_csv(
         delimiter: str | None = None,
         io_config: IOConfig | None = None,
         multithreaded_io: bool | None = None,
+        schema: Schema | None = None,
+        buffer_size: int | None = None,
+        chunk_size: int | None = None,
     ) -> Table:
         return Table._from_pytable(
             _read_csv(
@@ -457,6 +460,9 @@
                 delimiter=delimiter,
                 io_config=io_config,
                 multithreaded_io=multithreaded_io,
+                schema=schema._schema if schema is not None else None,
+                buffer_size=buffer_size,
+                chunk_size=chunk_size,
             )
         )
 
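At the `Table` layer the same options are forwarded to the Rust `_read_csv` binding, now alongside an optional pre-computed `Schema` (unwrapped to its `PySchema` before crossing the FFI boundary). A hedged sketch of a direct call; the leading positional argument is assumed from the surrounding signature, and most users should go through `daft.read_csv` instead:

```python
from daft.table import Table

# Assumption: the first parameter (not shown in this hunk) is the file uri.
tbl = Table.read_csv(
    "data/example.csv",
    schema=None,       # None lets the reader infer a schema
    buffer_size=None,  # None defers to the reader's defaults
    chunk_size=None,
)
```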
3 changes: 3 additions & 0 deletions daft/table/table_io.py
@@ -223,6 +223,9 @@ def read_csv(
             has_header=has_header,
             delimiter=csv_options.delimiter,
             io_config=config.io_config,
+            schema=schema,
+            buffer_size=csv_options.buffer_size,
+            chunk_size=csv_options.chunk_size,
         )
     return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)
 
2 changes: 1 addition & 1 deletion src/daft-core/src/array/fixed_size_list_array.rs
@@ -7,7 +7,7 @@ use crate::datatypes::{DaftArrayType, Field};
 use crate::series::Series;
 use crate::DataType;
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct FixedSizeListArray {
     pub field: Arc<Field>,
     pub flat_child: Series,