From 402b09dd567fc1c95cd9a1d0fbfdcdcf6fb83f39 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Wed, 25 Oct 2023 20:47:53 -0700 Subject: [PATCH] [CHORE] Soft deprecation of fsspec from user-facing APIs (#1467) 1. Removes the `fs: fsspec.AbstractFileSystem | None = None` argument from user-facing APIs 2. Defaults `use_native_downloader=True` to use native I/O instead of fsspec/pyarrow where possible NOTE: This is a breaking change!! After this PR, we use PyArrow when reading CSV and JSON. And we use FSSpec only when reading CSV and JSON on the HTTP and Azure filesystems (FYI @clarkzinzow). This means that once we move to native reads of CSV and JSON, we can fully deprecate fsspec. --------- Co-authored-by: Jay Chia --- daft/daft.pyi | 7 + daft/expressions/expressions.py | 7 +- daft/filesystem.py | 57 ++------ daft/io/_csv.py | 9 +- daft/io/_json.py | 7 +- daft/io/_parquet.py | 11 +- daft/io/common.py | 12 +- daft/io/file_path.py | 12 +- daft/runners/pyrunner.py | 18 +-- daft/runners/ray_runner.py | 26 +--- daft/runners/runner_io.py | 6 +- daft/udf_library/url_udfs.py | 19 +-- src/daft-core/src/array/ops/groups.rs | 2 +- src/daft-core/src/datatypes/dtype.rs | 5 +- src/daft-core/src/datatypes/matching.rs | 2 + tests/cookbook/test_dataloading.py | 55 +------- tests/dataframe/test_creation.py | 131 ++---------------- .../io/parquet/test_reads_public_data.py | 80 +---------- .../io/test_url_download_public_aws_s3.py | 11 +- .../io/test_url_download_s3_minio.py | 18 +-- tests/udf_library/test_url_udfs.py | 22 --- 21 files changed, 93 insertions(+), 424 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 4d3a328b4b..6a535b19ad 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -839,3 +839,10 @@ class LogicalPlanBuilder: def build_type() -> str: ... def version() -> str: ... def __getattr__(name) -> Any: ... +def io_glob( + path: str, + multithreaded_io: bool | None = None, + io_config: IOConfig | None = None, + fanout_limit: int | None = None, + page_size: int | None = None, +) -> list[dict]: ... 
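For callers migrating off the removed `fs=` argument, here is a minimal sketch of the new calling convention implied by this patch; the bucket path is illustrative, and the anonymous `S3Config` mirrors the integration tests further down.

    import daft

    # IOConfig replaces the old fsspec `fs=` argument across the read_* APIs and url.download().
    io_config = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

    # Native I/O is now the default, so use_native_downloader no longer needs to be passed.
    df = daft.read_parquet("s3://example-bucket/data/*.parquet", io_config=io_config)

    # Globbing for file metadata follows the same pattern.
    paths = daft.from_glob_path("s3://example-bucket/data/*.parquet", io_config=io_config)

    # URL downloads also take io_config; on_error="null" keeps failed rows as None.
    paths = paths.with_column("bytes", paths["path"].url.download(io_config=io_config, on_error="null"))

As noted above, CSV and JSON reads still go through PyArrow/fsspec under the hood for now, but their user-facing signatures take the same io_config-based shape.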
diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index d73f8b4709..3769de2894 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -5,7 +5,6 @@ from datetime import date, datetime from typing import TYPE_CHECKING, Callable, Iterable, Iterator, TypeVar, overload -import fsspec import pyarrow as pa from daft import context @@ -424,9 +423,8 @@ def download( self, max_connections: int = 32, on_error: Literal["raise"] | Literal["null"] = "raise", - fs: fsspec.AbstractFileSystem | None = None, io_config: IOConfig | None = None, - use_native_downloader: bool = False, + use_native_downloader: bool = True, ) -> Expression: """Treats each string as a URL, and downloads the bytes contents as a bytes column @@ -442,7 +440,7 @@ def download( Returns: Expression: a Binary expression which is the bytes contents of the URL, or None if an error occured during download """ - if fs is None and use_native_downloader: + if use_native_downloader: raise_on_error = False if on_error == "raise": raise_on_error = True @@ -463,7 +461,6 @@ def download( Expression._from_pyexpr(self._expr), max_worker_threads=max_connections, on_error=on_error, - fs=fs, ) diff --git a/daft/filesystem.py b/daft/filesystem.py index d23747fb88..a739b3afc0 100644 --- a/daft/filesystem.py +++ b/daft/filesystem.py @@ -4,8 +4,6 @@ import pathlib import sys import urllib.parse -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor if sys.version_info < (3, 8): from typing_extensions import Literal @@ -25,7 +23,7 @@ _resolve_filesystem_and_path, ) -from daft.daft import FileFormat, FileInfos, NativeStorageConfig, StorageConfig +from daft.daft import FileFormat, FileInfos, IOConfig, io_glob from daft.table import Table logger = logging.getLogger(__name__) @@ -221,6 +219,9 @@ def _resolve_path_and_filesystem( None, the provided filesystem will still be validated against the filesystem inferred from the provided path to ensure compatibility. """ + # NOTE: PyArrow really dislikes the "file://" prefix, so we trim it out if present + path = path[7:] if path.startswith("file://") else path + # Use pyarrow utility to resolve filesystem and this particular path. # If a non-None filesystem is provided to this utility, it will ensure that # it is compatible with the provided path. 
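The next hunk in daft/filesystem.py rewires glob_path_with_stats onto the io_glob binding declared in daft/daft.pyi above. As a rough sketch of what that call returns, assuming only the keys this patch consumes (the local glob pattern is illustrative):

    from daft.daft import io_glob

    # Each match comes back as a dict; glob_path_with_stats reads the
    # "path", "size", and "type" keys to build its FileInfos listing.
    for entry in io_glob("/tmp/dataset/**/*.parquet"):
        print(entry["path"], entry["size"], entry["type"])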
@@ -300,56 +301,26 @@ def _path_is_glob(path: str) -> bool: def glob_path_with_stats( path: str, file_format: FileFormat | None, - fs: fsspec.AbstractFileSystem, - storage_config: StorageConfig | None, + io_config: IOConfig | None, ) -> FileInfos: """Glob a path, returning a list ListingInfo.""" - protocol = get_protocol_from_path(path) - - filepaths_to_infos: dict[str, dict[str, Any]] = defaultdict(dict) - - if _path_is_glob(path): - globbed_data = fs.glob(path, detail=True) - - for path, details in globbed_data.items(): - path = _ensure_path_protocol(protocol, path) - filepaths_to_infos[path]["size"] = details["size"] - - elif fs.isfile(path): - file_info = fs.info(path) - - filepaths_to_infos[path]["size"] = file_info["size"] - - elif fs.isdir(path): - files_info = fs.ls(path, detail=True) - - for file_info in files_info: - path = file_info["name"] - path = _ensure_path_protocol(protocol, path) - filepaths_to_infos[path]["size"] = file_info["size"] - - else: - raise FileNotFoundError(f"File or directory not found: {path}") + files = io_glob(path, io_config=io_config) + filepaths_to_infos = {f["path"]: {"size": f["size"], "type": f["type"]} for f in files} # Set number of rows if available. if file_format is not None and file_format == FileFormat.Parquet: - config = storage_config.config if storage_config is not None else None - if config is not None and isinstance(config, NativeStorageConfig): - parquet_statistics = Table.read_parquet_statistics( - list(filepaths_to_infos.keys()), config.io_config - ).to_pydict() - for path, num_rows in zip(parquet_statistics["uris"], parquet_statistics["row_count"]): - filepaths_to_infos[path]["rows"] = num_rows - else: - parquet_metadatas = ThreadPoolExecutor().map(_get_parquet_metadata_single, filepaths_to_infos.keys()) - for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas): - filepaths_to_infos[path]["rows"] = parquet_metadata.num_rows + parquet_statistics = Table.read_parquet_statistics( + list(filepaths_to_infos.keys()), + io_config=io_config, + ).to_pydict() + for path, num_rows in zip(parquet_statistics["uris"], parquet_statistics["row_count"]): + filepaths_to_infos[path]["rows"] = num_rows file_paths = [] file_sizes = [] num_rows = [] for path, infos in filepaths_to_infos.items(): - file_paths.append(_ensure_path_protocol(protocol, path)) + file_paths.append(path) file_sizes.append(infos.get("size")) num_rows.append(infos.get("rows")) diff --git a/daft/io/_csv.py b/daft/io/_csv.py index fac80c76f0..7019ceff54 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -2,8 +2,6 @@ from typing import Dict, List, Optional, Union -import fsspec - from daft.api_annotations import PublicAPI from daft.daft import ( CsvSourceConfig, @@ -22,12 +20,11 @@ def read_csv( path: Union[str, List[str]], schema_hints: Optional[Dict[str, DataType]] = None, - fs: Optional[fsspec.AbstractFileSystem] = None, has_headers: bool = True, column_names: Optional[List[str]] = None, delimiter: str = ",", io_config: Optional["IOConfig"] = None, - use_native_downloader: bool = False, + use_native_downloader: bool = True, _buffer_size: Optional[int] = None, _chunk_size: Optional[int] = None, ) -> DataFrame: @@ -43,8 +40,6 @@ def read_csv( path (str): Path to CSV (allows for wildcards) schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will disable all schema inference on data being read, and throw an error if data being read is incompatible. 
- fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data. - By default, Daft will automatically construct a FileSystem instance internally. has_headers (bool): Whether the CSV has a header or not, defaults to True delimiter (Str): Delimiter used in the CSV, defaults to "," io_config (IOConfig): Config to be used with the native downloader @@ -74,6 +69,6 @@ def read_csv( if use_native_downloader: storage_config = StorageConfig.native(NativeStorageConfig(io_config)) else: - storage_config = StorageConfig.python(PythonStorageConfig(fs)) + storage_config = StorageConfig.python(PythonStorageConfig(None)) builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config) return DataFrame(builder) diff --git a/daft/io/_json.py b/daft/io/_json.py index fc37ec4b82..95e2556524 100644 --- a/daft/io/_json.py +++ b/daft/io/_json.py @@ -2,8 +2,6 @@ from typing import Dict, List, Optional, Union -import fsspec - from daft.api_annotations import PublicAPI from daft.daft import ( FileFormatConfig, @@ -20,7 +18,6 @@ def read_json( path: Union[str, List[str]], schema_hints: Optional[Dict[str, DataType]] = None, - fs: Optional[fsspec.AbstractFileSystem] = None, ) -> DataFrame: """Creates a DataFrame from line-delimited JSON file(s) @@ -34,8 +31,6 @@ def read_json( path (str): Path to JSON files (allows for wildcards) schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will disable all schema inference on data being read, and throw an error if data being read is incompatible. - fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data. - By default, Daft will automatically construct a FileSystem instance internally. returns: DataFrame: parsed DataFrame @@ -45,6 +40,6 @@ def read_json( json_config = JsonSourceConfig() file_format_config = FileFormatConfig.from_json_config(json_config) - storage_config = StorageConfig.python(PythonStorageConfig(fs)) + storage_config = StorageConfig.python(PythonStorageConfig(None)) builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config) return DataFrame(builder) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index 8a824867d0..94f597fe05 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -2,8 +2,6 @@ from typing import Dict, List, Optional, Union -import fsspec - from daft import context from daft.api_annotations import PublicAPI from daft.daft import ( @@ -23,9 +21,8 @@ def read_parquet( path: Union[str, List[str]], schema_hints: Optional[Dict[str, DataType]] = None, - fs: Optional[fsspec.AbstractFileSystem] = None, io_config: Optional["IOConfig"] = None, - use_native_downloader: bool = False, + use_native_downloader: bool = True, _multithreaded_io: Optional[bool] = None, ) -> DataFrame: """Creates a DataFrame from Parquet file(s) @@ -40,8 +37,6 @@ def read_parquet( path (str): Path to Parquet file (allows for wildcards) schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will disable all schema inference on data being read, and throw an error if data being read is incompatible. - fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data. - By default, Daft will automatically construct a FileSystem instance internally. io_config (IOConfig): Config to be used with the native downloader use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. 
@@ -68,7 +63,7 @@ def read_parquet(
     if use_native_downloader:
         storage_config = StorageConfig.native(NativeStorageConfig(io_config))
     else:
-        storage_config = StorageConfig.python(PythonStorageConfig(fs))
+        storage_config = StorageConfig.python(PythonStorageConfig(None))
 
-    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config, fs=fs)
+    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
     return DataFrame(builder)
diff --git a/daft/io/common.py b/daft/io/common.py
index ce316a201f..c3fd2f4e67 100644
--- a/daft/io/common.py
+++ b/daft/io/common.py
@@ -3,13 +3,13 @@
 from typing import TYPE_CHECKING
 
 from daft.context import get_context
-from daft.daft import FileFormatConfig, StorageConfig
+from daft.daft import FileFormatConfig, NativeStorageConfig, StorageConfig
 from daft.datatype import DataType
 from daft.logical.builder import LogicalPlanBuilder
 from daft.logical.schema import Schema
 
 if TYPE_CHECKING:
-    import fsspec
+    pass
 
 
 def _get_schema_from_hints(hints: dict[str, DataType]) -> Schema:
@@ -24,14 +24,18 @@ def _get_tabular_files_scan(
     schema_hints: dict[str, DataType] | None,
     file_format_config: FileFormatConfig,
     storage_config: StorageConfig,
-    fs: fsspec.AbstractFileSystem | None = None,
 ) -> LogicalPlanBuilder:
     """Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
     paths = path if isinstance(path, list) else [str(path)]
    schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None
 
+    # Glob the path using the Runner
+    # NOTE: Globbing is always performed by the native I/O layer, so we pull the IOConfig out of the storage config when one is available
+    io_config = None
+    if isinstance(storage_config.config, NativeStorageConfig):
+        io_config = storage_config.config.io_config
     runner_io = get_context().runner().runner_io()
-    file_infos = runner_io.glob_paths_details(paths, file_format_config, fs=fs, storage_config=storage_config)
+    file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)
 
     # Infer schema if no hints provided
     inferred_or_provided_schema = (
diff --git a/daft/io/file_path.py b/daft/io/file_path.py
index f94a7e948b..205a5b63c5 100644
--- a/daft/io/file_path.py
+++ b/daft/io/file_path.py
@@ -1,19 +1,18 @@
 # isort: dont-add-import: from __future__ import annotations
 
-from typing import Optional
 
-import fsspec
+from typing import Optional
 
 from daft.api_annotations import PublicAPI
 from daft.context import get_context
-from daft.daft import PartitionScheme, PartitionSpec
+from daft.daft import IOConfig, PartitionScheme, PartitionSpec
 from daft.dataframe import DataFrame
 from daft.runners.pyrunner import LocalPartitionSet
 from daft.table import Table
 
 
 @PublicAPI
-def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) -> DataFrame:
+def from_glob_path(path: str, io_config: Optional[IOConfig] = None) -> DataFrame:
     """Creates a DataFrame of file paths and other metadata from a glob path.
 
     This method supports wildcards:
@@ -36,8 +35,7 @@ def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) ->
 
     Args:
         path (str): Path to files on disk (allows wildcards).
-        fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for globbing and fetching metadata.
-            By default, Daft will automatically construct a FileSystem instance internally.
+ io_config (IOConfig): Configuration to use when running IO with remote services Returns: DataFrame: DataFrame containing the path to each file as a row, along with other metadata @@ -45,7 +43,7 @@ def from_glob_path(path: str, fs: Optional[fsspec.AbstractFileSystem] = None) -> """ context = get_context() runner_io = context.runner().runner_io() - file_infos = runner_io.glob_paths_details([path], fs=fs) + file_infos = runner_io.glob_paths_details([path], io_config=io_config) file_infos_table = Table._from_pytable(file_infos.to_table()) partition = LocalPartitionSet({0: file_infos_table}) cache_entry = context.runner().put_partition_set_into_cache(partition) diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index df0321f60a..dd02572e11 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -11,13 +11,13 @@ from daft.daft import ( FileFormatConfig, FileInfos, - PythonStorageConfig, + IOConfig, ResourceRequest, StorageConfig, ) from daft.execution import physical_plan from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask -from daft.filesystem import get_filesystem_from_path, glob_path_with_stats +from daft.filesystem import glob_path_with_stats from daft.internal.gpu import cuda_device_count from daft.logical.builder import LogicalPlanBuilder from daft.logical.schema import Schema @@ -33,7 +33,7 @@ from daft.table import Table if TYPE_CHECKING: - import fsspec + pass logger = logging.getLogger(__name__) @@ -83,20 +83,12 @@ def glob_paths_details( self, source_paths: list[str], file_format_config: FileFormatConfig | None = None, - fs: fsspec.AbstractFileSystem | None = None, - storage_config: StorageConfig | None = None, + io_config: IOConfig | None = None, ) -> FileInfos: - if fs is None and storage_config is not None: - config = storage_config.config - if isinstance(config, PythonStorageConfig): - fs = config.fs file_infos = FileInfos() file_format = file_format_config.file_format() if file_format_config is not None else None for source_path in source_paths: - if fs is None: - fs = get_filesystem_from_path(source_path) - - path_file_infos = glob_path_with_stats(source_path, file_format, fs, storage_config) + path_file_infos = glob_path_with_stats(source_path, file_format, io_config) if len(path_file_infos) == 0: raise FileNotFoundError(f"No files found at {source_path}") diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index d2e45a0698..6b80981da3 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -27,7 +27,7 @@ from daft.daft import ( FileFormatConfig, FileInfos, - PythonStorageConfig, + IOConfig, ResourceRequest, StorageConfig, ) @@ -41,7 +41,7 @@ ReduceInstruction, SingleOutputPartitionTask, ) -from daft.filesystem import get_filesystem_from_path, glob_path_with_stats +from daft.filesystem import glob_path_with_stats from daft.runners import runner_io from daft.runners.partitioning import ( PartID, @@ -56,7 +56,6 @@ if TYPE_CHECKING: import dask - import fsspec import pandas as pd from ray.data.block import Block as RayDatasetBlock from ray.data.dataset import Dataset as RayDataset @@ -76,20 +75,12 @@ def _glob_path_into_file_infos( paths: list[str], file_format_config: FileFormatConfig | None, - fs: fsspec.AbstractFileSystem | None, - storage_config: StorageConfig | None, + io_config: IOConfig | None, ) -> Table: - if fs is None and storage_config is not None: - config = storage_config.config - if isinstance(config, PythonStorageConfig): - fs = config.fs file_infos = 
FileInfos()
     file_format = file_format_config.file_format() if file_format_config is not None else None
     for path in paths:
-        if fs is None:
-            fs = get_filesystem_from_path(path)
-
-        path_file_infos = glob_path_with_stats(path, file_format, fs, storage_config)
+        path_file_infos = glob_path_with_stats(path, file_format=file_format, io_config=io_config)
 
         if len(path_file_infos) == 0:
             raise FileNotFoundError(f"No files found at {path}")
 
         file_infos.extend(path_file_infos)
@@ -221,16 +212,11 @@ def glob_paths_details(
         self,
         source_paths: list[str],
         file_format_config: FileFormatConfig | None = None,
-        fs: fsspec.AbstractFileSystem | None = None,
-        storage_config: StorageConfig | None = None,
+        io_config: IOConfig | None = None,
     ) -> FileInfos:
         # Synchronously fetch the file infos, for now.
         return FileInfos.from_table(
-            ray.get(
-                _glob_path_into_file_infos.remote(
-                    source_paths, file_format_config, fs=fs, storage_config=storage_config
-                )
-            )
+            ray.get(_glob_path_into_file_infos.remote(source_paths, file_format_config, io_config=io_config))
             .to_table()
             ._table
         )
diff --git a/daft/runners/runner_io.py b/daft/runners/runner_io.py
index 58e7147f8c..59f0832889 100644
--- a/daft/runners/runner_io.py
+++ b/daft/runners/runner_io.py
@@ -8,6 +8,7 @@
     FileFormat,
     FileFormatConfig,
     FileInfos,
+    IOConfig,
     JsonSourceConfig,
     ParquetSourceConfig,
     StorageConfig,
@@ -17,7 +18,7 @@
 from daft.table import schema_inference
 
 if TYPE_CHECKING:
-    import fsspec
+    pass
 
 PartitionT = TypeVar("PartitionT")
 
@@ -33,8 +34,7 @@ def glob_paths_details(
         self,
         source_path: list[str],
         file_format_config: FileFormatConfig | None = None,
-        fs: fsspec.AbstractFileSystem | None = None,
-        storage_config: StorageConfig | None = None,
+        io_config: IOConfig | None = None,
     ) -> FileInfos:
         """Globs the specified filepath to construct a FileInfos object containing file and dir metadata.
 
diff --git a/daft/udf_library/url_udfs.py b/daft/udf_library/url_udfs.py
index b5a9c682d7..28f66e19ca 100644
--- a/daft/udf_library/url_udfs.py
+++ b/daft/udf_library/url_udfs.py
@@ -5,8 +5,6 @@
 import threading
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
-import fsspec
-
 from daft import filesystem
 from daft.datatype import DataType
 from daft.series import Series
@@ -28,21 +26,18 @@ def _worker_thread_initializer() -> None:
     thread_local.filesystems_cache = {}
 
 
-def _download(
-    path: str | None, on_error: Literal["raise"] | Literal["null"], fs: fsspec.AbstractFileSystem | None
-) -> bytes | None:
+def _download(path: str | None, on_error: Literal["raise"] | Literal["null"]) -> bytes | None:
     if path is None:
         return None
     protocol = filesystem.get_protocol_from_path(path)
     # If no fsspec filesystem provided, first check the cache.
-    if fs is None:
-        fs = thread_local.filesystems_cache.get(protocol, None)
+    fs = thread_local.filesystems_cache.get(protocol, None)
 
-    # If no fsspec filesystem provided and none in the cache, create one based on the path protocol.
-    if fs is None:
-        fs = filesystem.get_filesystem(protocol)
-        thread_local.filesystems_cache[protocol] = fs
+    # If none in the cache, create one based on the path protocol and cache it for reuse.
+    if fs is None:
+        fs = filesystem.get_filesystem(protocol)
+        thread_local.filesystems_cache[protocol] = fs
     try:
         return fs.cat_file(path)
     except Exception as e:
@@ -74,7 +68,6 @@ def download_udf(
     urls,
     max_worker_threads: int = 8,
     on_error: Literal["raise"] | Literal["null"] = "raise",
-    fs: fsspec.AbstractFileSystem | None = None,
 ):
     """Downloads the contents of the supplied URLs.
@@ -93,7 +86,7 @@ def download_udf(
     executor = ThreadPoolExecutor(max_workers=max_worker_threads, initializer=_worker_thread_initializer)
 
     results: list[bytes | None] = [None for _ in range(len(urls))]
-    future_to_idx = {executor.submit(_download, urls_pylist[i], on_error, fs): i for i in range(len(urls))}
+    future_to_idx = {executor.submit(_download, urls_pylist[i], on_error): i for i in range(len(urls))}
     for future in as_completed(future_to_idx):
         results[future_to_idx[future]] = future.result()
 
diff --git a/src/daft-core/src/array/ops/groups.rs b/src/daft-core/src/array/ops/groups.rs
index d9b75d4c4e..940f847d43 100644
--- a/src/daft-core/src/array/ops/groups.rs
+++ b/src/daft-core/src/array/ops/groups.rs
@@ -52,7 +52,7 @@ where
 impl<T> IntoGroups for DataArray<T>
 where
     T: DaftIntegerType,
-    <T as DaftNumericType>::Native: arrow2::types::Index,
+    <T as DaftNumericType>::Native: Ord,
     <T as DaftNumericType>::Native: Hash,
     <T as DaftNumericType>::Native: std::cmp::Eq,
 {
diff --git a/src/daft-core/src/datatypes/dtype.rs b/src/daft-core/src/datatypes/dtype.rs
index 6e1a109874..1ef4e2ec2e 100644
--- a/src/daft-core/src/datatypes/dtype.rs
+++ b/src/daft-core/src/datatypes/dtype.rs
@@ -125,7 +125,10 @@ impl DataType {
             DataType::Int16 => Ok(ArrowType::Int16),
             DataType::Int32 => Ok(ArrowType::Int32),
             DataType::Int64 => Ok(ArrowType::Int64),
-            DataType::Int128 => Ok(ArrowType::Decimal(38, 0)),
+            // Must maintain same default mapping as Arrow2, otherwise this will throw errors in
+            // DataArray::new() which makes strong assumptions about the arrow/Daft types
+            // https://github.com/jorgecarleitao/arrow2/blob/b0734542c2fef5d2d0c7b6ffce5d094de371168a/src/datatypes/mod.rs#L493
+            DataType::Int128 => Ok(ArrowType::Decimal(32, 32)),
             DataType::UInt8 => Ok(ArrowType::UInt8),
             DataType::UInt16 => Ok(ArrowType::UInt16),
             DataType::UInt32 => Ok(ArrowType::UInt32),
diff --git a/src/daft-core/src/datatypes/matching.rs b/src/daft-core/src/datatypes/matching.rs
index 4e490b3e98..68a7ba0040 100644
--- a/src/daft-core/src/datatypes/matching.rs
+++ b/src/daft-core/src/datatypes/matching.rs
@@ -64,6 +64,7 @@ macro_rules! with_match_physical_daft_types {(
         Int16 => __with_ty__! { Int16Type },
         Int32 => __with_ty__! { Int32Type },
         Int64 => __with_ty__! { Int64Type },
+        Int128 => __with_ty__! { Int128Type },
         UInt8 => __with_ty__! { UInt8Type },
         UInt16 => __with_ty__! { UInt16Type },
         UInt32 => __with_ty__! { UInt32Type },
@@ -131,6 +132,7 @@ macro_rules! with_match_comparable_daft_types {(
         Int16 => __with_ty__! { Int16Type },
         Int32 => __with_ty__! { Int32Type },
         Int64 => __with_ty__! { Int64Type },
+        Int128 => __with_ty__! { Int128Type },
         UInt8 => __with_ty__! { UInt8Type },
         UInt16 => __with_ty__! { UInt16Type },
         UInt32 => __with_ty__!
{ UInt32Type }, diff --git a/tests/cookbook/test_dataloading.py b/tests/cookbook/test_dataloading.py index 655c3acbd6..8d83a90995 100644 --- a/tests/cookbook/test_dataloading.py +++ b/tests/cookbook/test_dataloading.py @@ -2,15 +2,10 @@ import os import pathlib -import sys -from unittest.mock import patch import pandas as pd -import pytest -from fsspec.implementations.local import LocalFileSystem import daft -from daft.context import get_context from tests.conftest import assert_df_equals from tests.cookbook.assets import COOKBOOK_DATA_CSV @@ -29,7 +24,7 @@ def test_load_csv_no_headers(tmp_path: pathlib.Path): csv.write_text("\n".join(pathlib.Path(COOKBOOK_DATA_CSV).read_text().split("\n")[1:])) daft_df = daft.read_csv(str(csv), has_headers=False) pd_df = pd.read_csv(csv, header=None, keep_default_na=False) - pd_df.columns = [f"f{i}" for i in range(52)] + pd_df.columns = [f"column_{i}" for i in range(1, 53)] daft_pd_df = daft_df.to_pandas() assert list(daft_pd_df.columns) == list(pd_df.columns) @@ -87,7 +82,8 @@ def test_glob_files(tmpdir): daft_pd_df = daft_df.to_pandas() pd_df = pd.DataFrame.from_records( - {"path": str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) + {"path": "file://" + str(path.as_posix()), "size": size, "num_rows": None} + for path, size in zip(filepaths, list(range(10))) ) pd_df = pd_df[~pd_df["path"].str.endswith(".bar")] pd_df = pd_df.astype({"num_rows": float}) @@ -99,7 +95,7 @@ def test_glob_files_single_file(tmpdir): filepath.write_text("b" * 10) daft_df = daft.from_glob_path(os.path.join(tmpdir, "file.foo")) daft_pd_df = daft_df.to_pandas() - pd_df = pd.DataFrame.from_records([{"path": str(filepath), "size": 10, "num_rows": None}]) + pd_df = pd.DataFrame.from_records([{"path": "file://" + str(filepath), "size": 10, "num_rows": None}]) pd_df = pd_df.astype({"num_rows": float}) assert_df_equals(daft_pd_df, pd_df, sort_key="path") @@ -118,15 +114,11 @@ def test_glob_files_directory(tmpdir): daft_pd_df = daft_df.to_pandas() listing_records = [ - {"path": str(path.as_posix()), "size": size, "num_rows": None} + {"path": "file://" + str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(filepaths, [i for i in range(10) for _ in range(2)]) ] - dir_size = extra_empty_dir.stat().st_size - if sys.platform == "win32": - dir_size = 0 - - listing_records = listing_records + [{"path": str(extra_empty_dir.as_posix()), "size": dir_size, "num_rows": None}] + listing_records = listing_records pd_df = pd.DataFrame.from_records(listing_records) pd_df = pd_df.astype({"num_rows": float}) assert_df_equals(daft_pd_df, pd_df, sort_key="path") @@ -145,44 +137,11 @@ def test_glob_files_recursive(tmpdir): daft_df = daft.from_glob_path(os.path.join(tmpdir, "**")) daft_pd_df = daft_df.to_pandas() listing_records = [ - {"path": str(path.as_posix()), "size": size, "num_rows": None} + {"path": "file://" + str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(paths, [i for i in range(10) for _ in range(2)]) ] - dir_size = nested_dir_path.stat().st_size - if sys.platform == "win32": - dir_size = 0 - listing_records = listing_records + [{"path": str(nested_dir_path.as_posix()), "size": dir_size, "num_rows": None}] pd_df = pd.DataFrame.from_records(listing_records) pd_df = pd_df.astype({"num_rows": float}) assert_df_equals(daft_pd_df, pd_df, sort_key="path") - - -@pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -def 
test_glob_files_custom_fs(tmpdir): - filepaths = [] - for i in range(10): - filepath = pathlib.Path(tmpdir) / f"file_{i}.foo" - filepath.write_text("a" * i) - filepaths.append(filepath) - bar_filepath = pathlib.Path(tmpdir) / f"file_{i}.bar" - bar_filepath.write_text("b" * i) - - # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, - # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, - # which would make this test pass without the passed filesystem being used. - fs = LocalFileSystem(skip_instance_cache=True) - with patch.object(fs, "glob", wraps=fs.glob) as mock_glob: - daft_df = daft.from_glob_path(f"{tmpdir}/*.foo", fs=fs) - - # Check that glob() is called on the passed filesystem. - mock_glob.assert_called() - - daft_pd_df = daft_df.to_pandas() - pd_df = pd.DataFrame.from_records( - {"path": str(path.as_posix()), "size": size, "num_rows": None} for path, size in zip(filepaths, list(range(10))) - ) - pd_df = pd_df[~pd_df["path"].str.endswith(".bar")] - pd_df = pd_df.astype({"num_rows": float}) - assert_df_equals(daft_pd_df, pd_df, sort_key="path") diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index fa4065adf6..587b80beb5 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -7,19 +7,16 @@ import os import tempfile import uuid -from unittest.mock import MagicMock, patch import numpy as np import pandas as pd import pyarrow as pa import pyarrow.parquet as papq import pytest -from fsspec.implementations.local import LocalFileSystem from ray.data.extensions import ArrowTensorArray, TensorArray import daft from daft.api_annotations import APITypeError -from daft.context import get_context from daft.dataframe import DataFrame from daft.datatype import DataType from daft.utils import pyarrow_supports_fixed_shape_tensor @@ -393,41 +390,6 @@ def test_create_dataframe_multiple_csvs(valid_data: list[dict[str, float]], use_ assert len(pd_df) == (len(valid_data) * 2) -@pytest.mark.skipif( - get_context().runner_config.name not in {"py"}, - reason="requires PyRunner to be in use", -) -def test_create_dataframe_csv_custom_fs(valid_data: list[dict[str, float]]) -> None: - with create_temp_filename() as fname: - with open(fname, "w") as f: - header = list(valid_data[0].keys()) - writer = csv.writer(f) - writer.writerow(header) - writer.writerows([[item[col] for col in header] for item in valid_data]) - f.flush() - - # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, - # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, - # which would make this test pass without the passed filesystem being used. - fs = LocalFileSystem(skip_instance_cache=True) - mock_cache = MagicMock(return_value=None) - with patch.object(fs, "info", wraps=fs.info) as mock_info, patch.object( - fs, "open", wraps=fs.open - ) as mock_open, patch("daft.filesystem._get_fs_from_cache", mock_cache): - df = daft.read_csv(fname, fs=fs) - # Check that info() is called on the passed filesystem. - mock_info.assert_called() - - assert df.column_names == COL_NAMES - pd_df = df.to_pandas() - - # Check that open() is called on the passed filesystem. 
- mock_open.assert_called() - - assert list(pd_df.columns) == COL_NAMES - assert len(pd_df) == len(valid_data) - - @pytest.mark.parametrize("use_native_downloader", [True, False]) def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]], use_native_downloader) -> None: with create_temp_filename() as fname: @@ -583,41 +545,6 @@ def test_create_dataframe_multiple_jsons(valid_data: list[dict[str, float]]) -> assert len(pd_df) == (len(valid_data) * 2) -@pytest.mark.skipif( - get_context().runner_config.name not in {"py"}, - reason="requires PyRunner to be in use", -) -def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) -> None: - with create_temp_filename() as fname: - with open(fname, "w") as f: - for data in valid_data: - f.write(json.dumps(data)) - f.write("\n") - f.flush() - - # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, - # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, - # which would make this test pass without the passed filesystem being used. - fs = LocalFileSystem(skip_instance_cache=True) - mock_cache = MagicMock(return_value=None) - with patch.object(fs, "info", wraps=fs.info) as mock_info, patch.object( - fs, "open", wraps=fs.open - ) as mock_open, patch("daft.filesystem._get_fs_from_cache", mock_cache): - df = daft.read_json(fname, fs=fs) - - # Check that info() is called on the passed filesystem. - mock_info.assert_called() - - assert df.column_names == COL_NAMES - pd_df = df.to_pandas() - - # Check that open() is called on the passed filesystem. - mock_open.assert_called() - - assert list(pd_df.columns) == COL_NAMES - assert len(pd_df) == len(valid_data) - - def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: @@ -674,15 +601,14 @@ def test_create_dataframe_json_specify_schema(valid_data: list[dict[str, float]] ### -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native_downloader) -> None: +def test_create_dataframe_parquet(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) papq.write_table(table, f.name) f.flush() - df = daft.read_parquet(fname, use_native_downloader=use_native_downloader) + df = daft.read_parquet(fname) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -690,15 +616,14 @@ def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native assert len(pd_df) == len(valid_data) -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]], use_native_downloader) -> None: +def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) papq.write_table(table, f.name) f.flush() - df = daft.read_parquet(fname, use_native_downloader=use_native_downloader) + df = daft.read_parquet(fname) assert df.column_names == COL_NAMES df = df.where(daft.col("sepal_length") > 4.8) @@ -708,8 +633,7 @@ def test_create_dataframe_parquet_with_filter(valid_data: list[dict[str, float]] assert len(pd_df) == 
len(valid_data) - 1 -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]], use_native_downloader) -> None: +def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as f1name, create_temp_filename() as f2name: with open(f1name, "w") as f1, open(f2name, "w") as f2: for f in (f1, f2): @@ -717,7 +641,7 @@ def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]], papq.write_table(table, f.name) f.flush() - df = daft.read_parquet([f1name, f2name], use_native_downloader=use_native_downloader) + df = daft.read_parquet([f1name, f2name]) assert df.column_names == COL_NAMES pd_df = df.to_pandas() @@ -725,42 +649,7 @@ def test_create_dataframe_multiple_parquets(valid_data: list[dict[str, float]], assert len(pd_df) == (len(valid_data) * 2) -@pytest.mark.skipif( - get_context().runner_config.name not in {"py"}, - reason="requires PyRunner to be in use", -) -def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]]) -> None: - with create_temp_filename() as fname: - with open(fname, "w") as f: - table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) - papq.write_table(table, f.name) - f.flush() - - # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, - # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, - # which would make this test pass without the passed filesystem being used. - fs = LocalFileSystem(skip_instance_cache=True) - mock_cache = MagicMock(return_value=None) - with patch.object(fs, "info", wraps=fs.info) as mock_info, patch.object( - fs, "open", wraps=fs.open - ) as mock_open, patch("daft.filesystem._get_fs_from_cache", mock_cache): - df = daft.read_parquet(fname, fs=fs) - - # Check that info() is called on the passed filesystem. - mock_info.assert_called() - - assert df.column_names == COL_NAMES - pd_df = df.to_pandas() - - # Check that open() is called on the passed filesystem. 
- mock_open.assert_called() - - assert list(pd_df.columns) == COL_NAMES - assert len(pd_df) == len(valid_data) - - -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, float]], use_native_downloader) -> None: +def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) @@ -769,7 +658,7 @@ def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, f col_subset = COL_NAMES[:3] - df = daft.read_parquet(fname, use_native_downloader=use_native_downloader) + df = daft.read_parquet(fname) df = df.select(*col_subset) assert df.column_names == col_subset @@ -778,8 +667,7 @@ def test_create_dataframe_parquet_column_projection(valid_data: list[dict[str, f assert len(pd_df) == len(valid_data) -@pytest.mark.parametrize("use_native_downloader", [True, False]) -def test_create_dataframe_parquet_specify_schema(valid_data: list[dict[str, float]], use_native_downloader) -> None: +def test_create_dataframe_parquet_specify_schema(valid_data: list[dict[str, float]]) -> None: with create_temp_filename() as fname: with open(fname, "w") as f: table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES}) @@ -795,7 +683,6 @@ def test_create_dataframe_parquet_specify_schema(valid_data: list[dict[str, floa "petal_width": DataType.float64(), "variety": DataType.string(), }, - use_native_downloader=use_native_downloader, ) assert df.column_names == COL_NAMES diff --git a/tests/integration/io/parquet/test_reads_public_data.py b/tests/integration/io/parquet/test_reads_public_data.py index 3f6da0ef98..dc6d48cfbc 100644 --- a/tests/integration/io/parquet/test_reads_public_data.py +++ b/tests/integration/io/parquet/test_reads_public_data.py @@ -1,12 +1,8 @@ from __future__ import annotations -import os -import tempfile - import pandas as pd import pyarrow as pa import pytest -import requests from pyarrow import parquet as pq import daft @@ -192,22 +188,6 @@ def parquet_file(request) -> tuple[str, str]: return request.param -HTTP_CAN_READ_FILES = [param for param in DAFT_CAN_READ_FILES if param[1].startswith("http")] - - -@pytest.fixture(scope="session", params=HTTP_CAN_READ_FILES, ids=[name for name, _ in HTTP_CAN_READ_FILES]) -def local_parquet_file(request) -> tuple[str, str]: - """Returns a tuple of (`name`, `url`) of files that Daft should be able to handle. 
URLs input are remote but will return local paths.""" - name, url = request.param - with tempfile.TemporaryDirectory() as dir: - file = os.path.join(dir, "tempfile") - response = requests.get(url) - with open(file, "wb") as f: - f.write(response.content) - - yield (name, file) - - def read_parquet_with_pyarrow(path) -> pa.Table: kwargs = {} if get_protocol_from_path(path) == "s3": @@ -234,19 +214,6 @@ def test_parquet_read_table(parquet_file, public_storage_io_config, multithreade pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas()) -@pytest.mark.integration() -@pytest.mark.parametrize( - "multithreaded_io", - [False, True], -) -def test_parquet_read_table_local(local_parquet_file, public_storage_io_config, multithreaded_io): - _, url = local_parquet_file - daft_native_read = Table.read_parquet(url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io) - pa_read = Table.from_arrow(read_parquet_with_pyarrow(url)) - assert daft_native_read.schema() == pa_read.schema() - pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas()) - - @pytest.mark.integration() @pytest.mark.parametrize( "multithreaded_io", @@ -312,15 +279,7 @@ def test_parquet_into_pyarrow_bulk(parquet_file, public_storage_io_config, multi @pytest.mark.integration() def test_parquet_read_df(parquet_file, public_storage_io_config): _, url = parquet_file - # This is a hack until we remove `fsspec.info`, `fsspec.glob` and `fsspec.glob` from `daft.read_parquet`. - # We rely on the native downloaders impl for that - if url.startswith("az"): - import adlfs - - fs = adlfs.AzureBlobFileSystem(account_name="dafttestdata", anon=True) - else: - fs = None - daft_native_read = daft.read_parquet(url, io_config=public_storage_io_config, use_native_downloader=True, fs=fs) + daft_native_read = daft.read_parquet(url, io_config=public_storage_io_config) pa_read = Table.from_arrow(read_parquet_with_pyarrow(url)) assert daft_native_read.schema() == pa_read.schema() pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas()) @@ -363,43 +322,6 @@ def test_row_groups_selection(public_storage_io_config, multithreaded_io): assert all_rows.to_arrow()[0:10] == out_of_order.to_arrow()[10:20] -@pytest.mark.integration() -@pytest.mark.parametrize( - "multithreaded_io", - [False, True], -) -def test_row_groups_selection_local(public_storage_io_config, multithreaded_io): - url = "tests/assets/parquet-data/mvp.parquet" - all_rows = Table.read_parquet(url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io) - assert len(all_rows) == 100 - first = Table.read_parquet( - url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io, row_groups=[0] - ) - assert len(first) == 10 - assert all_rows.to_arrow()[:10] == first.to_arrow() - - fifth = Table.read_parquet( - url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io, row_groups=[5] - ) - assert len(fifth) == 10 - assert all_rows.to_arrow()[50:60] == fifth.to_arrow() - - repeated = Table.read_parquet( - url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io, row_groups=[1, 1, 1] - ) - assert len(repeated) == 30 - assert all_rows.to_arrow()[10:20] == repeated.to_arrow()[:10] - assert all_rows.to_arrow()[10:20] == repeated.to_arrow()[10:20] - assert all_rows.to_arrow()[10:20] == repeated.to_arrow()[20:] - - out_of_order = Table.read_parquet( - url, io_config=public_storage_io_config, multithreaded_io=multithreaded_io, row_groups=[1, 0] - ) - assert 
len(out_of_order) == 20 - assert all_rows.to_arrow()[10:20] == out_of_order.to_arrow()[:10] - assert all_rows.to_arrow()[0:10] == out_of_order.to_arrow()[10:20] - - @pytest.mark.integration() @pytest.mark.parametrize( "multithreaded_io", diff --git a/tests/integration/io/test_url_download_public_aws_s3.py b/tests/integration/io/test_url_download_public_aws_s3.py index e5a42bf379..956f4743db 100644 --- a/tests/integration/io/test_url_download_public_aws_s3.py +++ b/tests/integration/io/test_url_download_public_aws_s3.py @@ -1,17 +1,17 @@ from __future__ import annotations import pytest -import s3fs import daft @pytest.mark.integration() def test_url_download_aws_s3_public_bucket_custom_s3fs(small_images_s3_paths): - fs = s3fs.S3FileSystem(anon=True) data = {"urls": small_images_s3_paths} df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(fs=fs)) + df = df.with_column( + "data", df["urls"].url.download(io_config=daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))) + ) data = df.to_pydict() assert len(data["data"]) == 6 @@ -21,10 +21,11 @@ def test_url_download_aws_s3_public_bucket_custom_s3fs(small_images_s3_paths): @pytest.mark.integration() def test_url_download_aws_s3_public_bucket_custom_s3fs_wrong_region(small_images_s3_paths): - fs = s3fs.S3FileSystem(anon=True) data = {"urls": small_images_s3_paths} df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(fs=fs)) + df = df.with_column( + "data", df["urls"].url.download(io_config=daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))) + ) data = df.to_pydict() assert len(data["data"]) == 6 diff --git a/tests/integration/io/test_url_download_s3_minio.py b/tests/integration/io/test_url_download_s3_minio.py index 48e17002c8..821e0ae40b 100644 --- a/tests/integration/io/test_url_download_s3_minio.py +++ b/tests/integration/io/test_url_download_s3_minio.py @@ -1,29 +1,13 @@ from __future__ import annotations import pytest -import s3fs import daft -@pytest.mark.integration() -def test_url_download_minio_custom_s3fs(minio_io_config, minio_image_data_fixture, image_data): - urls = minio_image_data_fixture - fs = s3fs.S3FileSystem( - key=minio_io_config.s3.key_id, - password=minio_io_config.s3.access_key, - client_kwargs={"endpoint_url": minio_io_config.s3.endpoint_url}, - ) - data = {"urls": urls} - df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(fs=fs)) - - assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(urls))]} - - @pytest.mark.integration() def test_url_download_minio_native_downloader(minio_io_config, minio_image_data_fixture, image_data): data = {"urls": minio_image_data_fixture} df = daft.from_pydict(data) - df = df.with_column("data", df["urls"].url.download(io_config=minio_io_config, use_native_downloader=True)) + df = df.with_column("data", df["urls"].url.download(io_config=minio_io_config)) assert df.to_pydict() == {**data, "data": [image_data for _ in range(len(minio_image_data_fixture))]} diff --git a/tests/udf_library/test_url_udfs.py b/tests/udf_library/test_url_udfs.py index 244d2741cd..a8daee4596 100644 --- a/tests/udf_library/test_url_udfs.py +++ b/tests/udf_library/test_url_udfs.py @@ -6,10 +6,8 @@ import pandas as pd import pytest -from fsspec.implementations.local import LocalFileSystem import daft -from daft.context import get_context from daft.expressions import col from tests.conftest import assert_df_equals @@ -43,26 +41,6 @@ def test_download(files, use_native_downloader): 
assert_df_equals(df.to_pandas(), pd_df, sort_key="filenames") -@pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -def test_download_custom_ds(files): - # Mark that this filesystem instance shouldn't be automatically reused by fsspec; without this, - # fsspec would cache this instance and reuse it for Daft's default construction of filesystems, - # which would make this test pass without the passed filesystem being used. - - # Run it twice to ensure runtime works - for _ in range(2): - fs = LocalFileSystem(skip_instance_cache=True) - - df = daft.from_pydict({"filenames": [str(f) for f in files]}) - - df = df.with_column("bytes", col("filenames").url.download(fs=fs)) - out_df = df.to_pandas() - - pd_df = pd.DataFrame.from_dict({"filenames": [str(f) for f in files]}) - pd_df["bytes"] = pd.Series([pathlib.Path(fn).read_bytes() for fn in files]) - assert_df_equals(out_df, pd_df, sort_key="filenames") - - @pytest.mark.parametrize("use_native_downloader", [False, True]) def test_download_with_none(files, use_native_downloader): data = {"id": list(range(len(files) * 2)), "filenames": [str(f) for f in files] + [None for _ in range(len(files))]}