diff --git a/daft/__init__.py b/daft/__init__.py index 99ff9618ae..07edd80585 100644 --- a/daft/__init__.py +++ b/daft/__init__.py @@ -1,7 +1,6 @@ from __future__ import annotations import os -from typing import TYPE_CHECKING from daft.logging import setup_logger @@ -46,11 +45,6 @@ def get_build_type() -> str: __version__ = get_version() -if TYPE_CHECKING: - # Placeholder for type checking for Rust module - class daft: - pass - ### # Initialize analytics diff --git a/daft/context.py b/daft/context.py index 777b64d56a..88111265b4 100644 --- a/daft/context.py +++ b/daft/context.py @@ -109,7 +109,10 @@ def logical_plan_builder_class(self) -> type[LogicalPlanBuilder]: from daft.logical.logical_plan import PyLogicalPlanBuilder from daft.logical.rust_logical_plan import RustLogicalPlanBuilder - return RustLogicalPlanBuilder if self.use_rust_planner else PyLogicalPlanBuilder + if self.use_rust_planner: + return RustLogicalPlanBuilder + else: + return PyLogicalPlanBuilder _DaftContext = DaftContext() diff --git a/daft/daft.pyi b/daft/daft.pyi index 9e18fbc9b3..c91d3dd5ad 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -1,5 +1,12 @@ +import builtins from enum import Enum -from typing import Any +from typing import Any, Callable + +from daft.runners.partitioning import PartitionCacheEntry +from daft.execution import physical_plan +from daft.planner.planner import PartitionT +import pyarrow +import fsspec class ImageMode(Enum): """ @@ -54,13 +61,9 @@ class ImageFormat(Enum): """ PNG: int - JPEG: int - TIFF: int - GIF: int - BMP: int @staticmethod @@ -70,4 +73,640 @@ class ImageFormat(Enum): """ ... +class JoinType(Enum): + """ + Type of a join operation. + """ + + Inner: int + Left: int + Right: int + + @staticmethod + def from_join_type_str(join_type: str) -> JoinType: + """ + Create a JoinType from its string representation. + + Args: + join_type: String representation of the join type. This is the same as the enum + attribute name, e.g. ``JoinType.from_join_type_str("Inner")`` would + return ``JoinType.Inner``. + """ + ... + +class CountMode(Enum): + """ + Supported count modes for Daft's count aggregation. + + | All - Count both non-null and null values. + | Valid - Count only valid values. + | Null - Count only null values. + """ + + All: int + Valid: int + Null: int + + @staticmethod + def from_count_mode_str(count_mode: str) -> CountMode: + """ + Create a CountMode from its string representation. + + Args: + count_mode: String representation of the count mode , e.g. "all", "valid", or "null". + """ + ... + +class PartitionScheme(Enum): + """ + Partition scheme for Daft DataFrame. + """ + + Range: int + Hash: int + Random: int + Unknown: int + +class PartitionSpec: + """ + Partition specification for a Daft DataFrame. + """ + + scheme: PartitionScheme + num_partitions: int + by: list[PyExpr] + + def __init__( + self, scheme: PartitionScheme = PartitionScheme.Unknown, num_partitions: int = 0, by: list[PyExpr] | None = None + ): ... + def __eq__(self, other: PartitionSpec) -> bool: ... # type: ignore[override] + def __ne__(self, other: PartitionSpec) -> bool: ... # type: ignore[override] + def __str__(self) -> str: ... + +class ResourceRequest: + """ + Resource request for a query fragment task. + """ + + num_cpus: float | None + num_gpus: float | None + memory_bytes: int | None + + def __init__( + self, num_cpus: float | None = None, num_gpus: float | None = None, memory_bytes: int | None = None + ): ... 
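# A minimal sketch of the string-based constructors stubbed above for JoinType and
# CountMode. The accepted spellings follow the docstrings (join types match the enum
# attribute name, count modes are lowercase); anything beyond that is an assumption.
from daft.daft import CountMode, JoinType

join_type = JoinType.from_join_type_str("Inner")
count_mode = CountMode.from_count_mode_str("valid")
print(join_type, count_mode)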
+ @staticmethod + def max_resources(resource_requests: list[ResourceRequest]): + """Take a field-wise max of the list of resource requests.""" + ... + def __add__(self, other: ResourceRequest) -> ResourceRequest: ... + def __repr__(self) -> str: ... + def __eq__(self, other: ResourceRequest) -> bool: ... # type: ignore[override] + def __ne__(self, other: ResourceRequest) -> bool: ... # type: ignore[override] + +class FileFormat(Enum): + """ + Format of a file, e.g. Parquet, CSV, and JSON. + """ + + Parquet: int + Csv: int + Json: int + +class ParquetSourceConfig: + """ + Configuration of a Parquet data source. + """ + +class CsvSourceConfig: + """ + Configuration of a CSV data source. + """ + + delimiter: str + has_headers: bool + + def __init__(self, delimiter: str, has_headers: bool): ... + +class JsonSourceConfig: + """ + Configuration of a JSON data source. + """ + +class FileFormatConfig: + """ + Configuration for parsing a particular file format (Parquet, CSV, JSON). + """ + + config: ParquetSourceConfig | CsvSourceConfig | JsonSourceConfig + + @staticmethod + def from_parquet_config(config: ParquetSourceConfig) -> FileFormatConfig: + """ + Create a Parquet file format config. + """ + ... + @staticmethod + def from_csv_config(config: CsvSourceConfig) -> FileFormatConfig: + """ + Create a CSV file format config. + """ + ... + @staticmethod + def from_json_config(config: JsonSourceConfig) -> FileFormatConfig: + """ + Create a JSON file format config. + """ + ... + def file_format(self) -> FileFormat: + """ + Get the file format for this config. + """ + ... + def __eq__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] + def __ne__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] + +class FileInfo: + """ + Metadata for a single file. + """ + + file_path: str + file_size: int | None + num_rows: int | None + +class FileInfos: + """ + Metadata for a collection of files. + """ + + file_paths: list[str] + file_sizes: list[int | None] + num_rows: list[int | None] + + @staticmethod + def from_infos(file_paths: list[str], file_sizes: list[int | None], num_rows: list[int | None]) -> FileInfos: ... + @staticmethod + def from_table(table: PyTable) -> FileInfos: + """ + Create from a Daft table with "path", "size", and "num_rows" columns. + """ + ... + def extend(self, new_infos: FileInfos) -> FileInfos: + """ + Concatenate two FileInfos together. + """ + ... + def __getitem__(self, idx: int) -> FileInfo: ... + def to_table(self) -> PyTable: + """ + Convert to a Daft table with "path", "size", and "num_rows" columns. + """ + def __len__(self) -> int: ... + +class S3Config: + """ + I/O configuration for accessing an S3-compatible system. + """ + + def __init__( + self, + region_name: str | None = None, + endpoint_url: str | None = None, + key_id: str | None = None, + session_token: str | None = None, + access_key: str | None = None, + max_connections: int | None = None, + retry_initial_backoff_ms: int | None = None, + connect_timeout_ms: int | None = None, + read_timeout_ms: int | None = None, + num_tries: int | None = None, + retry_mode: str | None = None, + anonymous: bool | None = None, + ): ... + +class AzureConfig: + """ + I/O configuration for accessing Azure Blob Storage. + """ + + def __init__( + self, storage_account: str | None = None, access_key: str | None = None, anonymous: str | None = None + ): ... + +class GCSConfig: + """ + I/O configuration for accessing Google Cloud Storage. 
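# A small sketch of the field-wise merge that ResourceRequest.max_resources performs,
# per the stub above; the resource numbers are illustrative only.
from daft.daft import ResourceRequest

small = ResourceRequest(num_cpus=1.0, memory_bytes=512 * 1024**2)
large = ResourceRequest(num_cpus=4.0, num_gpus=1.0)
merged = ResourceRequest.max_resources([small, large])
print(merged)  # expected: num_cpus=4.0, num_gpus=1.0, memory_bytes=512 MiB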
+ """ + + def __init__(self, project_id: str | None = None, anonymous: bool | None = None): ... + +class IOConfig: + """ + Configuration for the native I/O layer, e.g. credentials for accessing cloud storage systems. + """ + + s3: S3Config + azure: AzureConfig + gcs: GCSConfig + + def __init__(self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None): ... + @staticmethod + def from_json(input: str) -> IOConfig: + """ + Recreate an IOConfig from a JSON string. + """ + +class NativeStorageConfig: + """ + Storage configuration for the Rust-native I/O layer. + """ + + io_config: IOConfig + + def __init__(self, io_config: IOConfig | None = None): ... + +class PythonStorageConfig: + """ + Storage configuration for the legacy Python I/O layer. + """ + + fs: fsspec.AbstractFileSystem + + def __init__(self, fs: fsspec.AbstractFileSystem | None = None): ... + +class StorageConfig: + """ + Configuration for interacting with a particular storage backend, using a particular + I/O layer implementation. + """ + + @staticmethod + def native(config: NativeStorageConfig) -> StorageConfig: + """ + Create from a native storage config. + """ + ... + @staticmethod + def python(config: PythonStorageConfig) -> StorageConfig: + """ + Create from a Python storage config. + """ + ... + @property + def config(self) -> NativeStorageConfig | PythonStorageConfig: ... + +def read_parquet( + uri: str, + columns: list[str] | None = None, + start_offset: int | None = None, + num_rows: int | None = None, + row_groups: list[int] | None = None, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, + coerce_int96_timestamp_unit: PyTimeUnit | None = None, +): ... +def read_parquet_bulk( + uris: list[str], + columns: list[str] | None = None, + start_offset: int | None = None, + num_rows: int | None = None, + row_groups: list[list[int]] | None = None, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, + coerce_int96_timestamp_unit: PyTimeUnit | None = None, +): ... +def read_parquet_statistics( + uris: PySeries, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, +): ... +def read_parquet_into_pyarrow( + uri: str, + columns: list[str] | None = None, + start_offset: int | None = None, + num_rows: int | None = None, + row_groups: list[int] | None = None, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, + coerce_int96_timestamp_unit: PyTimeUnit | None = None, +): ... +def read_parquet_schema( + uri: str, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, + coerce_int96_timestamp_unit: PyTimeUnit | None = None, +): ... + +class PyTimeUnit: + @staticmethod + def nanoseconds() -> PyTimeUnit: ... + @staticmethod + def microseconds() -> PyTimeUnit: ... + @staticmethod + def milliseconds() -> PyTimeUnit: ... + @staticmethod + def seconds() -> PyTimeUnit: ... + +class PyDataType: + @staticmethod + def null() -> PyDataType: ... + @staticmethod + def bool() -> PyDataType: ... + @staticmethod + def int8() -> PyDataType: ... + @staticmethod + def int16() -> PyDataType: ... + @staticmethod + def int32() -> PyDataType: ... + @staticmethod + def int64() -> PyDataType: ... + @staticmethod + def uint8() -> PyDataType: ... + @staticmethod + def uint16() -> PyDataType: ... + @staticmethod + def uint32() -> PyDataType: ... + @staticmethod + def uint64() -> PyDataType: ... + @staticmethod + def float32() -> PyDataType: ... + @staticmethod + def float64() -> PyDataType: ... 
+ @staticmethod + def binary() -> PyDataType: ... + @staticmethod + def string() -> PyDataType: ... + @staticmethod + def decimal128(precision: int, size: int) -> PyDataType: ... + @staticmethod + def date() -> PyDataType: ... + @staticmethod + def timestamp(time_unit: PyTimeUnit, timezone: str | None = None) -> PyDataType: ... + @staticmethod + def duration(time_unit: PyTimeUnit) -> PyDataType: ... + @staticmethod + def list(data_type: PyDataType) -> PyDataType: ... + @staticmethod + def fixed_size_list(data_type: PyDataType, size: int) -> PyDataType: ... + @staticmethod + def struct(fields: dict[str, PyDataType]) -> PyDataType: ... + @staticmethod + def extension(name: str, storage_data_type: PyDataType, metadata: str | None = None) -> PyDataType: ... + @staticmethod + def embedding(data_type: PyDataType, size: int) -> PyDataType: ... + @staticmethod + def image(mode: ImageMode | None = None, height: int | None = None, width: int | None = None) -> PyDataType: ... + @staticmethod + def tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ... + @staticmethod + def python() -> PyDataType: ... + def to_arrow(self, cast_tensor_type_for_ray: builtins.bool | None = None) -> pyarrow.DataType: ... + def is_image(self) -> builtins.bool: ... + def is_fixed_shape_image(self) -> builtins.bool: ... + def is_tensor(self) -> builtins.bool: ... + def is_fixed_shape_tensor(self) -> builtins.bool: ... + def is_logical(self) -> builtins.bool: ... + def is_equal(self, other: Any) -> builtins.bool: ... + @staticmethod + def from_json(serialized: str) -> PyDataType: ... + def __setstate__(self, state: Any) -> None: ... + def __getstate__(self) -> Any: ... + def __hash__(self) -> int: ... + +class PyField: + def name(self) -> str: ... + def dtype(self) -> PyDataType: ... + def eq(self, other: PyField) -> bool: ... + def __setstate__(self, state: Any) -> None: ... + def __getstate__(self) -> Any: ... + +class PySchema: + def __getitem__(self, name: str) -> PyField: ... + def names(self) -> list[str]: ... + def union(self, other: PySchema) -> PySchema: ... + def eq(self, other: PySchema) -> bool: ... + @staticmethod + def from_field_name_and_types(names_and_types: list[tuple[str, PyDataType]]) -> PySchema: ... + @staticmethod + def from_fields(fields: list[PyField]) -> PySchema: ... + def __setstate__(self, state: Any) -> None: ... + def __getstate__(self) -> Any: ... + def __repr__(self) -> str: ... + def _repr_html_(self) -> str: ... + +class PyExpr: + def _input_mapping(self) -> str | None: ... + def _required_columns(self) -> set[str]: ... + def _is_column(self) -> bool: ... + def _replace_column_with_expression(self, column: str, new_expr: PyExpr) -> PyExpr: ... + def alias(self, name: str) -> PyExpr: ... + def cast(self, dtype: PyDataType) -> PyExpr: ... + def if_else(self, if_true: PyExpr, if_false: PyExpr) -> PyExpr: ... + def count(self, mode: CountMode) -> PyExpr: ... + def sum(self) -> PyExpr: ... + def mean(self) -> PyExpr: ... + def min(self) -> PyExpr: ... + def max(self) -> PyExpr: ... + def agg_list(self) -> PyExpr: ... + def agg_concat(self) -> PyExpr: ... + def explode(self) -> PyExpr: ... + def __abs__(self) -> PyExpr: ... + def __add__(self, other: PyExpr) -> PyExpr: ... + def __sub__(self, other: PyExpr) -> PyExpr: ... + def __mul__(self, other: PyExpr) -> PyExpr: ... + def __floordiv__(self, other: PyExpr) -> PyExpr: ... + def __truediv__(self, other: PyExpr) -> PyExpr: ... + def __mod__(self, other: PyExpr) -> PyExpr: ... 
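# A sketch composing nested dtypes from the PyDataType factory methods above. User
# code normally goes through daft.DataType; the raw bindings are used here purely to
# mirror the stubs, and the "UTC" timezone is an arbitrary example value.
from daft.daft import PyDataType, PyTimeUnit

ts = PyDataType.timestamp(PyTimeUnit.milliseconds(), "UTC")
row = PyDataType.struct({"id": PyDataType.int64(), "updated_at": ts})
rows = PyDataType.list(row)
print(rows.is_logical())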
+ def __and__(self, other: PyExpr) -> PyExpr: ... + def __or__(self, other: PyExpr) -> PyExpr: ... + def __xor__(self, other: PyExpr) -> PyExpr: ... + def __invert__(self) -> PyExpr: ... + def __lt__(self, other: PyExpr) -> PyExpr: ... + def __le__(self, other: PyExpr) -> PyExpr: ... + def __gt__(self, other: PyExpr) -> PyExpr: ... + def __ge__(self, other: PyExpr) -> PyExpr: ... + def __eq__(self, other: PyExpr) -> PyExpr: ... # type: ignore[override] + def __ne__(self, other: PyExpr) -> PyExpr: ... # type: ignore[override] + def is_null(self) -> PyExpr: ... + def name(self) -> str: ... + def to_field(self, schema: PySchema) -> PyField: ... + def __repr__(self) -> str: ... + def __hash__(self) -> int: ... + def __setstate__(self, state: Any) -> None: ... + def __getstate__(self) -> Any: ... + def is_nan(self) -> PyExpr: ... + def dt_day(self) -> PyExpr: ... + def dt_month(self) -> PyExpr: ... + def dt_year(self) -> PyExpr: ... + def dt_day_of_week(self) -> PyExpr: ... + def utf8_endswith(self, pattern: PyExpr) -> PyExpr: ... + def utf8_startswith(self, pattern: PyExpr) -> PyExpr: ... + def utf8_contains(self, pattern: PyExpr) -> PyExpr: ... + def utf8_length(self) -> PyExpr: ... + def image_decode(self) -> PyExpr: ... + def image_encode(self, image_format: ImageFormat) -> PyExpr: ... + def image_resize(self, w: int, h: int) -> PyExpr: ... + def image_crop(self, bbox: PyExpr) -> PyExpr: ... + def list_join(self, delimiter: PyExpr) -> PyExpr: ... + def list_lengths(self) -> PyExpr: ... + def url_download( + self, max_connections: int, raise_error_on_failure: bool, multi_thread: bool, config: IOConfig | None = None + ) -> PyExpr: ... + +def eq(expr1: PyExpr, expr2: PyExpr) -> bool: ... +def col(name: str) -> PyExpr: ... +def lit(item: Any) -> PyExpr: ... +def udf(func: Callable, expressions: list[PyExpr], return_dtype: PyDataType) -> PyExpr: ... + +class PySeries: + @staticmethod + def from_arrow(name: str, pyarrow_array: pyarrow.Array) -> PySeries: ... + @staticmethod + def from_pylist(name: str, pylist: list[Any], pyobj: str) -> PySeries: ... + def to_pylist(self) -> list[Any]: ... + def to_arrow(self) -> pyarrow.Array: ... + def __abs__(self) -> PySeries: ... + def __add__(self, other: PySeries) -> PySeries: ... + def __sub__(self, other: PySeries) -> PySeries: ... + def __mul__(self, other: PySeries) -> PySeries: ... + def __truediv__(self, other: PySeries) -> PySeries: ... + def __mod__(self, other: PySeries) -> PySeries: ... + def __and__(self, other: PySeries) -> PySeries: ... + def __or__(self, other: PySeries) -> PySeries: ... + def __xor__(self, other: PySeries) -> PySeries: ... + def __lt__(self, other: PySeries) -> PySeries: ... + def __le__(self, other: PySeries) -> PySeries: ... + def __gt__(self, other: PySeries) -> PySeries: ... + def __ge__(self, other: PySeries) -> PySeries: ... + def __eq__(self, other: PySeries) -> PySeries: ... # type: ignore[override] + def __ne__(self, other: PySeries) -> PySeries: ... # type: ignore[override] + def take(self, idx: PySeries) -> PySeries: ... + def slice(self, start: int, end: int) -> PySeries: ... + def filter(self, mask: PySeries) -> PySeries: ... + def sort(self, descending: bool) -> PySeries: ... + def argsort(self, descending: bool) -> PySeries: ... + def hash(self, seed: PySeries | None = None) -> PySeries: ... + def __invert__(self) -> PySeries: ... + def _count(self, mode: CountMode) -> PySeries: ... + def _sum(self) -> PySeries: ... + def _mean(self) -> PySeries: ... + def _min(self) -> PySeries: ... 
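# A sketch of building a raw expression with the module-level col/lit helpers stubbed
# above; the public daft.expressions wrappers are the usual entry point, and the
# column name "price" is made up.
from daft.daft import col, lit

expr = (col("price") * lit(1.1)).alias("price_with_tax")
print(expr.name(), expr._required_columns())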
+ def _max(self) -> PySeries: ... + def _agg_list(self) -> PySeries: ... + def cast(self, dtype: PyDataType) -> PySeries: ... + @staticmethod + def concat(series: list[PySeries]) -> PySeries: ... + def __len__(self) -> int: ... + def size_bytes(self) -> int: ... + def name(self) -> str: ... + def rename(self, name: str) -> PySeries: ... + def data_type(self) -> PyDataType: ... + def utf8_endswith(self, pattern: PySeries) -> PySeries: ... + def utf8_startswith(self, pattern: PySeries) -> PySeries: ... + def utf8_contains(self, pattern: PySeries) -> PySeries: ... + def utf8_length(self) -> PySeries: ... + def is_nan(self) -> PySeries: ... + def dt_day(self) -> PySeries: ... + def dt_month(self) -> PySeries: ... + def dt_year(self) -> PySeries: ... + def dt_day_of_week(self) -> PySeries: ... + def list_lengths(self) -> PySeries: ... + def image_decode(self) -> PySeries: ... + def image_encode(self, image_format: ImageFormat) -> PySeries: ... + def image_resize(self, w: int, h: int) -> PySeries: ... + def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ... + def is_null(self) -> PySeries: ... + +class PyTable: + def schema(self) -> PySchema: ... + def cast_to_schema(self, schema: PySchema) -> PyTable: ... + def eval_expression_list(self, exprs: list[PyExpr]) -> PyTable: ... + def take(self, idx: PySeries) -> PyTable: ... + def filter(self, exprs: list[PyExpr]) -> PyTable: ... + def sort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PyTable: ... + def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ... + def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyTable: ... + def join(self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr]) -> PyTable: ... + def explode(self, to_explode: list[PyExpr]) -> PyTable: ... + def head(self, num: int) -> PyTable: ... + def sample(self, num: int) -> PyTable: ... + def quantiles(self, num: int) -> PyTable: ... + def partition_by_hash(self, exprs: list[PyExpr], num_partitions: int) -> list[PyTable]: ... + def partition_by_random(self, num_partitions: int, seed: int) -> list[PyTable]: ... + def partition_by_range( + self, partition_keys: list[PyExpr], boundaries: PyTable, descending: list[bool] + ) -> list[PyTable]: ... + def __repr__(self) -> str: ... + def _repr_html_(self) -> str: ... + def __len__(self) -> int: ... + def size_bytes(self) -> int: ... + def column_names(self) -> list[str]: ... + def get_column(self, name: str) -> PySeries: ... + def get_column_by_index(self, idx: int) -> PySeries: ... + @staticmethod + def concat(tables: list[PyTable]) -> PyTable: ... + def slice(self, start: int, end: int) -> PyTable: ... + @staticmethod + def from_arrow_record_batches(record_batches: list[pyarrow.RecordBatch], schema: PySchema) -> PyTable: ... + @staticmethod + def from_pylist_series(dict: dict[str, PySeries]) -> PyTable: ... + def to_arrow_record_batch(self) -> pyarrow.RecordBatch: ... + @staticmethod + def empty(schema: PySchema | None = None) -> PyTable: ... + +class PhysicalPlanScheduler: + """ + A work scheduler for physical query plans. + """ + + def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: ... + +class LogicalPlanBuilder: + """ + A logical plan builder, which simplifies constructing logical plans via + a fluent interface. E.g., LogicalPlanBuilder.table_scan(..).project(..).filter(..). 
+ + This builder holds the current root (sink) of the logical plan, and the building methods return + a brand new builder holding a new plan; i.e., this is an immutable builder. + """ + + @staticmethod + def in_memory_scan( + partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, partition_spec: PartitionSpec + ) -> LogicalPlanBuilder: ... + @staticmethod + def table_scan( + file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig + ) -> LogicalPlanBuilder: ... + def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ... + def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ... + def limit(self, limit: int) -> LogicalPlanBuilder: ... + def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ... + def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ... + def repartition( + self, num_partitions: int, partition_by: list[PyExpr], scheme: PartitionScheme + ) -> LogicalPlanBuilder: ... + def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ... + def distinct(self) -> LogicalPlanBuilder: ... + def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ... + def join( + self, right: LogicalPlanBuilder, left_on: list[PyExpr], right_on: list[PyExpr], join_type: JoinType + ) -> LogicalPlanBuilder: ... + def concat(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder: ... + def table_write( + self, + root_dir: str, + file_format: FileFormat, + partition_cols: list[PyExpr] | None = None, + compression: str | None = None, + ) -> LogicalPlanBuilder: ... + def schema(self) -> PySchema: ... + def partition_spec(self) -> PartitionSpec: ... + def optimize(self) -> LogicalPlanBuilder: ... + def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler: ... + def repr_ascii(self, simple: bool) -> str: ... + +def build_type() -> str: ... +def version() -> str: ... def __getattr__(name) -> Any: ... diff --git a/daft/datasources.py b/daft/datasources.py deleted file mode 100644 index 2d396f96e2..0000000000 --- a/daft/datasources.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import annotations - -import sys -from dataclasses import dataclass -from enum import Enum -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from daft.io import IOConfig - -if sys.version_info < (3, 8): - from typing_extensions import Protocol -else: - from typing import Protocol - - -class StorageType(Enum): - CSV = "CSV" - PARQUET = "PARQUET" - JSON = "JSON" - - -class SourceInfo(Protocol): - """A class that provides information about a given Datasource""" - - def scan_type(self) -> StorageType: - ... 
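# A sketch of the fluent, immutable LogicalPlanBuilder described above, built end to
# end from the stubs earlier in this file. The JSON path, column name, and the choice
# of a JSON source are all assumptions made only to keep the example self-contained.
from daft.daft import (
    FileFormatConfig,
    FileInfos,
    JsonSourceConfig,
    LogicalPlanBuilder,
    NativeStorageConfig,
    PyDataType,
    PySchema,
    StorageConfig,
    col,
    lit,
)

file_infos = FileInfos.from_infos(["data/x.json"], [None], [None])
schema = PySchema.from_field_name_and_types([("x", PyDataType.int64())])
fmt_cfg = FileFormatConfig.from_json_config(JsonSourceConfig())
storage_cfg = StorageConfig.native(NativeStorageConfig(None))

builder = (
    LogicalPlanBuilder.table_scan(file_infos, schema, fmt_cfg, storage_cfg)
    .filter(col("x") > lit(0))
    .limit(100)
    .optimize()
)
scheduler = builder.to_physical_plan_scheduler()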
- - -@dataclass(frozen=True) -class CSVSourceInfo(SourceInfo): - - delimiter: str - has_headers: bool - - def scan_type(self): - return StorageType.CSV - - -@dataclass(frozen=True) -class JSONSourceInfo(SourceInfo): - def scan_type(self): - return StorageType.JSON - - -@dataclass(frozen=True) -class ParquetSourceInfo(SourceInfo): - - use_native_downloader: bool - io_config: IOConfig | None - - def scan_type(self): - return StorageType.PARQUET diff --git a/daft/io/common.py b/daft/io/common.py index 0027b75128..ce316a201f 100644 --- a/daft/io/common.py +++ b/daft/io/common.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING from daft.context import get_context -from daft.daft import FileFormatConfig, LogicalPlanBuilder, StorageConfig +from daft.daft import FileFormatConfig, StorageConfig from daft.datatype import DataType from daft.logical.builder import LogicalPlanBuilder from daft.logical.schema import Schema diff --git a/daft/logging.py b/daft/logging.py index 20da902f96..36bd6aa903 100644 --- a/daft/logging.py +++ b/daft/logging.py @@ -29,7 +29,7 @@ def emit(self, record: logging.LogRecord) -> None: frame = frame.f_back depth += 1 - logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + logger.opt(depth=depth - 1, exception=record.exc_info).log(level, record.getMessage()) # Clear out any existing standard loggers. root = logging.root diff --git a/daft/logical/schema.py b/daft/logical/schema.py index 6378e64418..74c4778f20 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -146,12 +146,14 @@ def from_parquet( cls, path: str, io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), ) -> Schema: return Schema._from_pyschema( _read_parquet_schema( uri=path, io_config=io_config, + multithreaded_io=multithreaded_io, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, ) ) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index a980071959..d596974ff9 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -82,16 +82,17 @@ def _glob_path_into_file_infos( if isinstance(config, PythonStorageConfig): fs = config.fs file_infos = FileInfos() + file_format = file_format_config.file_format() if file_format_config is not None else None for path in paths: if fs is None: fs = get_filesystem_from_path(path) - path_file_infos = glob_path_with_stats(path, file_format_config, fs, storage_config) + path_file_infos = glob_path_with_stats(path, file_format, fs, storage_config) if len(path_file_infos) == 0: raise FileNotFoundError(f"No files found at {path}") file_infos.extend(path_file_infos) - return file_infos + return Table._from_pytable(file_infos.to_table()) @ray.remote @@ -134,7 +135,7 @@ def remote_len_partition(p: Table) -> int: def sample_schema_from_filepath( first_file_path: str, file_format_config: FileFormatConfig, - storage_config: StorageConfig | None, + storage_config: StorageConfig, ) -> Schema: """Ray remote function to run schema sampling on top of a Table containing a single filepath""" # Currently just samples the Schema from the first file @@ -222,15 +223,19 @@ def glob_paths_details( storage_config: StorageConfig | None = None, ) -> FileInfos: # Synchronously fetch the file infos, for now. 
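# A sketch of the new multithreaded_io passthrough added to Schema.from_parquet in
# the hunk above; the file path is a placeholder and the remaining arguments keep
# their defaults.
from daft.logical.schema import Schema

schema = Schema.from_parquet("data/example.parquet", multithreaded_io=False)
print(schema)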
- return ray.get( - _glob_path_into_file_infos.remote(source_paths, file_format_config, fs=fs, storage_config=storage_config) + return FileInfos.from_table( + ray.get( + _glob_path_into_file_infos.remote( + source_paths, file_format_config, fs=fs, storage_config=storage_config + ) + )._table ) def get_schema_from_first_filepath( self, file_infos: FileInfos, file_format_config: FileFormatConfig, - storage_config: StorageConfig | None, + storage_config: StorageConfig, ) -> Schema: if len(file_infos) == 0: raise ValueError("No files to get schema from") diff --git a/daft/runners/runner_io.py b/daft/runners/runner_io.py index eff6c57caa..58e7147f8c 100644 --- a/daft/runners/runner_io.py +++ b/daft/runners/runner_io.py @@ -49,7 +49,7 @@ def glob_paths_details( @abstractmethod def get_schema_from_first_filepath( self, - file_info: FileInfos, + file_infos: FileInfos, file_format_config: FileFormatConfig, storage_config: StorageConfig, ) -> Schema: diff --git a/daft/table/table.py b/daft/table/table.py index eb39859814..7a0b0831af 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -430,7 +430,6 @@ def read_parquet_statistics( def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray: - if pa.types.is_large_binary(arr.type) or pa.types.is_large_string(arr.type): if pa.types.is_large_binary(arr.type): target_type = pa.binary() @@ -464,7 +463,6 @@ def read_parquet_into_pyarrow( multithreaded_io: bool | None = None, coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), ) -> pa.Table: - fields, metadata, columns = _read_parquet_into_pyarrow( uri=path, columns=columns, diff --git a/daft/udf.py b/daft/udf.py index 00cde3da96..2d3ad0eba9 100644 --- a/daft/udf.py +++ b/daft/udf.py @@ -8,7 +8,7 @@ from daft.datatype import DataType from daft.expressions import Expression -from daft.series import Series +from daft.series import PySeries, Series _NUMPY_AVAILABLE = True try: @@ -29,7 +29,7 @@ class PartialUDF: def expressions(self) -> dict[str, Expression]: return {key: val for key, val in self.bound_args.arguments.items() if isinstance(val, Expression)} - def __call__(self, evaluated_expressions: list[Series]) -> Series: + def __call__(self, evaluated_expressions: list[Series]) -> PySeries: kwarg_keys = list(self.bound_args.kwargs.keys()) arg_keys = [k for k in self.bound_args.arguments.keys() if k not in self.bound_args.kwargs.keys()] pyvalues = {key: val for key, val in self.bound_args.arguments.items() if not isinstance(val, Expression)} diff --git a/src/daft-core/src/datatypes/image_format.rs b/src/daft-core/src/datatypes/image_format.rs index b20e18c37b..a5c5d8959f 100644 --- a/src/daft-core/src/datatypes/image_format.rs +++ b/src/daft-core/src/datatypes/image_format.rs @@ -9,12 +9,6 @@ use serde::{Deserialize, Serialize}; use common_error::{DaftError, DaftResult}; /// Supported image formats for Daft's I/O layer. 
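# A sketch of the FileInfos <-> Table round trip that the Ray runner hunks above now
# perform (_glob_path_into_file_infos returns a Table, glob_paths_details rebuilds
# FileInfos from it); the paths, sizes, and row counts are made up.
from daft.daft import FileInfos
from daft.table import Table

infos = FileInfos.from_infos(
    file_paths=["s3://bucket/a.parquet", "s3://bucket/b.parquet"],
    file_sizes=[1024, None],
    num_rows=[100, None],
)
as_table = Table._from_pytable(infos.to_table())
round_tripped = FileInfos.from_table(as_table._table)
print(len(round_tripped), round_tripped[0].file_path)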
-/// -/// | PNG -/// | JPEG -/// | TIFF -/// | GIF -/// | BMP #[allow(clippy::upper_case_acronyms)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass)] diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 8fa31000c6..142504825b 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -19,12 +19,20 @@ use daft_dsl::Expr; #[cfg(feature = "python")] use { - crate::source_info::{InMemoryInfo, PyFileFormatConfig}, + crate::{ + physical_plan::PhysicalPlan, + source_info::{InMemoryInfo, PyFileFormatConfig}, + }, daft_core::python::schema::PySchema, daft_dsl::python::PyExpr, pyo3::prelude::*, }; +/// A logical plan builder, which simplifies constructing logical plans via +/// a fluent interface. E.g., LogicalPlanBuilder::table_scan(..).project(..).filter(..).build(). +/// +/// This builder holds the current root (sink) of the logical plan, and the building methods return +/// a brand new builder holding a new plan; i.e., this is an immutable builder. #[derive(Debug)] pub struct LogicalPlanBuilder { // The current root of the logical plan in this builder. @@ -169,14 +177,14 @@ impl LogicalPlanBuilder { pub fn join( &self, - other: &Self, + right: &Self, left_on: Vec, right_on: Vec, join_type: JoinType, ) -> DaftResult { let logical_plan: LogicalPlan = logical_ops::Join::try_new( self.plan.clone(), - other.plan.clone(), + right.plan.clone(), left_on, right_on, join_type, @@ -237,6 +245,11 @@ impl From for LogicalPlanBuilder { } } +/// A Python-facing wrapper of the LogicalPlanBuilder. +/// +/// This lightweight proxy interface should hold as much of the Python-specific logic +/// as possible, converting pyo3 wrapper type arguments into their underlying Rust-native types +/// (e.g. PySchema -> Schema). #[cfg_attr(feature = "python", pyclass(name = "LogicalPlanBuilder"))] #[derive(Debug)] pub struct PyLogicalPlanBuilder { @@ -359,7 +372,7 @@ impl PyLogicalPlanBuilder { pub fn join( &self, - other: &Self, + right: &Self, left_on: Vec, right_on: Vec, join_type: JoinType, @@ -374,7 +387,7 @@ impl PyLogicalPlanBuilder { .collect::>(); Ok(self .builder - .join(&other.builder, left_on, right_on, join_type)? + .join(&right.builder, left_on, right_on, join_type)? .into()) } @@ -405,6 +418,7 @@ impl PyLogicalPlanBuilder { Ok(self.builder.partition_spec()) } + /// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan. pub fn optimize(&self) -> PyResult { let optimizer = Optimizer::new(Default::default()); let unoptimized_plan = self.builder.build(); @@ -433,10 +447,13 @@ impl PyLogicalPlanBuilder { Ok(builder.into()) } + /// Finalize the logical plan, translate the logical plan to a physical plan, and return + /// a physical plan scheduler that's capable of launching the work necessary to compute the output + /// of the physical plan. pub fn to_physical_plan_scheduler(&self) -> PyResult { let logical_plan = self.builder.build(); - let physical_plan = plan(logical_plan.as_ref())?; - Ok(Arc::new(physical_plan).into()) + let physical_plan: Arc = plan(logical_plan.as_ref())?.into(); + Ok(physical_plan.into()) } pub fn repr_ascii(&self, simple: bool) -> PyResult { diff --git a/src/daft-plan/src/join.rs b/src/daft-plan/src/join.rs index ef049d6ff1..c1290ed126 100644 --- a/src/daft-plan/src/join.rs +++ b/src/daft-plan/src/join.rs @@ -15,6 +15,7 @@ use pyo3::{ use serde::{Deserialize, Serialize}; +/// Type of a join operation. 
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub enum JoinType { diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index d12ab418be..454e2e1116 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -8,6 +8,7 @@ use snafu::Snafu; use crate::{display::TreeDisplay, logical_ops::*, PartitionScheme, PartitionSpec}; +/// Logical plan for a Daft query. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum LogicalPlan { Source(Source), diff --git a/src/daft-plan/src/partitioning.rs b/src/daft-plan/src/partitioning.rs index fd3bdabf13..675734e529 100644 --- a/src/daft-plan/src/partitioning.rs +++ b/src/daft-plan/src/partitioning.rs @@ -12,6 +12,7 @@ use { }, }; +/// Partition scheme for Daft DataFrame. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub enum PartitionScheme { @@ -40,12 +41,12 @@ impl PartitionScheme { impl_bincode_py_state_serialization!(PartitionScheme); +/// Partition specification: scheme, number of partitions, partition column. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct PartitionSpec { pub scheme: PartitionScheme, pub num_partitions: usize, - // TODO(Clark): Port ExpressionsProjection. pub by: Option>, } diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 2c87e143f5..ea281da1f0 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use crate::physical_ops::*; +/// Physical plan for a Daft query. #[derive(Debug, Serialize, Deserialize)] pub enum PhysicalPlan { #[cfg(feature = "python")] @@ -54,6 +55,7 @@ pub enum PhysicalPlan { TabularWriteCsv(TabularWriteCsv), } +/// A work scheduler for physical plans. #[cfg_attr(feature = "python", pyclass)] #[derive(Debug, Serialize, Deserialize)] pub struct PhysicalPlanScheduler { @@ -85,6 +87,7 @@ impl PhysicalPlanScheduler { } } + /// Converts the contained physical plan into an iterator of executable partition tasks. pub fn to_partition_tasks(&self, psets: HashMap>) -> PyResult { Python::with_gil(|py| self.plan.to_partition_tasks(py, &psets)) } diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index cb45345847..5a38d0ed02 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -21,6 +21,7 @@ use crate::{FileFormat, PartitionScheme}; #[cfg(feature = "python")] use crate::physical_ops::InMemoryScan; +/// Translate a logical plan to a physical plan. pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { match logical_plan { LogicalPlan::Source(Source { diff --git a/src/daft-plan/src/resource_request.rs b/src/daft-plan/src/resource_request.rs index 81e343d9bd..95b8ccd392 100644 --- a/src/daft-plan/src/resource_request.rs +++ b/src/daft-plan/src/resource_request.rs @@ -8,6 +8,7 @@ use { use serde::{Deserialize, Serialize}; +/// Resource request for a query fragment task. 
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct ResourceRequest { @@ -56,11 +57,11 @@ fn float_max(left: f64, right: f64) -> f64 { #[pymethods] impl ResourceRequest { #[new] - #[pyo3(signature = (num_cpus=None, num_gpus=None, memory_bytes=None))] pub fn new(num_cpus: Option, num_gpus: Option, memory_bytes: Option) -> Self { Self::new_internal(num_cpus, num_gpus, memory_bytes) } + /// Take a field-wise max of the list of resource requests. #[staticmethod] pub fn max_resources(resource_requests: Vec) -> Self { resource_requests.iter().fold(Default::default(), |acc, e| { diff --git a/src/daft-plan/src/source_info/file_format.rs b/src/daft-plan/src/source_info/file_format.rs index 1cb95fbd94..e7cae3ff89 100644 --- a/src/daft-plan/src/source_info/file_format.rs +++ b/src/daft-plan/src/source_info/file_format.rs @@ -12,6 +12,7 @@ use pyo3::{ IntoPy, PyObject, PyResult, Python, }; +/// Format of a file, e.g. Parquet, CSV, JSON. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub enum FileFormat { @@ -49,6 +50,7 @@ impl From<&FileFormatConfig> for FileFormat { } } +/// Configuration for parsing a particular file format. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum FileFormatConfig { Parquet(ParquetSourceConfig), @@ -68,6 +70,7 @@ impl FileFormatConfig { } } +/// Configuration for a Parquet data source. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct ParquetSourceConfig; @@ -75,6 +78,7 @@ pub struct ParquetSourceConfig; #[cfg(feature = "python")] #[pymethods] impl ParquetSourceConfig { + /// Create a config for a Parquet data source. #[new] fn new() -> Self { Self {} @@ -83,6 +87,7 @@ impl ParquetSourceConfig { impl_bincode_py_state_serialization!(ParquetSourceConfig); +/// Configuration for a CSV data source. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] pub struct CsvSourceConfig { @@ -93,6 +98,12 @@ pub struct CsvSourceConfig { #[cfg(feature = "python")] #[pymethods] impl CsvSourceConfig { + /// Create a config for a CSV data source. + /// + /// # Arguments + /// + /// * `delimiter` - The character delmiting individual cells in the CSV data. + /// * `has_headers` - Whether the CSV has a header row; if so, it will be skipped during data parsing. #[new] fn new(delimiter: String, has_headers: bool) -> Self { Self { @@ -104,6 +115,7 @@ impl CsvSourceConfig { impl_bincode_py_state_serialization!(CsvSourceConfig); +/// Configuration for a JSON data source. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] pub struct JsonSourceConfig {} @@ -111,6 +123,7 @@ pub struct JsonSourceConfig {} #[cfg(feature = "python")] #[pymethods] impl JsonSourceConfig { + /// Create a config for a JSON data source. #[new] fn new() -> Self { Self {} @@ -119,6 +132,7 @@ impl JsonSourceConfig { impl_bincode_py_state_serialization!(JsonSourceConfig); +/// Configuration for parsing a particular file format. #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(transparent)] #[cfg_attr( @@ -143,21 +157,25 @@ impl PyFileFormatConfig { } } + /// Create a Parquet file format config. 
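# A sketch of the CSV source config documented above, driven through the Python
# bindings; the pipe delimiter is an arbitrary choice.
from daft.daft import CsvSourceConfig, FileFormatConfig

csv_cfg = FileFormatConfig.from_csv_config(CsvSourceConfig(delimiter="|", has_headers=True))
print(csv_cfg.file_format(), csv_cfg.config.delimiter)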
#[staticmethod] fn from_parquet_config(config: ParquetSourceConfig) -> Self { Self(Arc::new(FileFormatConfig::Parquet(config))) } + /// Create a CSV file format config. #[staticmethod] fn from_csv_config(config: CsvSourceConfig) -> Self { Self(Arc::new(FileFormatConfig::Csv(config))) } + /// Create a JSON file format config. #[staticmethod] fn from_json_config(config: JsonSourceConfig) -> Self { Self(Arc::new(FileFormatConfig::Json(config))) } + /// Get the underlying data source config. #[getter] fn get_config(&self, py: Python) -> PyObject { use FileFormatConfig::*; @@ -169,6 +187,7 @@ impl PyFileFormatConfig { } } + /// Get the file format for this file format config. fn file_format(&self) -> FileFormat { self.0.as_ref().into() } diff --git a/src/daft-plan/src/source_info/file_info.rs b/src/daft-plan/src/source_info/file_info.rs index 57bd9eaa11..f71e9f41a3 100644 --- a/src/daft-plan/src/source_info/file_info.rs +++ b/src/daft-plan/src/source_info/file_info.rs @@ -14,6 +14,7 @@ use { }, }; +/// Metadata for a single file. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] pub struct FileInfo { @@ -41,6 +42,7 @@ impl FileInfo { } } +/// Metadata for a collection of files. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] pub struct FileInfos { @@ -74,11 +76,13 @@ impl FileInfos { Self::new_internal(file_paths, file_sizes, num_rows) } + /// Create from a Daft table with "path", "size", and "num_rows" columns. #[staticmethod] pub fn from_table(table: PyTable) -> PyResult { Ok(Self::from_table_internal(table.table)?) } + /// Concatenate two FileInfos together. pub fn extend(&mut self, new_infos: Self) { self.file_paths.extend(new_infos.file_paths); self.file_sizes.extend(new_infos.file_sizes); @@ -96,6 +100,7 @@ impl FileInfos { )) } + /// Convert to a Daft table with "path", "size", and "num_rows" columns. pub fn to_table(&self) -> PyResult { Ok(self.to_table_internal()?.into()) } diff --git a/src/daft-plan/src/source_info/storage_config.rs b/src/daft-plan/src/source_info/storage_config.rs index 5f6ebe338d..30ae0d4efd 100644 --- a/src/daft-plan/src/source_info/storage_config.rs +++ b/src/daft-plan/src/source_info/storage_config.rs @@ -17,6 +17,8 @@ use { std::hash::{Hash, Hasher}, }; +/// Configuration for interacting with a particular storage backend, using a particular +/// I/O layer implementation. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum StorageConfig { Native(Arc), @@ -24,6 +26,7 @@ pub enum StorageConfig { Python(PythonStorageConfig), } +/// Storage configuration for the Rust-native I/O layer. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct NativeStorageConfig { @@ -50,10 +53,12 @@ impl NativeStorageConfig { } } +/// Storage configuration for the legacy Python I/O layer. #[derive(Clone, Debug, Serialize, Deserialize)] #[cfg(feature = "python")] #[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))] pub struct PythonStorageConfig { + /// An fsspec filesystem instance. #[serde( serialize_with = "serialize_py_object_optional", deserialize_with = "deserialize_py_object_optional", @@ -102,6 +107,7 @@ impl Hash for PythonStorageConfig { } } +/// A Python-exposed interface for storage configs. 
#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(transparent)] #[cfg_attr( @@ -128,16 +134,20 @@ impl PyStorageConfig { ))), } } + + /// Create from a native storage config. #[staticmethod] fn native(config: NativeStorageConfig) -> Self { Self(Arc::new(StorageConfig::Native(config.into()))) } + /// Create from a Python storage config. #[staticmethod] fn python(config: PythonStorageConfig) -> Self { Self(Arc::new(StorageConfig::Python(config))) } + /// Get the underlying storage config. #[getter] fn get_config(&self, py: Python) -> PyObject { use StorageConfig::*;
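# A sketch of the legacy Python storage path exposed above, handing an fsspec
# filesystem to PythonStorageConfig; a local filesystem is used just as an example.
import fsspec

from daft.daft import PythonStorageConfig, StorageConfig

fs = fsspec.filesystem("file")
storage = StorageConfig.python(PythonStorageConfig(fs))
print(storage.config.fs)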