Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Complete removal of fsspec #1448

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 8 additions & 41 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ from daft.runners.partitioning import PartitionCacheEntry
from daft.execution import physical_plan
from daft.planner.planner import PartitionT
import pyarrow
import fsspec

class ImageMode(Enum):
"""
Expand Down Expand Up @@ -319,45 +318,6 @@ class IOConfig:
Recreate an IOConfig from a JSON string.
"""

class NativeStorageConfig:
"""
Storage configuration for the Rust-native I/O layer.
"""

io_config: IOConfig

def __init__(self, io_config: IOConfig | None = None): ...

class PythonStorageConfig:
"""
Storage configuration for the legacy Python I/O layer.
"""

fs: fsspec.AbstractFileSystem

def __init__(self, fs: fsspec.AbstractFileSystem | None = None): ...

class StorageConfig:
"""
Configuration for interacting with a particular storage backend, using a particular
I/O layer implementation.
"""

@staticmethod
def native(config: NativeStorageConfig) -> StorageConfig:
"""
Create from a native storage config.
"""
...
@staticmethod
def python(config: PythonStorageConfig) -> StorageConfig:
"""
Create from a Python storage config.
"""
...
@property
def config(self) -> NativeStorageConfig | PythonStorageConfig: ...

def read_parquet(
uri: str,
columns: list[str] | None = None,
Expand Down Expand Up @@ -694,7 +654,7 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(
file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, io_config: IOConfig | None = None
) -> LogicalPlanBuilder: ...
def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ...
def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
Expand Down Expand Up @@ -727,3 +687,10 @@ class LogicalPlanBuilder:
def build_type() -> str: ...
def version() -> str: ...
def __getattr__(name) -> Any: ...
def io_glob(
    path: str,
    multithreaded_io: bool | None = None,
    io_config: IOConfig | None = None,
    fanout_limit: int | None = None,
    page_size: int | None = None,
) -> list[dict]:
    """Expand a path/glob pattern against a storage backend via the native I/O layer.

    Args:
        path: Path or glob pattern to expand.
        multithreaded_io: Whether to use multithreaded I/O; ``None`` uses the
            implementation default.
        io_config: Storage credentials/configuration; ``None`` uses defaults.
        fanout_limit: Presumably caps listing fanout during glob expansion —
            TODO confirm against the native implementation.
        page_size: Presumably the page size used for backend listing requests —
            TODO confirm against the native implementation.

    Returns:
        One dict of file metadata per matched file.
    """
    ...
8 changes: 3 additions & 5 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
CsvSourceConfig,
FileFormat,
FileFormatConfig,
IOConfig,
JoinType,
JsonSourceConfig,
ParquetSourceConfig,
ResourceRequest,
StorageConfig,
)
from daft.expressions import Expression, ExpressionsProjection, col
from daft.logical.map_partition_ops import MapPartitionOp
Expand Down Expand Up @@ -314,7 +314,7 @@ class ReadFile(SingleOutputInstruction):
# Max number of rows to read.
limit_rows: int | None
schema: Schema
storage_config: StorageConfig
io_config: IOConfig | None
columns_to_read: list[str] | None
file_format_config: FileFormatConfig

Expand Down Expand Up @@ -369,7 +369,6 @@ def _handle_tabular_files_scan(
table_io.read_csv(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
csv_options=TableParseCSVOptions(
delimiter=format_config.delimiter,
header_index=0 if format_config.has_headers else None,
Expand All @@ -386,7 +385,6 @@ def _handle_tabular_files_scan(
table_io.read_json(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
read_options=read_options,
)
for fp in filepaths
Expand All @@ -399,7 +397,7 @@ def _handle_tabular_files_scan(
table_io.read_parquet(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
io_config=self.io_config,
read_options=read_options,
)
for fp in filepaths
Expand Down
12 changes: 3 additions & 9 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@

from loguru import logger

from daft.daft import (
FileFormat,
FileFormatConfig,
JoinType,
ResourceRequest,
StorageConfig,
)
from daft.daft import FileFormat, FileFormatConfig, IOConfig, JoinType, ResourceRequest
from daft.execution import execution_step
from daft.execution.execution_step import (
Instruction,
Expand Down Expand Up @@ -70,7 +64,7 @@ def file_read(
# Max number of rows to read.
limit_rows: int | None,
schema: Schema,
storage_config: StorageConfig,
io_config: IOConfig | None,
columns_to_read: list[str] | None,
file_format_config: FileFormatConfig,
) -> InProgressPhysicalPlan[PartitionT]:
Expand Down Expand Up @@ -102,7 +96,7 @@ def file_read(
file_rows=file_rows[i],
limit_rows=limit_rows,
schema=schema,
storage_config=storage_config,
io_config=io_config,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
),
Expand Down
2 changes: 1 addition & 1 deletion daft/execution/physical_plan_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
child_plan=child_plan,
limit_rows=node._limit_rows,
schema=node._schema,
storage_config=node._storage_config,
io_config=node._io_config,
columns_to_read=node._column_names,
file_format_config=node._file_format_config,
)
Expand Down
6 changes: 3 additions & 3 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from daft.daft import (
FileFormat,
FileFormatConfig,
IOConfig,
JoinType,
PyExpr,
PySchema,
PyTable,
ResourceRequest,
StorageConfig,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
Expand All @@ -26,7 +26,7 @@ def tabular_scan(
columns_to_read: list[str],
file_info_table: PyTable,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
io_config: IOConfig,
limit: int,
is_ray_runner: bool,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
Expand All @@ -45,7 +45,7 @@ def tabular_scan(
child_plan=file_info_iter,
limit_rows=limit,
schema=Schema._from_pyschema(schema),
storage_config=storage_config,
io_config=io_config,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
)
Expand Down
39 changes: 11 additions & 28 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from datetime import date, datetime
from typing import TYPE_CHECKING, Callable, Iterable, Iterator, TypeVar, overload

import fsspec
import pyarrow as pa

from daft import context
def download(
    self,
    max_connections: int = 32,
    on_error: Literal["raise"] | Literal["null"] = "raise",
    io_config: IOConfig | None = None,
    use_native_downloader: bool = False,
) -> Expression:
    """Treats each string as a URL, and downloads the bytes contents as a bytes column

    Args:
        max_connections: The maximum number of connections to use per thread to use for downloading URLs, defaults to 32
        on_error: Behavior when a URL download error is encountered - "raise" to raise the error immediately or "null" to log
            the error but fallback to a Null value. Defaults to "raise".
        io_config: IOConfig to use for storage credentials when downloading; ``None``
            uses the default configuration.
        use_native_downloader: Retained for backward compatibility; the native
            downloader is now the only code path, so this flag is ignored.

    Returns:
        Expression: a Binary expression which is the bytes contents of the URL, or None if an error occurred during download
    """
    # Map the user-facing `on_error` option onto the boolean flag expected by
    # the native downloader; reject anything else up front.
    if on_error == "raise":
        raise_on_error = True
    elif on_error == "null":
        raise_on_error = False
    else:
        # BUGFIX: was `raise NotImplemented(...)`. `NotImplemented` is a
        # constant (not an exception class), so raising it fails with
        # "TypeError: exceptions must derive from BaseException".
        # NotImplementedError is the intended exception.
        raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")
    if not (isinstance(max_connections, int) and max_connections > 0):
        raise ValueError(f"Invalid value for `max_connections`: {max_connections}")
    # The Ray runner manages its own parallelism, so multithreaded I/O is only
    # enabled on the local (non-Ray) runner.
    using_ray_runner = context.get_context().is_ray_runner
    return Expression._from_pyexpr(
        self._expr.url_download(max_connections, raise_on_error, not using_ray_runner, io_config)
    )


class ExpressionFloatNamespace(ExpressionNamespace):
Expand Down
Loading