Skip to content

Commit

Permalink
Add support for fsspec filesystems to new query planner.
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkzinzow committed Sep 8, 2023
1 parent 1c0087a commit 8980f15
Show file tree
Hide file tree
Showing 34 changed files with 507 additions and 229 deletions.
28 changes: 12 additions & 16 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import pathlib
import sys
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Generic, TypeVar

if TYPE_CHECKING:
import fsspec
from typing import Generic, TypeVar

if sys.version_info < (3, 8):
from typing_extensions import Protocol
Expand All @@ -22,6 +19,7 @@
JsonSourceConfig,
ParquetSourceConfig,
ResourceRequest,
StorageConfig,
)
from daft.expressions import Expression, ExpressionsProjection, col
from daft.logical.map_partition_ops import MapPartitionOp
Expand Down Expand Up @@ -316,7 +314,7 @@ class ReadFile(SingleOutputInstruction):
# Max number of rows to read.
limit_rows: int | None
schema: Schema
fs: fsspec.AbstractFileSystem | None
storage_config: StorageConfig
columns_to_read: list[str] | None
file_format_config: FileFormatConfig

Expand Down Expand Up @@ -363,48 +361,46 @@ def _handle_tabular_files_scan(
)

file_format = self.file_format_config.file_format()
config = self.file_format_config.config
format_config = self.file_format_config.config
if file_format == FileFormat.Csv:
assert isinstance(config, CsvSourceConfig)
assert isinstance(format_config, CsvSourceConfig)
table = Table.concat(
[
table_io.read_csv(
file=fp,
schema=self.schema,
fs=self.fs,
storage_config=self.storage_config,
csv_options=TableParseCSVOptions(
delimiter=config.delimiter,
header_index=0 if config.has_headers else None,
delimiter=format_config.delimiter,
header_index=0 if format_config.has_headers else None,
),
read_options=read_options,
)
for fp in filepaths
]
)
elif file_format == FileFormat.Json:
assert isinstance(config, JsonSourceConfig)
assert isinstance(format_config, JsonSourceConfig)
table = Table.concat(
[
table_io.read_json(
file=fp,
schema=self.schema,
fs=self.fs,
storage_config=self.storage_config,
read_options=read_options,
)
for fp in filepaths
]
)
elif file_format == FileFormat.Parquet:
assert isinstance(config, ParquetSourceConfig)
assert isinstance(format_config, ParquetSourceConfig)
table = Table.concat(
[
table_io.read_parquet(
file=fp,
schema=self.schema,
fs=self.fs,
storage_config=self.storage_config,
read_options=read_options,
io_config=config.io_config,
use_native_downloader=config.use_native_downloader,
)
for fp in filepaths
]
Expand Down
17 changes: 10 additions & 7 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
import math
import pathlib
from collections import deque
from typing import TYPE_CHECKING, Generator, Iterator, TypeVar, Union

if TYPE_CHECKING:
import fsspec
from typing import Generator, Iterator, TypeVar, Union

from loguru import logger

from daft.daft import FileFormat, FileFormatConfig, JoinType, ResourceRequest
from daft.daft import (
FileFormat,
FileFormatConfig,
JoinType,
ResourceRequest,
StorageConfig,
)
from daft.execution import execution_step
from daft.execution.execution_step import (
Instruction,
Expand Down Expand Up @@ -67,7 +70,7 @@ def file_read(
# Max number of rows to read.
limit_rows: int | None,
schema: Schema,
fs: fsspec.AbstractFileSystem | None,
storage_config: StorageConfig,
columns_to_read: list[str] | None,
file_format_config: FileFormatConfig,
) -> InProgressPhysicalPlan[PartitionT]:
Expand Down Expand Up @@ -99,7 +102,7 @@ def file_read(
file_rows=file_rows[i],
limit_rows=limit_rows,
schema=schema,
fs=fs,
storage_config=storage_config,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
),
Expand Down
2 changes: 1 addition & 1 deletion daft/execution/physical_plan_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
child_plan=child_plan,
limit_rows=node._limit_rows,
schema=node._schema,
fs=node._fs,
storage_config=node._storage_config,
columns_to_read=node._column_names,
file_format_config=node._file_format_config,
)
Expand Down
4 changes: 3 additions & 1 deletion daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
PySchema,
PyTable,
ResourceRequest,
StorageConfig,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
Expand All @@ -26,6 +27,7 @@ def tabular_scan(
columns_to_read: list[str],
file_info_table: PyTable,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
limit: int,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
# TODO(Clark): Fix this Ray runner hack.
Expand All @@ -43,7 +45,7 @@ def tabular_scan(
child_plan=file_info_iter,
limit_rows=limit,
schema=Schema._from_pyschema(schema),
fs=None,
storage_config=storage_config,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
)
Expand Down
12 changes: 6 additions & 6 deletions daft/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
_resolve_filesystem_and_path,
)

from daft.daft import FileFormat, FileFormatConfig, FileInfos, ParquetSourceConfig
from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig
from daft.table import Table

_CACHED_FSES: dict[str, FileSystem] = {}
Expand Down Expand Up @@ -294,8 +294,9 @@ def _path_is_glob(path: str) -> bool:

def glob_path_with_stats(
path: str,
file_format_config: FileFormatConfig | None,
file_format: FileFormat | None,
fs: fsspec.AbstractFileSystem,
storage_config: StorageConfig | None,
) -> FileInfos:
"""Glob a path, returning a list ListingInfo."""
protocol = get_protocol_from_path(path)
Expand Down Expand Up @@ -326,10 +327,9 @@ def glob_path_with_stats(
raise FileNotFoundError(f"File or directory not found: {path}")

# Set number of rows if available.
if file_format_config is not None and file_format_config.file_format() == FileFormat.Parquet:
config = file_format_config.config
assert isinstance(config, ParquetSourceConfig)
if config.use_native_downloader:
if file_format is not None and file_format == FileFormat.Parquet:
config = storage_config.config if storage_config is not None else None
if config is not None and isinstance(config, NativeStorageConfig):
parquet_statistics = Table.read_parquet_statistics(
list(filepaths_to_infos.keys()), config.io_config
).to_pydict()
Expand Down
10 changes: 8 additions & 2 deletions daft/io/_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import fsspec

from daft.api_annotations import PublicAPI
from daft.daft import CsvSourceConfig, FileFormatConfig
from daft.daft import (
CsvSourceConfig,
FileFormatConfig,
PythonStorageConfig,
StorageConfig,
)
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.io.common import _get_tabular_files_scan
Expand Down Expand Up @@ -52,5 +57,6 @@ def read_csv(

csv_config = CsvSourceConfig(delimiter=delimiter, has_headers=has_headers)
file_format_config = FileFormatConfig.from_csv_config(csv_config)
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, fs)
storage_config = StorageConfig.python(PythonStorageConfig(fs))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
10 changes: 8 additions & 2 deletions daft/io/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import fsspec

from daft.api_annotations import PublicAPI
from daft.daft import FileFormatConfig, JsonSourceConfig
from daft.daft import (
FileFormatConfig,
JsonSourceConfig,
PythonStorageConfig,
StorageConfig,
)
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.io.common import _get_tabular_files_scan
Expand Down Expand Up @@ -40,5 +45,6 @@ def read_json(

json_config = JsonSourceConfig()
file_format_config = FileFormatConfig.from_json_config(json_config)
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, fs)
storage_config = StorageConfig.python(PythonStorageConfig(fs))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
17 changes: 13 additions & 4 deletions daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
import fsspec

from daft.api_annotations import PublicAPI
from daft.daft import FileFormatConfig, ParquetSourceConfig
from daft.daft import (
FileFormatConfig,
NativeStorageConfig,
ParquetSourceConfig,
PythonStorageConfig,
StorageConfig,
)
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.io.common import _get_tabular_files_scan
Expand Down Expand Up @@ -47,8 +53,11 @@ def read_parquet(
if isinstance(path, list) and len(path) == 0:
raise ValueError(f"Cannot read DataFrame from from empty list of Parquet filepaths")

parquet_config = ParquetSourceConfig(use_native_downloader=use_native_downloader, io_config=io_config)
file_format_config = FileFormatConfig.from_parquet_config(parquet_config)
file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(fs))

builder = _get_tabular_files_scan(path, schema_hints, file_format_config, fs)
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config, fs=fs)
return DataFrame(builder)
16 changes: 10 additions & 6 deletions daft/io/common.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from __future__ import annotations

import fsspec
from typing import TYPE_CHECKING

from daft.context import get_context
from daft.daft import FileFormatConfig, LogicalPlanBuilder
from daft.daft import FileFormatConfig, LogicalPlanBuilder, StorageConfig
from daft.datatype import DataType
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema

if TYPE_CHECKING:
import fsspec


def _get_schema_from_hints(hints: dict[str, DataType]) -> Schema:
if isinstance(hints, dict):
Expand All @@ -20,27 +23,28 @@ def _get_tabular_files_scan(
path: str | list[str],
schema_hints: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
fs: fsspec.AbstractFileSystem | None,
storage_config: StorageConfig,
fs: fsspec.AbstractFileSystem | None = None,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
paths = path if isinstance(path, list) else [str(path)]
schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None
# Glob the path using the Runner
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config, fs)
file_infos = runner_io.glob_paths_details(paths, file_format_config, fs=fs, storage_config=storage_config)

# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, fs)
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)
# Construct plan
builder_cls = get_context().logical_plan_builder_class()
builder = builder_cls.from_tabular_scan(
file_infos=file_infos,
schema=inferred_or_provided_schema,
file_format_config=file_format_config,
fs=fs,
storage_config=storage_config,
)
return builder
5 changes: 2 additions & 3 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import fsspec

from daft.daft import (
FileFormat,
FileFormatConfig,
Expand All @@ -14,6 +12,7 @@
PartitionScheme,
PartitionSpec,
ResourceRequest,
StorageConfig,
)
from daft.expressions.expressions import Expression
from daft.logical.schema import Schema
Expand Down Expand Up @@ -84,7 +83,7 @@ def from_tabular_scan(
file_infos: FileInfos,
schema: Schema,
file_format_config: FileFormatConfig,
fs: fsspec.AbstractFileSystem | None,
storage_config: StorageConfig,
) -> LogicalPlanBuilder:
pass

Expand Down
Loading

0 comments on commit 8980f15

Please sign in to comment.