Commit
Merge branch 'main' into sammy/python-scan-op
samster25 committed Oct 26, 2023
2 parents 351fc91 + f49275b commit fbd94a1
Showing 24 changed files with 118 additions and 433 deletions.
8 changes: 8 additions & 0 deletions .github/release-drafter.yml
@@ -31,6 +31,14 @@ change-template: '- $TITLE @$AUTHOR (#$NUMBER)'
change-title-escapes: \<*_& # You can add # and @ to disable mentions, and add ` to disable code blocks.


# Add the "increment-minor-version" label to increment minor versions
version-resolver:
minor:
labels:
- increment-minor-version
default: patch


template: |
## Changes
7 changes: 7 additions & 0 deletions daft/daft.pyi
@@ -839,3 +839,10 @@ class LogicalPlanBuilder:
def build_type() -> str: ...
def version() -> str: ...
def __getattr__(name) -> Any: ...
def io_glob(
path: str,
multithreaded_io: bool | None = None,
io_config: IOConfig | None = None,
fanout_limit: int | None = None,
page_size: int | None = None,
) -> list[dict]: ...
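
A minimal sketch of how the new io_glob binding could be exercised; the bucket path and anonymous S3 access are illustrative assumptions, and the dict keys are inferred from glob_path_with_stats later in this diff.

```python
from daft.daft import io_glob
from daft.io import IOConfig, S3Config

# Placeholder glob pattern and config; any store supported by the native IO layer works similarly.
files = io_glob(
    "s3://my-bucket/data/**/*.parquet",
    io_config=IOConfig(s3=S3Config(anonymous=True)),
)
for f in files:
    # Each entry is a dict carrying at least "path", "size", and "type".
    print(f["path"], f.get("size"), f.get("type"))
```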
7 changes: 2 additions & 5 deletions daft/expressions/expressions.py
@@ -5,7 +5,6 @@
from datetime import date, datetime
from typing import TYPE_CHECKING, Callable, Iterable, Iterator, TypeVar, overload

import fsspec
import pyarrow as pa

from daft import context
@@ -424,9 +423,8 @@ def download(
self,
max_connections: int = 32,
on_error: Literal["raise"] | Literal["null"] = "raise",
fs: fsspec.AbstractFileSystem | None = None,
io_config: IOConfig | None = None,
use_native_downloader: bool = False,
use_native_downloader: bool = True,
) -> Expression:
"""Treats each string as a URL, and downloads the bytes contents as a bytes column
@@ -442,7 +440,7 @@ def download(
Returns:
Expression: a Binary expression which is the bytes contents of the URL, or None if an error occurred during download
"""
if fs is None and use_native_downloader:
if use_native_downloader:
raise_on_error = False
if on_error == "raise":
raise_on_error = True
@@ -463,7 +461,6 @@
Expression._from_pyexpr(self._expr),
max_worker_threads=max_connections,
on_error=on_error,
fs=fs,
)


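For orientation, a rough sketch of the url.download call site after this change; the URLs and S3 settings are placeholders rather than anything taken from the commit.

```python
import daft
from daft.io import IOConfig, S3Config

df = daft.from_pydict({"urls": ["https://example.com/a.png", "https://example.com/b.png"]})

# The fsspec `fs=` argument is gone and use_native_downloader now defaults to True,
# so remote credentials are supplied through io_config instead.
df = df.with_column(
    "contents",
    df["urls"].url.download(on_error="null", io_config=IOConfig(s3=S3Config(anonymous=True))),
)
```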
57 changes: 14 additions & 43 deletions daft/filesystem.py
@@ -4,8 +4,6 @@
import pathlib
import sys
import urllib.parse
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

if sys.version_info < (3, 8):
from typing_extensions import Literal
@@ -25,7 +23,7 @@
_resolve_filesystem_and_path,
)

from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig
from daft.daft import FileFormat, FileInfos, IOConfig, io_glob
from daft.table import Table

logger = logging.getLogger(__name__)
@@ -221,6 +219,9 @@ def _resolve_path_and_filesystem(
None, the provided filesystem will still be validated against the
filesystem inferred from the provided path to ensure compatibility.
"""
# NOTE: PyArrow really dislikes the "file://" prefix, so we trim it out if present
path = path[7:] if path.startswith("file://") else path

# Use pyarrow utility to resolve filesystem and this particular path.
# If a non-None filesystem is provided to this utility, it will ensure that
# it is compatible with the provided path.
@@ -300,56 +301,26 @@ def _path_is_glob(path: str) -> bool:
def glob_path_with_stats(
path: str,
file_format: FileFormat | None,
fs: fsspec.AbstractFileSystem,
storage_config: StorageConfig | None,
io_config: IOConfig | None,
) -> FileInfos:
"""Glob a path, returning a list ListingInfo."""
protocol = get_protocol_from_path(path)

filepaths_to_infos: dict[str, dict[str, Any]] = defaultdict(dict)

if _path_is_glob(path):
globbed_data = fs.glob(path, detail=True)

for path, details in globbed_data.items():
path = _ensure_path_protocol(protocol, path)
filepaths_to_infos[path]["size"] = details["size"]

elif fs.isfile(path):
file_info = fs.info(path)

filepaths_to_infos[path]["size"] = file_info["size"]

elif fs.isdir(path):
files_info = fs.ls(path, detail=True)

for file_info in files_info:
path = file_info["name"]
path = _ensure_path_protocol(protocol, path)
filepaths_to_infos[path]["size"] = file_info["size"]

else:
raise FileNotFoundError(f"File or directory not found: {path}")
files = io_glob(path, io_config=io_config)
filepaths_to_infos = {f["path"]: {"size": f["size"], "type": f["type"]} for f in files}

# Set number of rows if available.
if file_format is not None and file_format == FileFormat.Parquet:
config = storage_config.config if storage_config is not None else None
if config is not None and isinstance(config, NativeStorageConfig):
parquet_statistics = Table.read_parquet_statistics(
list(filepaths_to_infos.keys()), config.io_config
).to_pydict()
for path, num_rows in zip(parquet_statistics["uris"], parquet_statistics["row_count"]):
filepaths_to_infos[path]["rows"] = num_rows
else:
parquet_metadatas = ThreadPoolExecutor().map(_get_parquet_metadata_single, filepaths_to_infos.keys())
for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas):
filepaths_to_infos[path]["rows"] = parquet_metadata.num_rows
parquet_statistics = Table.read_parquet_statistics(
list(filepaths_to_infos.keys()),
io_config=io_config,
).to_pydict()
for path, num_rows in zip(parquet_statistics["uris"], parquet_statistics["row_count"]):
filepaths_to_infos[path]["rows"] = num_rows

file_paths = []
file_sizes = []
num_rows = []
for path, infos in filepaths_to_infos.items():
file_paths.append(_ensure_path_protocol(protocol, path))
file_paths.append(path)
file_sizes.append(infos.get("size"))
num_rows.append(infos.get("rows"))

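A hedged sketch of the slimmed-down glob path above; the pattern is a placeholder and the keyword names follow the new signature in this diff.

```python
from daft.daft import FileFormat, IOConfig
from daft.filesystem import glob_path_with_stats

# Placeholder glob; passing FileFormat.Parquet triggers the row-count enrichment
# via Table.read_parquet_statistics shown above. Credentials, if needed, go in IOConfig.
file_infos = glob_path_with_stats(
    "s3://my-bucket/data/*.parquet",
    file_format=FileFormat.Parquet,
    io_config=IOConfig(),
)
```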
9 changes: 2 additions & 7 deletions daft/io/_csv.py
@@ -2,8 +2,6 @@

from typing import Dict, List, Optional, Union

import fsspec

from daft.api_annotations import PublicAPI
from daft.daft import (
CsvSourceConfig,
@@ -22,12 +20,11 @@
def read_csv(
path: Union[str, List[str]],
schema_hints: Optional[Dict[str, DataType]] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
has_headers: bool = True,
column_names: Optional[List[str]] = None,
delimiter: str = ",",
io_config: Optional["IOConfig"] = None,
use_native_downloader: bool = False,
use_native_downloader: bool = True,
_buffer_size: Optional[int] = None,
_chunk_size: Optional[int] = None,
) -> DataFrame:
@@ -43,8 +40,6 @@ def read_csv(
path (str): Path to CSV (allows for wildcards)
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data.
By default, Daft will automatically construct a FileSystem instance internally.
has_headers (bool): Whether the CSV has a header or not, defaults to True
delimiter (Str): Delimiter used in the CSV, defaults to ","
io_config (IOConfig): Config to be used with the native downloader
@@ -74,6 +69,6 @@
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(fs))
storage_config = StorageConfig.python(PythonStorageConfig(None))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
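
A minimal usage sketch of read_csv after this change; the path and anonymous S3 access are assumptions, not taken from the commit.

```python
import daft
from daft.io import IOConfig, S3Config

# use_native_downloader now defaults to True and the fsspec `fs=` argument is gone;
# remote access is configured via io_config.
df = daft.read_csv(
    "s3://my-bucket/logs/*.csv",
    io_config=IOConfig(s3=S3Config(anonymous=True)),
)
```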
7 changes: 1 addition & 6 deletions daft/io/_json.py
@@ -2,8 +2,6 @@

from typing import Dict, List, Optional, Union

import fsspec

from daft.api_annotations import PublicAPI
from daft.daft import (
FileFormatConfig,
@@ -20,7 +18,6 @@
def read_json(
path: Union[str, List[str]],
schema_hints: Optional[Dict[str, DataType]] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
) -> DataFrame:
"""Creates a DataFrame from line-delimited JSON file(s)
@@ -34,8 +31,6 @@ def read_json(
path (str): Path to JSON files (allows for wildcards)
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data.
By default, Daft will automatically construct a FileSystem instance internally.
returns:
DataFrame: parsed DataFrame
@@ -45,6 +40,6 @@

json_config = JsonSourceConfig()
file_format_config = FileFormatConfig.from_json_config(json_config)
storage_config = StorageConfig.python(PythonStorageConfig(fs))
storage_config = StorageConfig.python(PythonStorageConfig(None))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
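
For completeness, a small sketch of read_json after the fsspec argument was dropped; the path is a placeholder.

```python
import daft

# Line-delimited JSON; the `fs=` argument has been removed, so paths are
# resolved by Daft's own filesystem handling.
df = daft.read_json("/data/events/*.jsonl")
```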
11 changes: 3 additions & 8 deletions daft/io/_parquet.py
@@ -2,8 +2,6 @@

from typing import Dict, List, Optional, Union

import fsspec

from daft import context
from daft.api_annotations import PublicAPI
from daft.daft import (
@@ -23,9 +21,8 @@
def read_parquet(
path: Union[str, List[str]],
schema_hints: Optional[Dict[str, DataType]] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
io_config: Optional["IOConfig"] = None,
use_native_downloader: bool = False,
use_native_downloader: bool = True,
_multithreaded_io: Optional[bool] = None,
) -> DataFrame:
"""Creates a DataFrame from Parquet file(s)
@@ -40,8 +37,6 @@
path (str): Path to Parquet file (allows for wildcards)
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data.
By default, Daft will automatically construct a FileSystem instance internally.
io_config (IOConfig): Config to be used with the native downloader
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
@@ -68,7 +63,7 @@
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(fs))
storage_config = StorageConfig.python(PythonStorageConfig(None))

builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config, fs=fs)
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
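
A sketch of the equivalent read_parquet call after this change; the bucket and credentials are illustrative.

```python
import daft
from daft.io import IOConfig, S3Config

# The fsspec `fs=` argument is gone and use_native_downloader defaults to True,
# so remote Parquet reads are driven entirely by io_config.
df = daft.read_parquet(
    "s3://my-bucket/table/**/*.parquet",
    io_config=IOConfig(s3=S3Config(anonymous=True)),
)
```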
12 changes: 8 additions & 4 deletions daft/io/common.py
@@ -3,13 +3,13 @@
from typing import TYPE_CHECKING

from daft.context import get_context
from daft.daft import FileFormatConfig, StorageConfig
from daft.daft import FileFormatConfig, NativeStorageConfig, StorageConfig
from daft.datatype import DataType
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema

if TYPE_CHECKING:
import fsspec
pass


def _get_schema_from_hints(hints: dict[str, DataType]) -> Schema:
@@ -24,14 +24,18 @@ def _get_tabular_files_scan(
schema_hints: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
fs: fsspec.AbstractFileSystem | None = None,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
paths = path if isinstance(path, list) else [str(path)]
schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None

# Glob the path using the Runner
# NOTE: Globbing will
io_config = None
if isinstance(storage_config.config, NativeStorageConfig):
io_config = storage_config.config.io_config
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config, fs=fs, storage_config=storage_config)
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

# Infer schema if no hints provided
inferred_or_provided_schema = (
12 changes: 5 additions & 7 deletions daft/io/file_path.py
@@ -1,19 +1,18 @@
# isort: dont-add-import: from __future__ import annotations

from typing import Optional

import fsspec
from typing import Optional

from daft.api_annotations import PublicAPI
from daft.context import get_context
from daft.daft import PartitionScheme, PartitionSpec
from daft.daft import IOConfig, PartitionScheme, PartitionSpec
from daft.dataframe import DataFrame
from daft.runners.pyrunner import LocalPartitionSet
from daft.table import Table


@PublicAPI
def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) -> DataFrame:
def from_glob_path(path: str, io_config: Optional[IOConfig] = None) -> DataFrame:
"""Creates a DataFrame of file paths and other metadata from a glob path.
This method supports wildcards:
@@ -36,16 +35,15 @@ def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) ->
Args:
path (str): Path to files on disk (allows wildcards).
fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for globbing and fetching metadata.
By default, Daft will automatically construct a FileSystem instance internally.
io_config (IOConfig): Configuration to use when running IO with remote services
Returns:
DataFrame: DataFrame containing the path to each file as a row, along with other metadata
parsed from the provided filesystem.
"""
context = get_context()
runner_io = context.runner().runner_io()
file_infos = runner_io.glob_paths_details([path], fs=fs)
file_infos = runner_io.glob_paths_details([path], io_config=io_config)
file_infos_table = Table._from_pytable(file_infos.to_table())
partition = LocalPartitionSet({0: file_infos_table})
cache_entry = context.runner().put_partition_set_into_cache(partition)
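A small sketch of the updated from_glob_path signature in use; the local glob is a placeholder, and io_config is only needed for remote stores.

```python
import daft
from daft.io import IOConfig

# Returns a DataFrame with one row per matching file, carrying the path and other metadata.
df = daft.from_glob_path("/data/images/**/*.jpeg", io_config=IOConfig())
df.show()
```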
18 changes: 5 additions & 13 deletions daft/runners/pyrunner.py
@@ -11,13 +11,13 @@
from daft.daft import (
FileFormatConfig,
FileInfos,
PythonStorageConfig,
IOConfig,
ResourceRequest,
StorageConfig,
)
from daft.execution import physical_plan
from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask
from daft.filesystem import get_filesystem_from_path, glob_path_with_stats
from daft.filesystem import glob_path_with_stats
from daft.internal.gpu import cuda_device_count
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema
@@ -33,7 +33,7 @@
from daft.table import Table

if TYPE_CHECKING:
import fsspec
pass


logger = logging.getLogger(__name__)
@@ -83,20 +83,12 @@ def glob_paths_details(
self,
source_paths: list[str],
file_format_config: FileFormatConfig | None = None,
fs: fsspec.AbstractFileSystem | None = None,
storage_config: StorageConfig | None = None,
io_config: IOConfig | None = None,
) -> FileInfos:
if fs is None and storage_config is not None:
config = storage_config.config
if isinstance(config, PythonStorageConfig):
fs = config.fs
file_infos = FileInfos()
file_format = file_format_config.file_format() if file_format_config is not None else None
for source_path in source_paths:
if fs is None:
fs = get_filesystem_from_path(source_path)

path_file_infos = glob_path_with_stats(source_path, file_format, fs, storage_config)
path_file_infos = glob_path_with_stats(source_path, file_format, io_config)

if len(path_file_infos) == 0:
raise FileNotFoundError(f"No files found at {source_path}")