[FEAT] [New Query Planner] Add support for fsspec filesystems to new query planner. #1357

Merged · 4 commits · Sep 12, 2023
Changes from all commits
28 changes: 12 additions & 16 deletions daft/execution/execution_step.py
@@ -4,10 +4,7 @@
 import pathlib
 import sys
 from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Generic, TypeVar
-
-if TYPE_CHECKING:
-    import fsspec
+from typing import Generic, TypeVar

 if sys.version_info < (3, 8):
     from typing_extensions import Protocol
@@ -22,6 +19,7 @@
     JsonSourceConfig,
     ParquetSourceConfig,
     ResourceRequest,
+    StorageConfig,
 )
 from daft.expressions import Expression, ExpressionsProjection, col
 from daft.logical.map_partition_ops import MapPartitionOp
@@ -316,7 +314,7 @@ class ReadFile(SingleOutputInstruction):
     # Max number of rows to read.
     limit_rows: int | None
     schema: Schema
-    fs: fsspec.AbstractFileSystem | None
+    storage_config: StorageConfig
     columns_to_read: list[str] | None
     file_format_config: FileFormatConfig

@@ -363,48 +361,46 @@ def _handle_tabular_files_scan(
         )

         file_format = self.file_format_config.file_format()
-        config = self.file_format_config.config
+        format_config = self.file_format_config.config
         if file_format == FileFormat.Csv:
-            assert isinstance(config, CsvSourceConfig)
+            assert isinstance(format_config, CsvSourceConfig)
             table = Table.concat(
                 [
                     table_io.read_csv(
                         file=fp,
                         schema=self.schema,
-                        fs=self.fs,
+                        storage_config=self.storage_config,
                         csv_options=TableParseCSVOptions(
-                            delimiter=config.delimiter,
-                            header_index=0 if config.has_headers else None,
+                            delimiter=format_config.delimiter,
+                            header_index=0 if format_config.has_headers else None,
                         ),
                         read_options=read_options,
                     )
                     for fp in filepaths
                 ]
             )
         elif file_format == FileFormat.Json:
-            assert isinstance(config, JsonSourceConfig)
+            assert isinstance(format_config, JsonSourceConfig)
             table = Table.concat(
                 [
                     table_io.read_json(
                         file=fp,
                         schema=self.schema,
-                        fs=self.fs,
+                        storage_config=self.storage_config,
                         read_options=read_options,
                     )
                     for fp in filepaths
                 ]
             )
         elif file_format == FileFormat.Parquet:
-            assert isinstance(config, ParquetSourceConfig)
+            assert isinstance(format_config, ParquetSourceConfig)
             table = Table.concat(
                 [
                     table_io.read_parquet(
                         file=fp,
                         schema=self.schema,
-                        fs=self.fs,
+                        storage_config=self.storage_config,
                         read_options=read_options,
-                        io_config=config.io_config,
-                        use_native_downloader=config.use_native_downloader,
                     )
                     for fp in filepaths
                 ]
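
Aside: ReadFile now carries a StorageConfig rather than a raw fsspec handle. A minimal sketch of how downstream code can branch on the two variants this PR introduces (StorageConfig.config and NativeStorageConfig.io_config appear in the daft/filesystem.py hunk below; the fs attribute on PythonStorageConfig is an assumption, not shown in this diff):

from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig


def describe_storage(storage_config: StorageConfig) -> str:
    """Illustrative helper: report which I/O backend a StorageConfig selects."""
    config = storage_config.config
    if isinstance(config, NativeStorageConfig):
        # Native variant: Daft's Rust I/O layer reads directly, using io_config for credentials.
        return f"native downloader, io_config={config.io_config}"
    assert isinstance(config, PythonStorageConfig)
    # Python variant: reads go through the wrapped fsspec filesystem (possibly None).
    return f"fsspec-backed, fs={config.fs}"
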
17 changes: 10 additions & 7 deletions daft/execution/physical_plan.py
@@ -16,14 +16,17 @@
 import math
 import pathlib
 from collections import deque
-from typing import TYPE_CHECKING, Generator, Iterator, TypeVar, Union
-
-if TYPE_CHECKING:
-    import fsspec
+from typing import Generator, Iterator, TypeVar, Union

 from loguru import logger

-from daft.daft import FileFormat, FileFormatConfig, JoinType, ResourceRequest
+from daft.daft import (
+    FileFormat,
+    FileFormatConfig,
+    JoinType,
+    ResourceRequest,
+    StorageConfig,
+)
 from daft.execution import execution_step
 from daft.execution.execution_step import (
     Instruction,
@@ -67,7 +70,7 @@ def file_read(
     # Max number of rows to read.
     limit_rows: int | None,
     schema: Schema,
-    fs: fsspec.AbstractFileSystem | None,
+    storage_config: StorageConfig,
     columns_to_read: list[str] | None,
     file_format_config: FileFormatConfig,
 ) -> InProgressPhysicalPlan[PartitionT]:
@@ -99,7 +102,7 @@ def file_read(
                 file_rows=file_rows[i],
                 limit_rows=limit_rows,
                 schema=schema,
-                fs=fs,
+                storage_config=storage_config,
                 columns_to_read=columns_to_read,
                 file_format_config=file_format_config,
             ),
2 changes: 1 addition & 1 deletion daft/execution/physical_plan_factory.py
@@ -39,7 +39,7 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
             child_plan=child_plan,
             limit_rows=node._limit_rows,
             schema=node._schema,
-            fs=node._fs,
+            storage_config=node._storage_config,
             columns_to_read=node._column_names,
             file_format_config=node._file_format_config,
         )
4 changes: 3 additions & 1 deletion daft/execution/rust_physical_plan_shim.py
@@ -11,6 +11,7 @@
     PySchema,
     PyTable,
     ResourceRequest,
+    StorageConfig,
 )
 from daft.execution import execution_step, physical_plan
 from daft.expressions import Expression, ExpressionsProjection
@@ -26,6 +27,7 @@ def tabular_scan(
     columns_to_read: list[str],
     file_info_table: PyTable,
     file_format_config: FileFormatConfig,
+    storage_config: StorageConfig,
     limit: int,
 ) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
     # TODO(Clark): Fix this Ray runner hack.
@@ -43,7 +45,7 @@
         child_plan=file_info_iter,
         limit_rows=limit,
         schema=Schema._from_pyschema(schema),
-        fs=None,
+        storage_config=storage_config,
         columns_to_read=columns_to_read,
         file_format_config=file_format_config,
     )
12 changes: 6 additions & 6 deletions daft/filesystem.py
@@ -25,7 +25,7 @@
     _resolve_filesystem_and_path,
 )

-from daft.daft import FileFormat, FileFormatConfig, FileInfos, ParquetSourceConfig
+from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig
 from daft.table import Table

 _CACHED_FSES: dict[str, FileSystem] = {}
@@ -294,8 +294,9 @@ def _path_is_glob(path: str) -> bool:

 def glob_path_with_stats(
     path: str,
-    file_format_config: FileFormatConfig | None,
+    file_format: FileFormat | None,
     fs: fsspec.AbstractFileSystem,
+    storage_config: StorageConfig | None,
 ) -> FileInfos:
     """Glob a path, returning a FileInfos listing."""
     protocol = get_protocol_from_path(path)
@@ -326,10 +327,9 @@ def glob_path_with_stats(
         raise FileNotFoundError(f"File or directory not found: {path}")

     # Set number of rows if available.
-    if file_format_config is not None and file_format_config.file_format() == FileFormat.Parquet:
-        config = file_format_config.config
-        assert isinstance(config, ParquetSourceConfig)
-        if config.use_native_downloader:
+    if file_format is not None and file_format == FileFormat.Parquet:
+        config = storage_config.config if storage_config is not None else None
+        if config is not None and isinstance(config, NativeStorageConfig):
             parquet_statistics = Table.read_parquet_statistics(
                 list(filepaths_to_infos.keys()), config.io_config
             ).to_pydict()
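
The Parquet row-count fast path above is now keyed off the storage config instead of ParquetSourceConfig.use_native_downloader. The same guard, condensed into a standalone predicate (the helper name is ours, not the PR's):

from __future__ import annotations

from daft.daft import FileFormat, NativeStorageConfig, StorageConfig


def should_collect_parquet_stats(
    file_format: FileFormat | None, storage_config: StorageConfig | None
) -> bool:
    """Row counts are only gathered for Parquet reads on the native storage path."""
    if file_format is None or file_format != FileFormat.Parquet:
        return False
    config = storage_config.config if storage_config is not None else None
    return config is not None and isinstance(config, NativeStorageConfig)
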
10 changes: 8 additions & 2 deletions daft/io/_csv.py
@@ -5,7 +5,12 @@
 import fsspec

 from daft.api_annotations import PublicAPI
-from daft.daft import CsvSourceConfig, FileFormatConfig
+from daft.daft import (
+    CsvSourceConfig,
+    FileFormatConfig,
+    PythonStorageConfig,
+    StorageConfig,
+)
 from daft.dataframe import DataFrame
 from daft.datatype import DataType
 from daft.io.common import _get_tabular_files_scan
@@ -52,5 +57,6 @@ def read_csv(

     csv_config = CsvSourceConfig(delimiter=delimiter, has_headers=has_headers)
     file_format_config = FileFormatConfig.from_csv_config(csv_config)
-    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, fs)
+    storage_config = StorageConfig.python(PythonStorageConfig(fs))
+    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
     return DataFrame(builder)
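
On the user-facing side, a filesystem passed to daft.read_csv is now wrapped as StorageConfig.python(PythonStorageConfig(fs)) before planning, so existing call sites keep working. A usage sketch (the path is illustrative):

import fsspec

import daft

# Any fsspec AbstractFileSystem can be supplied; the local filesystem stands in here.
fs = fsspec.filesystem("file")
df = daft.read_csv("data/example.csv", fs=fs, delimiter=",", has_headers=True)
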
10 changes: 8 additions & 2 deletions daft/io/_json.py
@@ -5,7 +5,12 @@
 import fsspec

 from daft.api_annotations import PublicAPI
-from daft.daft import FileFormatConfig, JsonSourceConfig
+from daft.daft import (
+    FileFormatConfig,
+    JsonSourceConfig,
+    PythonStorageConfig,
+    StorageConfig,
+)
 from daft.dataframe import DataFrame
 from daft.datatype import DataType
 from daft.io.common import _get_tabular_files_scan
@@ -40,5 +45,6 @@ def read_json(

     json_config = JsonSourceConfig()
     file_format_config = FileFormatConfig.from_json_config(json_config)
-    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, fs)
+    storage_config = StorageConfig.python(PythonStorageConfig(fs))
+    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
     return DataFrame(builder)
17 changes: 13 additions & 4 deletions daft/io/_parquet.py
@@ -5,7 +5,13 @@
 import fsspec

 from daft.api_annotations import PublicAPI
-from daft.daft import FileFormatConfig, ParquetSourceConfig
+from daft.daft import (
+    FileFormatConfig,
+    NativeStorageConfig,
+    ParquetSourceConfig,
+    PythonStorageConfig,
+    StorageConfig,
+)
 from daft.dataframe import DataFrame
 from daft.datatype import DataType
 from daft.io.common import _get_tabular_files_scan
@@ -47,8 +53,11 @@ def read_parquet(
     if isinstance(path, list) and len(path) == 0:
         raise ValueError("Cannot read DataFrame from empty list of Parquet filepaths")

-    parquet_config = ParquetSourceConfig(use_native_downloader=use_native_downloader, io_config=io_config)
-    file_format_config = FileFormatConfig.from_parquet_config(parquet_config)
+    file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
+    if use_native_downloader:
+        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
+    else:
+        storage_config = StorageConfig.python(PythonStorageConfig(fs))

-    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, fs)
+    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config, fs=fs)
     return DataFrame(builder)
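
read_parquet is the one entry point that can take either backend: use_native_downloader now picks the StorageConfig variant instead of being stored on ParquetSourceConfig. A usage sketch (paths illustrative):

import daft

# Python storage path: the optional fsspec filesystem is wrapped in
# StorageConfig.python(PythonStorageConfig(fs)).
df_fsspec = daft.read_parquet("data/example.parquet", use_native_downloader=False)

# Native storage path: io_config now rides on
# StorageConfig.native(NativeStorageConfig(io_config)) instead of ParquetSourceConfig.
df_native = daft.read_parquet("data/example.parquet", use_native_downloader=True)
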
16 changes: 10 additions & 6 deletions daft/io/common.py
@@ -1,13 +1,16 @@
 from __future__ import annotations

-import fsspec
+from typing import TYPE_CHECKING

 from daft.context import get_context
-from daft.daft import FileFormatConfig, LogicalPlanBuilder
+from daft.daft import FileFormatConfig, LogicalPlanBuilder, StorageConfig
 from daft.datatype import DataType
 from daft.logical.builder import LogicalPlanBuilder
 from daft.logical.schema import Schema

+if TYPE_CHECKING:
+    import fsspec
+

 def _get_schema_from_hints(hints: dict[str, DataType]) -> Schema:
     if isinstance(hints, dict):
@@ -20,27 +23,28 @@ def _get_tabular_files_scan(
     path: str | list[str],
     schema_hints: dict[str, DataType] | None,
     file_format_config: FileFormatConfig,
-    fs: fsspec.AbstractFileSystem | None,
+    storage_config: StorageConfig,
+    fs: fsspec.AbstractFileSystem | None = None,
 ) -> LogicalPlanBuilder:
     """Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
     paths = path if isinstance(path, list) else [str(path)]
     schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None
     # Glob the path using the Runner
     runner_io = get_context().runner().runner_io()
-    file_infos = runner_io.glob_paths_details(paths, file_format_config, fs)
+    file_infos = runner_io.glob_paths_details(paths, file_format_config, fs=fs, storage_config=storage_config)

     # Infer schema if no hints provided
     inferred_or_provided_schema = (
         schema_hint
         if schema_hint is not None
-        else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, fs)
+        else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
     )
     # Construct plan
     builder_cls = get_context().logical_plan_builder_class()
     builder = builder_cls.from_tabular_scan(
         file_infos=file_infos,
         schema=inferred_or_provided_schema,
         file_format_config=file_format_config,
-        fs=fs,
+        storage_config=storage_config,
     )
     return builder
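
_get_tabular_files_scan now requires a storage_config, while fs survives only as an optional keyword consumed by the glob step. A sketch of the new call shape, mirroring what read_json builds above (we assume PythonStorageConfig tolerates a None filesystem, matching the old optional fs parameter):

from daft.daft import (
    FileFormatConfig,
    JsonSourceConfig,
    PythonStorageConfig,
    StorageConfig,
)
from daft.io.common import _get_tabular_files_scan

file_format_config = FileFormatConfig.from_json_config(JsonSourceConfig())
# None here mirrors the old `fs: fsspec.AbstractFileSystem | None` default.
storage_config = StorageConfig.python(PythonStorageConfig(None))
builder = _get_tabular_files_scan(
    "data/*.json", None, file_format_config, storage_config=storage_config
)
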
5 changes: 2 additions & 3 deletions daft/logical/builder.py
@@ -4,8 +4,6 @@
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING

-import fsspec
-
 from daft.daft import (
     FileFormat,
     FileFormatConfig,
@@ -14,6 +12,7 @@
     PartitionScheme,
     PartitionSpec,
     ResourceRequest,
+    StorageConfig,
 )
 from daft.expressions.expressions import Expression
 from daft.logical.schema import Schema
@@ -84,7 +83,7 @@ def from_tabular_scan(
         file_infos: FileInfos,
         schema: Schema,
         file_format_config: FileFormatConfig,
-        fs: fsspec.AbstractFileSystem | None,
+        storage_config: StorageConfig,
     ) -> LogicalPlanBuilder:
         pass