[FEAT] [JSON Reader] Add native streaming + parallel JSON reader. (#1679)

This PR adds a streaming + parallel JSON reader, supporting most fundamental dtypes (excluding decimal and binary types), arbitrary nesting via JSON lists and objects, and nulls at all levels of the JSON object tree.
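
As a quick orientation, here is a minimal usage sketch of the Python entry point wired up later in this diff; the file path is illustrative, and `use_native_downloader=True` is what routes the read through the new native reader:

```python
import daft

# Illustrative local path; cloud URIs handled by daft-io (e.g. s3://...) should
# work the same way. `use_native_downloader=True` routes the read through the
# new streaming + parallel Rust reader added in this PR.
df = daft.read_json("data/events.jsonl", use_native_downloader=True)

# Nested lists/objects and nulls at any level are inferred into Daft dtypes.
df.show()
```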

## TODOs

- [x] Add schema inference unit test for dtype coverage (i.e. reading
the `dtypes.jsonl` file).
- [x] Add temporal type inference + parsing test coverage.
- [x] Benchmarking + performance audit: this reader follows the same
general concurrency + parallelism model as the streaming CSV reader,
which performs relatively well for cloud reads, but there's bound to be
a lot of low-hanging fruit around unnecessary copies.
- [ ] (Follow-up?) Add thorough parsing and dtype inference unit tests
on in-memory defined JSON strings.
- [ ] (Follow-up) Support for decimal and (large) binary types.
- [ ] (Follow-up) Add support for strict parsing, i.e. returning an
error instead of falling back to a null value when parsing fails.
- [ ] (Follow-up) Misc. bugs in Arrow2 that should be fixed and
upstreamed.
- [ ] (Follow-up) Deflate compression support.
clarkzinzow authored Dec 6, 2023
1 parent a53cd51 commit 3693c22
Showing 56 changed files with 3,657 additions and 268 deletions.
88 changes: 79 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions Cargo.toml
@@ -1,9 +1,11 @@
[dependencies]
common-daft-config = {path = "src/common/daft-config", default-features = false}
daft-compression = {path = "src/daft-compression", default-features = false}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-io = {path = "src/daft-io", default-features = false}
daft-json = {path = "src/daft-json", default-features = false}
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
@@ -25,6 +27,7 @@ python = [
"daft-plan/python",
"daft-parquet/python",
"daft-csv/python",
"daft-json/python",
"daft-micropartition/python",
"daft-scan/python",
"daft-stats/python",
@@ -76,6 +79,7 @@ members = [
"src/daft-io",
"src/daft-parquet",
"src/daft-csv",
"src/daft-json",
"src/daft-dsl",
"src/daft-table",
"src/daft-plan",
@@ -108,10 +112,10 @@ tokio-util = "0.7.8"
url = "2.4.0"

[workspace.dependencies.arrow2]
# branch = "jay/fix-parquet-timezone-parsing"
# branch = "daft-fork"
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
rev = "3a23c780d4d59ef0c9c8751675480e07f4e1c311"
rev = "d5685eebf1d65c3f3d854370ad39f93dcd91971a"

[workspace.dependencies.bincode]
version = "1.3.3"
69 changes: 69 additions & 0 deletions daft/daft.pyi
@@ -211,6 +211,15 @@ class JsonSourceConfig:
Configuration of a JSON data source.
"""

buffer_size: int | None
chunk_size: int | None

def __init__(
self,
buffer_size: int | None = None,
chunk_size: int | None = None,
): ...

class FileFormatConfig:
"""
Configuration for parsing a particular file format (Parquet, CSV, JSON).
@@ -298,6 +307,41 @@ class CsvReadOptions:
chunk_size: int | None = None,
): ...

class JsonConvertOptions:
"""
Options for converting JSON data to Daft data.
"""

limit: int | None
include_columns: list[str] | None
schema: PySchema | None

def __init__(
self,
limit: int | None = None,
include_columns: list[str] | None = None,
schema: PySchema | None = None,
): ...

class JsonParseOptions:
"""
Options for parsing JSON files.
"""

class JsonReadOptions:
"""
Options for reading JSON files.
"""

buffer_size: int | None
chunk_size: int | None

def __init__(
self,
buffer_size: int | None = None,
chunk_size: int | None = None,
): ...

class FileInfo:
"""
Metadata for a single file.
@@ -587,6 +631,21 @@ def read_csv_schema(
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
): ...
def read_json(
uri: str,
convert_options: JsonConvertOptions | None = None,
parse_options: JsonParseOptions | None = None,
read_options: JsonReadOptions | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
max_chunks_in_flight: int | None = None,
): ...
def read_json_schema(
uri: str,
parse_options: JsonParseOptions | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
): ...

class PyTimeUnit:
@staticmethod
@@ -931,6 +990,16 @@ class PyMicroPartition:
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
): ...
@classmethod
def read_json_native(
cls,
uri: str,
convert_options: JsonConvertOptions | None = None,
parse_options: JsonParseOptions | None = None,
read_options: JsonReadOptions | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
): ...

class PhysicalPlanScheduler:
"""
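
For illustration only, a sketch of how the new option classes from this stub might be combined when calling the low-level binding directly (argument values, column names, and the URI are made up; most users would go through `daft.read_json` instead):

```python
from daft.daft import JsonConvertOptions, JsonReadOptions, read_json

# Illustrative values: cap the scan at 1,000 rows, project two columns, and
# tune the native reader's streaming buffer / chunk sizes.
convert_options = JsonConvertOptions(limit=1_000, include_columns=["id", "payload"])
read_options = JsonReadOptions(buffer_size=4 * 1024 * 1024, chunk_size=64 * 1024)

table = read_json(
    "data/events.jsonl",
    convert_options=convert_options,
    read_options=read_options,
)
```
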
4 changes: 4 additions & 0 deletions daft/execution/execution_step.py
@@ -17,6 +17,7 @@
FileFormatConfig,
IOConfig,
JoinType,
JsonReadOptions,
JsonSourceConfig,
ParquetSourceConfig,
ResourceRequest,
@@ -370,6 +371,9 @@ def _handle_tabular_files_scan(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
json_read_options=JsonReadOptions(
buffer_size=format_config.buffer_size, chunk_size=format_config.chunk_size
),
read_options=read_options,
)
for fp in filepaths
13 changes: 11 additions & 2 deletions daft/io/_json.py
@@ -7,6 +7,7 @@
FileFormatConfig,
IOConfig,
JsonSourceConfig,
NativeStorageConfig,
PythonStorageConfig,
StorageConfig,
)
@@ -20,6 +21,9 @@ def read_json(
path: Union[str, List[str]],
schema_hints: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
use_native_downloader: bool = True,
_buffer_size: Optional[int] = None,
_chunk_size: Optional[int] = None,
) -> DataFrame:
"""Creates a DataFrame from line-delimited JSON file(s)
@@ -34,15 +38,20 @@
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
io_config (IOConfig): Config to be used with the native downloader
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading JSON. This
is currently experimental.
returns:
DataFrame: parsed DataFrame
"""
if isinstance(path, list) and len(path) == 0:
raise ValueError("Cannot read DataFrame from empty list of JSON filepaths")

json_config = JsonSourceConfig()
json_config = JsonSourceConfig(_buffer_size, _chunk_size)
file_format_config = FileFormatConfig.from_json_config(json_config)
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
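
A usage sketch of the updated high-level signature, exercising the new tuning knobs (values and the path are arbitrary; the leading underscores mark `_buffer_size` and `_chunk_size` as internal/experimental parameters):

```python
import daft

# Illustrative values; the leading underscores mark these as internal /
# experimental tuning knobs for the native reader rather than stable API.
df = daft.read_json(
    "data/logs.jsonl",
    use_native_downloader=True,
    _buffer_size=4 * 1024 * 1024,
    _chunk_size=64 * 1024,
)
```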