From 341ee2f6f51eef3fc1ef2068f81a5d713697c67e Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 2 Nov 2023 10:08:44 +0530 Subject: [PATCH] squash --- Cargo.lock | 4 + daft/daft.pyi | 57 +++- daft/execution/execution_step.py | 1 - daft/execution/rust_physical_plan_shim.py | 45 +++ daft/iceberg/__init__.py | 0 daft/iceberg/iceberg_scan.py | 94 ++++++ daft/io/_csv.py | 2 +- daft/io/_parquet.py | 8 +- daft/io/common.py | 56 ++-- daft/io/scan.py | 51 ++++ daft/logical/builder.py | 18 +- daft/table/micropartition.py | 6 + daft/table/table.py | 5 + daft/table/table_io.py | 3 +- src/daft-micropartition/src/micropartition.rs | 198 +++++++----- src/daft-micropartition/src/ops/agg.rs | 4 +- .../src/ops/cast_to_schema.rs | 4 +- src/daft-micropartition/src/ops/concat.rs | 2 +- .../src/ops/eval_expressions.rs | 4 +- src/daft-micropartition/src/ops/filter.rs | 2 +- src/daft-micropartition/src/ops/join.rs | 2 +- src/daft-micropartition/src/ops/partition.rs | 2 +- src/daft-micropartition/src/ops/slice.rs | 2 +- src/daft-micropartition/src/ops/take.rs | 8 +- src/daft-micropartition/src/python.rs | 50 ++- src/daft-plan/Cargo.toml | 1 + src/daft-plan/src/builder.rs | 60 +++- src/daft-plan/src/lib.rs | 16 +- src/daft-plan/src/logical_ops/source.rs | 21 +- .../src/optimization/rules/push_down_limit.rs | 35 ++- src/daft-plan/src/physical_ops/csv.rs | 6 +- src/daft-plan/src/physical_ops/json.rs | 6 +- src/daft-plan/src/physical_ops/mod.rs | 2 + src/daft-plan/src/physical_ops/parquet.rs | 2 +- src/daft-plan/src/physical_ops/scan.rs | 21 ++ src/daft-plan/src/physical_plan.rs | 29 +- src/daft-plan/src/planner.rs | 28 +- src/daft-plan/src/source_info/mod.rs | 27 +- src/daft-plan/src/test/mod.rs | 9 +- src/daft-scan/Cargo.toml | 3 + src/daft-scan/src/anonymous.rs | 74 +++-- .../src}/file_format.rs | 68 ++++- src/daft-scan/src/glob.rs | 120 ++++---- src/daft-scan/src/lib.rs | 288 +++++++++++++++--- .../src}/py_object_serde.rs | 11 +- src/daft-scan/src/python.rs | 191 +++++++++++- .../src}/storage_config.rs | 17 +- src/daft-stats/src/partition_spec.rs | 2 +- src/daft-stats/src/table_metadata.rs | 2 +- .../iceberg/docker-compose/docker-compose.yml | 4 + tests/table/table_io/test_csv.py | 2 +- tests/table/table_io/test_parquet.py | 2 +- 52 files changed, 1276 insertions(+), 399 deletions(-) create mode 100644 daft/iceberg/__init__.py create mode 100644 daft/iceberg/iceberg_scan.py create mode 100644 daft/io/scan.py create mode 100644 src/daft-plan/src/physical_ops/scan.rs rename src/{daft-plan/src/source_info => daft-scan/src}/file_format.rs (74%) rename src/{daft-plan/src/source_info => daft-scan/src}/py_object_serde.rs (88%) rename src/{daft-plan/src/source_info => daft-scan/src}/storage_config.rs (91%) diff --git a/Cargo.lock b/Cargo.lock index 5763bfaebf..92561e133e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1269,6 +1269,7 @@ dependencies = [ "common-io-config", "daft-core", "daft-dsl", + "daft-scan", "daft-table", "indexmap 2.0.2", "log", @@ -1284,7 +1285,9 @@ dependencies = [ name = "daft-scan" version = "0.1.10" dependencies = [ + "bincode", "common-error", + "common-io-config", "daft-core", "daft-csv", "daft-dsl", @@ -1295,6 +1298,7 @@ dependencies = [ "pyo3", "pyo3-log", "serde", + "serde_json", "snafu", "tokio", ] diff --git a/daft/daft.pyi b/daft/daft.pyi index ecc1cb11d0..b412bc0cda 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -178,10 +178,7 @@ class ParquetSourceConfig: Configuration of a Parquet data source. 
""" - # Whether or not to use a multithreaded tokio runtime for processing I/O - multithreaded_io: bool - - def __init__(self, multithreaded_io: bool): ... + def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ... class CsvSourceConfig: """ @@ -339,9 +336,11 @@ class NativeStorageConfig: Storage configuration for the Rust-native I/O layer. """ + # Whether or not to use a multithreaded tokio runtime for processing I/O + multithreaded_io: bool io_config: IOConfig - def __init__(self, io_config: IOConfig | None = None): ... + def __init__(self, multithreaded_io: bool, io_config: IOConfig | None = None): ... class PythonStorageConfig: """ @@ -374,6 +373,48 @@ class StorageConfig: @property def config(self) -> NativeStorageConfig | PythonStorageConfig: ... +class ScanTask: + """ + A scan task for reading data from an external source. + """ + + ... + +class ScanTaskBatch: + """ + A batch of scan tasks for reading data from an external source. + """ + + @staticmethod + def from_scan_tasks(scan_tasks: list[ScanTask]) -> ScanTaskBatch: + """ + Create a scan task batch from a list of scan tasks. + """ + ... + def num_rows(self) -> int: + """ + Get number of rows that will be scanned by all tasks in this batch. + """ + ... + def size_bytes(self) -> int: + """ + Get number of bytes that will be scanned by all tasks in this batch. + """ + ... + +class ScanOperatorHandle: + """ + A handle to a scan operator. + """ + + @staticmethod + def anonymous_scan( + files: list[str], + schema: PySchema, + file_format_config: FileFormatConfig, + storage_config: StorageConfig, + ) -> ScanOperatorHandle: ... + def read_parquet( uri: str, columns: list[str] | None = None, @@ -722,6 +763,8 @@ class PyMicroPartition: @staticmethod def empty(schema: PySchema | None = None) -> PyMicroPartition: ... @staticmethod + def from_scan_task_batch(scan_task_batch: ScanTaskBatch) -> PyMicroPartition: ... + @staticmethod def from_tables(tables: list[PyTable]) -> PyMicroPartition: ... @staticmethod def from_arrow_record_batches(record_batches: list[pyarrow.RecordBatch], schema: PySchema) -> PyMicroPartition: ... @@ -814,6 +857,10 @@ class LogicalPlanBuilder: partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int ) -> LogicalPlanBuilder: ... @staticmethod + def table_scan_with_scan_operator( + scan_operator: ScanOperatorHandle, schema_hint: PySchema | None + ) -> LogicalPlanBuilder: ... + @staticmethod def table_scan( file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig ) -> LogicalPlanBuilder: ... 
diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py
index da03267d5e..998bcbb2df 100644
--- a/daft/execution/execution_step.py
+++ b/daft/execution/execution_step.py
@@ -404,7 +404,6 @@ def _handle_tabular_files_scan(
                 schema=self.schema,
                 storage_config=self.storage_config,
                 read_options=read_options,
-                multithreaded_io=format_config.multithreaded_io,
             )
             for fp in filepaths
         ]
diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py
index b0aa071d47..2e2cd8bcea 100644
--- a/daft/execution/rust_physical_plan_shim.py
+++ b/daft/execution/rust_physical_plan_shim.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from dataclasses import dataclass
 from typing import Iterator, TypeVar, cast
 
 from daft.daft import (
@@ -10,17 +11,61 @@
     PySchema,
     PyTable,
     ResourceRequest,
+    ScanTask,
+    ScanTaskBatch,
     StorageConfig,
 )
 from daft.execution import execution_step, physical_plan
 from daft.expressions import Expression, ExpressionsProjection
 from daft.logical.map_partition_ops import MapPartitionOp
 from daft.logical.schema import Schema
+from daft.runners.partitioning import PartialPartitionMetadata
 from daft.table import Table
 
 PartitionT = TypeVar("PartitionT")
 
 
+def scan_with_tasks(
+    scan_tasks: Iterator[ScanTask],
+) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
+    """Yield a plan that reads the data described by the given scan tasks.
+
+    Each scan task is wrapped in a single-task ScanTaskBatch and emitted as one read instruction.
+    """
+    # TODO(Clark): Bundle scan tasks into single-instruction bulk reads.
+    for scan_task in scan_tasks:
+        scan_task_batch = ScanTaskBatch.from_scan_tasks([scan_task])
+        scan_step = execution_step.PartitionTaskBuilder[PartitionT](inputs=[], partial_metadatas=None,).add_instruction(
+            instruction=ScanWithTask(scan_task_batch),
+            # Set the filesize as the memory request.
+            # (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
+ resource_request=ResourceRequest(memory_bytes=scan_task_batch.size_bytes()), + ) + yield scan_step + + +@dataclass(frozen=True) +class ScanWithTask(execution_step.SingleOutputInstruction): + scan_task_batch: ScanTaskBatch + + def run(self, inputs: list[Table]) -> list[Table]: + return self._scan(inputs) + + def _scan(self, inputs: list[Table]) -> list[Table]: + assert len(inputs) == 0 + return [Table._from_scan_task_batch(self.scan_task_batch)] + + def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]: + assert len(input_metadatas) == 0 + + return [ + PartialPartitionMetadata( + num_rows=self.scan_task_batch.num_rows(), + size_bytes=None, + ) + ] + + def tabular_scan( schema: PySchema, columns_to_read: list[str] | None, diff --git a/daft/iceberg/__init__.py b/daft/iceberg/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py new file mode 100644 index 0000000000..eb96364d13 --- /dev/null +++ b/daft/iceberg/iceberg_scan.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.partitioning import PartitionField as IcebergPartitionField +from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec +from pyiceberg.schema import Schema as IcebergSchema +from pyiceberg.table import Table + +from daft.datatype import DataType +from daft.expressions.expressions import col +from daft.io.scan import PartitionField, ScanOperator +from daft.logical.schema import Field, Schema + + +def _iceberg_partition_field_to_daft_partition_field( + iceberg_schema: IcebergSchema, pfield: IcebergPartitionField +) -> PartitionField: + name = pfield.name + source_id = pfield.source_id + source_field = iceberg_schema.find_field(source_id) + source_name = source_field.name + daft_field = Field.create( + source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name))) + ) + transform = pfield.transform + iceberg_result_type = transform.result_type(source_field.field_type) + arrow_result_type = schema_to_pyarrow(iceberg_result_type) + daft_result_type = DataType.from_arrow_type(arrow_result_type) + result_field = Field.create(name, daft_result_type) + + from pyiceberg.transforms import ( + DayTransform, + HourTransform, + IdentityTransform, + MonthTransform, + YearTransform, + ) + + expr = None + if isinstance(transform, IdentityTransform): + expr = col(source_name) + if source_name != name: + expr = expr.alias(name) + elif isinstance(transform, YearTransform): + expr = col(source_name).dt.year().alias(name) + elif isinstance(transform, MonthTransform): + expr = col(source_name).dt.month().alias(name) + elif isinstance(transform, DayTransform): + expr = col(source_name).dt.day().alias(name) + elif isinstance(transform, HourTransform): + raise NotImplementedError("HourTransform not implemented, Please make an issue!") + else: + raise NotImplementedError(f"{transform} not implemented, Please make an issue!") + + assert expr is not None + return PartitionField(result_field, daft_field, transform=expr) + + +def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: IcebergPartitionSpec) -> list[PartitionField]: + return [_iceberg_partition_field_to_daft_partition_field(iceberg_schema, field) for field in spec.fields] + + +class IcebergScanOperator(ScanOperator): + def __init__(self, iceberg_table: Table) -> 
None: + super().__init__() + self._table = iceberg_table + arrow_schema = schema_to_pyarrow(iceberg_table.schema()) + self._schema = Schema.from_pyarrow_schema(arrow_schema) + self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec()) + + def schema(self) -> Schema: + return self._schema + + def partitioning_keys(self) -> list[PartitionField]: + return self._partition_keys + + +def catalog() -> Catalog: + return load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + +cat = catalog() +tab = cat.load_table("default.test_partitioned_by_years") +ice = IcebergScanOperator(tab) diff --git a/daft/io/_csv.py b/daft/io/_csv.py index babb6c4c35..ff9ea0fd48 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -70,7 +70,7 @@ def read_csv( ) file_format_config = FileFormatConfig.from_csv_config(csv_config) if use_native_downloader: - storage_config = StorageConfig.native(NativeStorageConfig(io_config)) + storage_config = StorageConfig.native(NativeStorageConfig(True, io_config)) else: storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config)) builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config) diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index e14b751870..43dfa4053b 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -54,13 +54,9 @@ def read_parquet( # This is because each Ray worker process receives its own pool of thread workers and connections multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io - file_format_config = FileFormatConfig.from_parquet_config( - ParquetSourceConfig( - multithreaded_io=multithreaded_io, - ) - ) + file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig()) if use_native_downloader: - storage_config = StorageConfig.native(NativeStorageConfig(io_config)) + storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config)) else: storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config)) diff --git a/daft/io/common.py b/daft/io/common.py index 7709d9d831..44ee052bc0 100644 --- a/daft/io/common.py +++ b/daft/io/common.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from typing import TYPE_CHECKING from daft.context import get_context @@ -7,6 +8,7 @@ FileFormatConfig, NativeStorageConfig, PythonStorageConfig, + ScanOperatorHandle, StorageConfig, ) from daft.datatype import DataType @@ -43,21 +45,41 @@ def _get_tabular_files_scan( io_config = storage_config.config.io_config else: raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}") + # TODO(Clark): Move this flag check to the global Daft context. + if os.getenv("DAFT_MICROPARTITIONS", "0") == "1": + # TODO(Clark): Add globbing scan, once implemented. 
+ runner_io = get_context().runner().runner_io() + file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config) - runner_io = get_context().runner().runner_io() - file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config) - - # Infer schema if no hints provided - inferred_or_provided_schema = ( - schema_hint - if schema_hint is not None - else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config) - ) - # Construct plan - builder = LogicalPlanBuilder.from_tabular_scan( - file_infos=file_infos, - schema=inferred_or_provided_schema, - file_format_config=file_format_config, - storage_config=storage_config, - ) - return builder + # Infer schema if no hints provided + inferred_or_provided_schema = ( + schema_hint + if schema_hint is not None + else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config) + ) + scan_op = ScanOperatorHandle.anonymous_scan( + file_infos.file_paths, inferred_or_provided_schema._schema, file_format_config, storage_config + ) + builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator( + scan_operator=scan_op, + schema_hint=inferred_or_provided_schema, + ) + return builder + else: + runner_io = get_context().runner().runner_io() + file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config) + + # Infer schema if no hints provided + inferred_or_provided_schema = ( + schema_hint + if schema_hint is not None + else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config) + ) + # Construct plan + builder = LogicalPlanBuilder.from_tabular_scan( + file_infos=file_infos, + schema=inferred_or_provided_schema, + file_format_config=file_format_config, + storage_config=storage_config, + ) + return builder diff --git a/daft/io/scan.py b/daft/io/scan.py new file mode 100644 index 0000000000..afaaf7e08f --- /dev/null +++ b/daft/io/scan.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import abc +from dataclasses import dataclass + +from daft.expressions.expressions import Expression +from daft.logical.schema import Field, Schema + + +@dataclass(frozen=True) +class ScanTask: + file_type: str + columns: list[str] | None + limit: int | None + + +@dataclass(frozen=True) +class PartitionField: + field: Field + source_field: Field + transform: Expression + + +class ScanOperator(abc.ABC): + @abc.abstractmethod + def schema(self) -> Schema: + raise NotImplementedError() + + @abc.abstractmethod + def partitioning_keys(self) -> list[PartitionField]: + raise NotImplementedError() + + # @abc.abstractmethod + # def num_partitions(self) -> int: + # raise NotImplementedError() + + # @abc.abstractmethod + # def filter(self, predicate: Expression) -> tuple[bool, ScanOperator]: + # raise NotImplementedError() + + # @abc.abstractmethod + # def limit(self, num: int) -> ScanOperator: + # raise NotImplementedError() + + # @abc.abstractmethod + # def select(self, columns: list[str]) -> ScanOperator: + # raise NotImplementedError() + + # @abc.abstractmethod + # def to_scan_tasks(self) -> Iterator[Any]: + # raise NotImplementedError() diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 311c5e29fd..2447e54333 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -5,7 +5,12 @@ from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType from daft.daft import LogicalPlanBuilder as 
_LogicalPlanBuilder -from daft.daft import PartitionScheme, ResourceRequest, StorageConfig +from daft.daft import ( + PartitionScheme, + ResourceRequest, + ScanOperatorHandle, + StorageConfig, +) from daft.expressions import Expression, col from daft.logical.schema import Schema from daft.runners.partitioning import PartitionCacheEntry @@ -66,6 +71,17 @@ def from_in_memory_scan( builder = _LogicalPlanBuilder.in_memory_scan(partition.key, partition, schema._schema, num_partitions) return cls(builder) + @classmethod + def from_tabular_scan_with_scan_operator( + cls, + *, + scan_operator: ScanOperatorHandle, + schema_hint: Schema | None, + ) -> LogicalPlanBuilder: + pyschema = schema_hint._schema if schema_hint is not None else None + builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator, pyschema) + return cls(builder) + @classmethod def from_tabular_scan( cls, diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index af25430725..5c52605f59 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -8,6 +8,7 @@ from daft.daft import IOConfig, JoinType from daft.daft import PyMicroPartition as _PyMicroPartition from daft.daft import PyTable as _PyTable +from daft.daft import ScanTaskBatch as _ScanTaskBatch from daft.datatype import DataType, TimeUnit from daft.expressions import Expression, ExpressionsProjection from daft.logical.schema import Schema @@ -64,6 +65,11 @@ def empty(schema: Schema | None = None) -> MicroPartition: pyt = _PyMicroPartition.empty(None) if schema is None else _PyMicroPartition.empty(schema._schema) return MicroPartition._from_pymicropartition(pyt) + @staticmethod + def _from_scan_task_batch(scan_task_batch: _ScanTaskBatch) -> MicroPartition: + assert isinstance(scan_task_batch, _ScanTaskBatch) + return MicroPartition._from_pymicropartition(_PyMicroPartition.from_scan_task_batch(scan_task_batch)) + @staticmethod def _from_pytable(pyt: _PyTable) -> MicroPartition: assert isinstance(pyt, _PyTable) diff --git a/daft/table/table.py b/daft/table/table.py index d4e28fb80a..f02c7c8191 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -8,6 +8,7 @@ from daft.arrow_utils import ensure_table from daft.daft import JoinType from daft.daft import PyTable as _PyTable +from daft.daft import ScanTaskBatch as _ScanTaskBatch from daft.daft import read_csv as _read_csv from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk @@ -78,6 +79,10 @@ def empty(schema: Schema | None = None) -> Table: pyt = _PyTable.empty(None) if schema is None else _PyTable.empty(schema._schema) return Table._from_pytable(pyt) + @staticmethod + def _from_scan_task_batch(_: _ScanTaskBatch) -> Table: + raise NotImplementedError("_from_scan_task_batch is not implemented for legacy Python Table.") + @staticmethod def _from_pytable(pyt: _PyTable) -> Table: assert isinstance(pyt, _PyTable) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 90eaa3d5e3..bff7c028b2 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -106,7 +106,6 @@ def read_parquet( storage_config: StorageConfig | None = None, read_options: TableReadOptions = TableReadOptions(), parquet_options: TableParseParquetOptions = TableParseParquetOptions(), - multithreaded_io: bool | None = None, ) -> Table: """Reads a Table from a Parquet file @@ -131,7 +130,7 @@ def read_parquet( num_rows=read_options.num_rows, io_config=config.io_config, 
coerce_int96_timestamp_unit=parquet_options.coerce_int96_timestamp_unit, - multithreaded_io=multithreaded_io, + multithreaded_io=config.multithreaded_io, ) return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index b3b45d44e3..c43cb2dc4b 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -11,9 +11,11 @@ use daft_csv::read::read_csv; use daft_parquet::read::{ read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions, }; +use daft_scan::file_format::{FileFormatConfig, ParquetSourceConfig}; +use daft_scan::storage_config::{NativeStorageConfig, StorageConfig}; +use daft_scan::{DataFileSource, ScanTaskBatch}; use daft_table::Table; -use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::DaftCoreComputeSnafu; @@ -22,33 +24,18 @@ use daft_io::{IOConfig, IOStatsRef}; use daft_stats::TableMetadata; use daft_stats::TableStatistics; -#[derive(Clone, Serialize, Deserialize)] -enum FormatParams { - Parquet { - row_groups: Option>>, - inference_options: ParquetSchemaInferenceOptions, - }, -} - -#[derive(Clone, Serialize, Deserialize)] -pub(crate) struct DeferredLoadingParams { - format_params: FormatParams, - urls: Vec, - io_config: Arc, - multithreaded_io: bool, - limit: Option, - columns: Option>, -} pub(crate) enum TableState { - Unloaded(DeferredLoadingParams), + // Unloaded(DeferredLoadingParams), + Unloaded(Arc), Loaded(Arc>), } impl Display for TableState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - TableState::Unloaded(params) => { - write!(f, "TableState: Unloaded. To load from: {:?}", params.urls) + TableState::Unloaded(_) => { + write!(f, "TableState: Unloaded") + // write!(f, "TableState: Unloaded. To load from: {:?}", params.urls) } TableState::Loaded(tables) => { writeln!(f, "TableState: Loaded. {} tables", tables.len())?; @@ -63,7 +50,7 @@ impl Display for TableState { pub(crate) struct MicroPartition { pub(crate) schema: SchemaRef, pub(crate) state: Mutex, - pub(crate) metadata: TableMetadata, + pub(crate) metadata: Option, pub(crate) statistics: Option, } @@ -71,12 +58,9 @@ impl MicroPartition { pub fn new( schema: SchemaRef, state: TableState, - metadata: TableMetadata, + metadata: Option, statistics: Option, ) -> Self { - if let TableState::Unloaded(..) 
= state && statistics.is_none() { - panic!("MicroPartition does not allow the Table without Statistics") - } if let Some(stats) = &statistics { if stats.columns.len() != schema.fields.len() { panic!("MicroPartition: TableStatistics and Schema have differing lengths") @@ -99,13 +83,21 @@ impl MicroPartition { } } + pub fn from_scan_task_batch(scan_task_batch: Arc) -> Self { + let schema = scan_task_batch.schema.clone(); + let metadata = scan_task_batch.metadata.clone(); + let statistics = scan_task_batch.statistics.clone(); + let state = TableState::Unloaded(scan_task_batch); + Self::new(schema, state, metadata, statistics) + } + pub fn empty(schema: Option) -> Self { let schema = schema.unwrap_or(Schema::empty().into()); Self::new( schema, TableState::Loaded(Arc::new(vec![])), - TableMetadata { length: 0 }, + Some(TableMetadata { length: 0 }), None, ) } @@ -115,7 +107,7 @@ impl MicroPartition { } pub fn len(&self) -> usize { - self.metadata.length + self.metadata.as_ref().map(|m| m.length).unwrap_or(0) } pub fn size_bytes(&self) -> DaftResult { @@ -146,42 +138,68 @@ impl MicroPartition { io_stats: Option, ) -> crate::Result>> { let mut guard = self.state.lock().unwrap(); - if let TableState::Unloaded(params) = guard.deref() { - let runtime_handle = daft_io::get_runtime(params.multithreaded_io).unwrap(); - let _rt_guard = runtime_handle.enter(); - - let table_values: Vec<_> = match ¶ms.format_params { - FormatParams::Parquet { + if let TableState::Unloaded(scan_task_batch) = guard.deref() { + let table_values: Vec<_> = match scan_task_batch.file_format_config.as_ref() { + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + // TODO(Clark): Support different row group specification per file. row_groups, - inference_options, - } => { - let io_client = - daft_io::get_io_client(params.multithreaded_io, params.io_config.clone()) - .unwrap(); - let column_names = params - .columns - .as_ref() - .map(|v| v.iter().map(|s| s.as_ref()).collect::>()); - let urls = params.urls.iter().map(|s| s.as_str()).collect::>(); - let all_tables = daft_parquet::read::read_parquet_bulk( - urls.as_slice(), - column_names.as_deref(), - None, - params.limit, - row_groups.clone(), - io_client.clone(), - io_stats, - 8, - runtime_handle, - inference_options, - ) - .context(DaftCoreComputeSnafu)?; - all_tables - .into_iter() - .map(|t| t.cast_to_schema(&self.schema)) - .collect::>>() - .context(DaftCoreComputeSnafu)? 
- } + }) => match scan_task_batch.storage_config.as_ref() { + StorageConfig::Native(native_storage_config) => { + let runtime_handle = + daft_io::get_runtime(native_storage_config.multithreaded_io).unwrap(); + let _rt_guard = runtime_handle.enter(); + + let io_config = Arc::new( + native_storage_config + .io_config + .as_ref() + .cloned() + .unwrap_or_default(), + ); + let io_client = daft_io::get_io_client( + native_storage_config.multithreaded_io, + io_config, + ) + .unwrap(); + let column_names = scan_task_batch + .columns + .as_ref() + .map(|v| v.iter().map(|s| s.as_ref()).collect::>()); + let urls = scan_task_batch + .sources + .iter() + .map(|s| s.get_path()) + .collect::>(); + let inference_options = + ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); + let all_tables = daft_parquet::read::read_parquet_bulk( + urls.as_slice(), + column_names.as_deref(), + None, + scan_task_batch.limit, + row_groups + .as_ref() + .map(|row_groups| vec![row_groups.clone(); urls.len()]), + io_client.clone(), + io_stats, + 8, + runtime_handle, + &inference_options, + ) + .context(DaftCoreComputeSnafu)?; + all_tables + .into_iter() + .map(|t| t.cast_to_schema(&self.schema)) + .collect::>>() + .context(DaftCoreComputeSnafu)? + } + #[cfg(feature = "python")] + StorageConfig::Python(_) => { + todo!("TODO: Implement Python I/O backend for MicroPartitions.") + } + }, + _ => todo!("TODO: Implement MicroPartition reads for other file formats."), }; let casted_table_values = table_values .iter() @@ -315,7 +333,7 @@ pub(crate) fn read_csv_into_micropartition( Ok(MicroPartition::new( unioned_schema.clone(), TableState::Loaded(Arc::new(tables)), - TableMetadata { length: total_len }, + Some(TableMetadata { length: total_len }), None, )) } @@ -401,19 +419,35 @@ pub(crate) fn read_parquet_into_micropartition( if let Some(stats) = stats { let owned_urls = uris.iter().map(|s| s.to_string()).collect::>(); - let owned_columns = columns.map(|c| c.iter().map(|s| s.to_string()).collect::>()); - - let params = DeferredLoadingParams { - format_params: FormatParams::Parquet { - row_groups, - inference_options: *schema_infer_options, - }, - urls: owned_urls, - io_config: io_config.clone(), - multithreaded_io, - limit: num_rows, - columns: owned_columns, - }; + + let daft_schema = Arc::new(daft_schema); + let scan_task_batch = ScanTaskBatch::new( + owned_urls + .into_iter() + .map(|url| DataFileSource::AnonymousDataFile { + path: url, + metadata: None, + partition_spec: None, + statistics: None, + }) + .collect::>(), + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit, + row_groups: None, + }) + .into(), + daft_schema.clone(), + StorageConfig::Native( + NativeStorageConfig::new_internal( + multithreaded_io, + Some(io_config.as_ref().clone()), + ) + .into(), + ) + .into(), + columns.map(|cols| Arc::new(cols.iter().map(|v| v.to_string()).collect::>())), + num_rows, + ); let exprs = daft_schema .fields @@ -421,12 +455,12 @@ pub(crate) fn read_parquet_into_micropartition( .map(|n| daft_dsl::col(n.as_str())) .collect::>(); // use schema to update stats - let stats = stats.eval_expression_list(exprs.as_slice(), &daft_schema)?; + let stats = stats.eval_expression_list(exprs.as_slice(), daft_schema.as_ref())?; Ok(MicroPartition::new( - Arc::new(daft_schema), - TableState::Unloaded(params), - TableMetadata { length: total_rows }, + daft_schema, + TableState::Unloaded(scan_task_batch.into()), + Some(TableMetadata { length: total_rows }), 
Some(stats), )) } else { @@ -449,7 +483,7 @@ pub(crate) fn read_parquet_into_micropartition( Ok(MicroPartition::new( Arc::new(daft_schema), TableState::Loaded(all_tables.into()), - TableMetadata { length: total_rows }, + Some(TableMetadata { length: total_rows }), None, )) } diff --git a/src/daft-micropartition/src/ops/agg.rs b/src/daft-micropartition/src/ops/agg.rs index 117395c0fc..886f35c521 100644 --- a/src/daft-micropartition/src/ops/agg.rs +++ b/src/daft-micropartition/src/ops/agg.rs @@ -18,7 +18,7 @@ impl MicroPartition { Ok(MicroPartition::new( agged.schema.clone(), TableState::Loaded(vec![agged].into()), - TableMetadata { length: agged_len }, + Some(TableMetadata { length: agged_len }), None, )) } @@ -28,7 +28,7 @@ impl MicroPartition { Ok(MicroPartition::new( agged.schema.clone(), TableState::Loaded(vec![agged].into()), - TableMetadata { length: agged_len }, + Some(TableMetadata { length: agged_len }), None, )) } diff --git a/src/daft-micropartition/src/ops/cast_to_schema.rs b/src/daft-micropartition/src/ops/cast_to_schema.rs index 22a6bc5dc8..ffde26053b 100644 --- a/src/daft-micropartition/src/ops/cast_to_schema.rs +++ b/src/daft-micropartition/src/ops/cast_to_schema.rs @@ -20,9 +20,9 @@ impl MicroPartition { let guard = self.state.lock().unwrap(); match guard.deref() { // Replace schema if Unloaded, which should be applied when data is lazily loaded - TableState::Unloaded(params) => Ok(MicroPartition::new( + TableState::Unloaded(scan_task_batch) => Ok(MicroPartition::new( schema.clone(), - TableState::Unloaded(params.clone()), + TableState::Unloaded(scan_task_batch.clone()), self.metadata.clone(), pruned_statistics, )), diff --git a/src/daft-micropartition/src/ops/concat.rs b/src/daft-micropartition/src/ops/concat.rs index 9901a02d35..2f73e52513 100644 --- a/src/daft-micropartition/src/ops/concat.rs +++ b/src/daft-micropartition/src/ops/concat.rs @@ -48,7 +48,7 @@ impl MicroPartition { Ok(MicroPartition { schema: mps.first().unwrap().schema.clone(), state: Mutex::new(TableState::Loaded(all_tables.into())), - metadata: TableMetadata { length: new_len }, + metadata: Some(TableMetadata { length: new_len }), statistics: all_stats, }) } diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index f4f8a1ee81..b4d22ac874 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -51,7 +51,7 @@ impl MicroPartition { Ok(MicroPartition::new( expected_schema.into(), TableState::Loaded(Arc::new(evaluated_tables)), - TableMetadata { length: self.len() }, + Some(TableMetadata { length: self.len() }), eval_stats, )) } @@ -92,7 +92,7 @@ impl MicroPartition { Ok(MicroPartition::new( Arc::new(expected_schema), TableState::Loaded(Arc::new(evaluated_tables)), - TableMetadata { length: new_len }, + Some(TableMetadata { length: new_len }), eval_stats, )) } diff --git a/src/daft-micropartition/src/ops/filter.rs b/src/daft-micropartition/src/ops/filter.rs index 91cec19cca..ee5d129d62 100644 --- a/src/daft-micropartition/src/ops/filter.rs +++ b/src/daft-micropartition/src/ops/filter.rs @@ -42,7 +42,7 @@ impl MicroPartition { Ok(Self::new( self.schema.clone(), TableState::Loaded(tables.into()), - TableMetadata { length: new_len }, + Some(TableMetadata { length: new_len }), self.statistics.clone(), // update these values based off the filter we just ran )) } diff --git a/src/daft-micropartition/src/ops/join.rs b/src/daft-micropartition/src/ops/join.rs index 
5e49734d80..377b714bc7 100644 --- a/src/daft-micropartition/src/ops/join.rs +++ b/src/daft-micropartition/src/ops/join.rs @@ -47,7 +47,7 @@ impl MicroPartition { Ok(MicroPartition::new( join_schema.into(), TableState::Loaded(vec![joined_table].into()), - TableMetadata { length: joined_len }, + Some(TableMetadata { length: joined_len }), None, )) } diff --git a/src/daft-micropartition/src/ops/partition.rs b/src/daft-micropartition/src/ops/partition.rs index 2b90216ff8..9d84aca901 100644 --- a/src/daft-micropartition/src/ops/partition.rs +++ b/src/daft-micropartition/src/ops/partition.rs @@ -37,7 +37,7 @@ impl MicroPartition { MicroPartition::new( self.schema.clone(), TableState::Loaded(Arc::new(v)), - TableMetadata { length: new_len }, + Some(TableMetadata { length: new_len }), self.statistics.clone(), ) }) diff --git a/src/daft-micropartition/src/ops/slice.rs b/src/daft-micropartition/src/ops/slice.rs index 63e74bffbb..b25d33cb38 100644 --- a/src/daft-micropartition/src/ops/slice.rs +++ b/src/daft-micropartition/src/ops/slice.rs @@ -39,7 +39,7 @@ impl MicroPartition { Ok(MicroPartition { schema: self.schema.clone(), state: TableState::Loaded(slices_tables.into()).into(), - metadata: TableMetadata { length: new_len }, + metadata: Some(TableMetadata { length: new_len }), statistics: self.statistics.clone(), }) } diff --git a/src/daft-micropartition/src/ops/take.rs b/src/daft-micropartition/src/ops/take.rs index 7212630514..4afa5ff502 100644 --- a/src/daft-micropartition/src/ops/take.rs +++ b/src/daft-micropartition/src/ops/take.rs @@ -18,7 +18,7 @@ impl MicroPartition { Ok(Self::new( self.schema.clone(), TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: idx.len() }, + Some(TableMetadata { length: idx.len() }), self.statistics.clone(), )) } @@ -27,7 +27,7 @@ impl MicroPartition { Ok(Self::new( self.schema.clone(), TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: idx.len() }, + Some(TableMetadata { length: idx.len() }), self.statistics.clone(), )) } @@ -46,7 +46,7 @@ impl MicroPartition { Ok(Self::new( self.schema.clone(), TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: taken_len }, + Some(TableMetadata { length: taken_len }), self.statistics.clone(), )) } @@ -64,7 +64,7 @@ impl MicroPartition { Ok(Self::new( self.schema.clone(), TableState::Loaded(Arc::new(vec![taken])), - TableMetadata { length: taken_len }, + Some(TableMetadata { length: taken_len }), self.statistics.clone(), )) } diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 4f430eb237..fab773f0f6 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -1,31 +1,20 @@ -#![allow(unused)] // MAKE SURE TO REMOVE THIS - -use std::{ - ops::Deref, - sync::{Arc, Mutex}, -}; +use std::{ops::Deref, sync::Arc}; use common_error::DaftResult; use daft_core::{ - ffi, python::{datatype::PyTimeUnit, schema::PySchema, PySeries}, schema::Schema, Series, }; use daft_dsl::python::PyExpr; -use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; +use daft_io::{python::IOConfig, IOStatsContext}; use daft_parquet::read::ParquetSchemaInferenceOptions; +use daft_scan::{python::pylib::PyScanTaskBatch, ScanTaskBatch}; use daft_stats::TableStatistics; -use daft_table::{python::PyTable, Table}; -use indexmap::IndexMap; -use pyo3::{ - exceptions::PyValueError, - prelude::*, - types::{PyBytes, PyDict, PyList, PyTuple}, - Python, -}; +use daft_table::python::PyTable; +use pyo3::{exceptions::PyValueError, 
prelude::*, types::PyBytes, Python}; -use crate::micropartition::{DeferredLoadingParams, MicroPartition, TableState}; +use crate::micropartition::{MicroPartition, TableState}; use daft_stats::TableMetadata; use pyo3::PyTypeInfo; @@ -77,6 +66,11 @@ impl PyMicroPartition { } // Creation Methods + #[staticmethod] + pub fn from_scan_task_batch(scan_task_batch: PyScanTaskBatch) -> PyResult { + Ok(MicroPartition::from_scan_task_batch(scan_task_batch.into()).into()) + } + #[staticmethod] pub fn from_tables(tables: Vec) -> PyResult { match &tables[..] { @@ -86,9 +80,9 @@ impl PyMicroPartition { Ok(MicroPartition::new( first.table.schema.clone(), TableState::Loaded(tables.clone()), - TableMetadata { + Some(TableMetadata { length: tables.iter().map(|t| t.len()).sum(), - }, + }), // Don't compute statistics if data is already materialized None, ) @@ -122,14 +116,14 @@ impl PyMicroPartition { Ok(MicroPartition::new( schema.schema.clone(), TableState::Loaded(Arc::new(tables)), - TableMetadata { length: total_len }, + Some(TableMetadata { length: total_len }), None, ) .into()) } // Export Methods - pub fn to_table(&self, py: Python) -> PyResult { + pub fn to_table(&self) -> PyResult { let concatted = self.inner.concat_or_get()?; match &concatted.as_ref()[..] { [] => PyTable::empty(Some(self.schema()?)), @@ -469,23 +463,23 @@ impl PyMicroPartition { #[staticmethod] pub fn _from_unloaded_table_state( - py: Python, schema_bytes: &PyBytes, - loading_params_bytes: &PyBytes, + loading_scan_task_batch_bytes: &PyBytes, metadata_bytes: &PyBytes, statistics_bytes: &PyBytes, ) -> PyResult { let schema = bincode::deserialize::(schema_bytes.as_bytes()).unwrap(); - let params = - bincode::deserialize::(loading_params_bytes.as_bytes()).unwrap(); + let scan_task_batch = + bincode::deserialize::(loading_scan_task_batch_bytes.as_bytes()) + .unwrap(); let metadata = bincode::deserialize::(metadata_bytes.as_bytes()).unwrap(); let statistics = bincode::deserialize::>(statistics_bytes.as_bytes()).unwrap(); Ok(MicroPartition::new( schema.into(), - TableState::Unloaded(params), - metadata, + TableState::Unloaded(scan_task_batch.into()), + Some(metadata), statistics, ) .into()) @@ -516,7 +510,7 @@ impl PyMicroPartition { Ok(MicroPartition::new( schema.into(), TableState::Loaded(tables.into()), - metadata, + Some(metadata), statistics, ) .into()) diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml index 079fd75715..f9c683ef77 100644 --- a/src/daft-plan/Cargo.toml +++ b/src/daft-plan/Cargo.toml @@ -5,6 +5,7 @@ common-error = {path = "../common/error", default-features = false} common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} +daft-scan = {path = "../daft-scan", default-features = false} daft-table = {path = "../daft-table", default-features = false} indexmap = {workspace = true} log = {workspace = true} diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index bd89bb720d..aaa8b0354f 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -7,24 +7,27 @@ use crate::{ planner::plan, sink_info::{OutputFileInfo, SinkInfo}, source_info::{ - ExternalInfo as ExternalSourceInfo, FileFormatConfig, FileInfos as InputFileInfos, - PyStorageConfig, SourceInfo, StorageConfig, + ExternalInfo as ExternalSourceInfo, FileInfos as InputFileInfos, LegacyExternalInfo, + SourceInfo, }, - FileFormat, JoinType, PartitionScheme, PhysicalPlanScheduler, 
ResourceRequest, + JoinType, PartitionScheme, PhysicalPlanScheduler, ResourceRequest, }; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; use daft_core::{datatypes::Field, schema::Schema, DataType}; use daft_dsl::Expr; +use daft_scan::{ + file_format::{FileFormat, FileFormatConfig}, + storage_config::{PyStorageConfig, StorageConfig}, + ScanExternalInfo, ScanOperatorRef, +}; #[cfg(feature = "python")] use { - crate::{ - physical_plan::PhysicalPlan, - source_info::{InMemoryInfo, PyFileFormatConfig}, - }, + crate::{physical_plan::PhysicalPlan, source_info::InMemoryInfo}, daft_core::python::schema::PySchema, daft_dsl::python::PyExpr, + daft_scan::{file_format::PyFileFormatConfig, python::pylib::ScanOperatorHandle}, pyo3::prelude::*, }; @@ -64,6 +67,24 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } + pub fn table_scan_with_scan_operator( + scan_operator: ScanOperatorRef, + schema_hint: Option, + ) -> DaftResult { + let schema = schema_hint.unwrap_or_else(|| scan_operator.schema()); + let partitioning_keys = scan_operator.partitioning_keys(); + let source_info = + SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo::new( + scan_operator.clone(), + schema.clone(), + partitioning_keys.into(), + Default::default(), + ))); + let logical_plan: LogicalPlan = + logical_ops::Source::new(schema.clone(), source_info.into(), None).into(); + Ok(logical_plan.into()) + } + pub fn table_scan( file_infos: InputFileInfos, schema: Arc, @@ -80,12 +101,13 @@ impl LogicalPlanBuilder { storage_config: Arc, limit: Option, ) -> DaftResult { - let source_info = SourceInfo::ExternalInfo(ExternalSourceInfo::new( - schema.clone(), - file_infos.into(), - file_format_config, - storage_config, - )); + let source_info = + SourceInfo::ExternalInfo(ExternalSourceInfo::Legacy(LegacyExternalInfo::new( + schema.clone(), + file_infos.into(), + file_format_config, + storage_config, + ))); let logical_plan: LogicalPlan = logical_ops::Source::new(schema.clone(), source_info.into(), limit).into(); Ok(logical_plan.into()) @@ -265,6 +287,18 @@ impl PyLogicalPlanBuilder { .into()) } + #[staticmethod] + pub fn table_scan_with_scan_operator( + scan_operator: ScanOperatorHandle, + schema_hint: Option, + ) -> PyResult { + Ok(LogicalPlanBuilder::table_scan_with_scan_operator( + scan_operator.into(), + schema_hint.map(|s| s.into()), + )? 
+ .into()) + } + #[staticmethod] pub fn table_scan( file_infos: InputFileInfos, diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index a72214e4d8..ec8b4866a3 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -1,5 +1,6 @@ #![feature(let_chains)] #![feature(assert_matches)] +#![feature(if_let_guard)] mod builder; mod display; @@ -18,18 +19,23 @@ mod source_info; mod test; pub use builder::{LogicalPlanBuilder, PyLogicalPlanBuilder}; +use daft_scan::{ + file_format::{ + CsvSourceConfig, FileFormat, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig, + }, + storage_config::{NativeStorageConfig, PyStorageConfig}, +}; pub use join::JoinType; pub use logical_plan::LogicalPlan; pub use partitioning::{PartitionScheme, PartitionSpec}; pub use physical_plan::PhysicalPlanScheduler; pub use resource_request::ResourceRequest; -pub use source_info::{ - CsvSourceConfig, FileFormat, FileInfo, FileInfos, JsonSourceConfig, NativeStorageConfig, - ParquetSourceConfig, PyFileFormatConfig, PyStorageConfig, -}; +pub use source_info::{FileInfo, FileInfos}; #[cfg(feature = "python")] -use {pyo3::prelude::*, source_info::PythonStorageConfig}; +use daft_scan::storage_config::PythonStorageConfig; +#[cfg(feature = "python")] +use pyo3::prelude::*; #[cfg(feature = "python")] pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { diff --git a/src/daft-plan/src/logical_ops/source.rs b/src/daft-plan/src/logical_ops/source.rs index 7e3738dd05..2c3830f0c0 100644 --- a/src/daft-plan/src/logical_ops/source.rs +++ b/src/daft-plan/src/logical_ops/source.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; +use daft_scan::ScanExternalInfo; -use crate::source_info::{ExternalInfo, SourceInfo}; +use crate::source_info::{ExternalInfo, LegacyExternalInfo, SourceInfo}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Source { @@ -14,6 +15,8 @@ pub struct Source { /// Information about the source data location. pub source_info: Arc, + // TODO(Clark): Replace these pushdown fields with the Pushdown struct, where the Pushdown struct would exist + // on the LegacyExternalInfo struct in SourceInfo. /// Optional filters to apply to the source data. pub filters: Vec, /// Optional number of rows to read. 
@@ -56,12 +59,12 @@ impl Source { let mut res = vec![]; match self.source_info.as_ref() { - SourceInfo::ExternalInfo(ExternalInfo { + SourceInfo::ExternalInfo(ExternalInfo::Legacy(LegacyExternalInfo { source_schema, file_infos, file_format_config, storage_config, - }) => { + })) => { res.push(format!("Source: {}", file_format_config.var_name())); res.push(format!( "File paths = [{}]", @@ -71,6 +74,18 @@ impl Source { res.push(format!("Format-specific config = {:?}", file_format_config)); res.push(format!("Storage config = {:?}", storage_config)); } + SourceInfo::ExternalInfo(ExternalInfo::Scan(ScanExternalInfo { + source_schema, + scan_op, + partitioning_keys, + pushdowns, + })) => { + res.push("Source:".to_string()); + res.push(format!("Scan op = {}", scan_op)); + res.push(format!("File schema = {}", source_schema.short_string())); + res.push(format!("Partitioning keys = {:?}", partitioning_keys)); + res.push(format!("Scan pushdowns = {:?}", pushdowns)); + } #[cfg(feature = "python")] SourceInfo::InMemoryInfo(_) => {} } diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 73085c1a88..c015c43d81 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -1,8 +1,13 @@ use std::sync::Arc; use common_error::DaftResult; +use daft_scan::{Pushdowns, ScanExternalInfo}; -use crate::{logical_ops::Limit as LogicalLimit, source_info::SourceInfo, LogicalPlan}; +use crate::{ + logical_ops::Limit as LogicalLimit, + source_info::{ExternalInfo, SourceInfo}, + LogicalPlan, +}; use super::{ApplyOrder, OptimizerRule, Transformed}; @@ -41,20 +46,44 @@ impl OptimizerRule for PushDownLimit { // Limit pushdown is not supported for in-memory sources. #[cfg(feature = "python")] (SourceInfo::InMemoryInfo(_), _) => Ok(Transformed::No(plan)), + + // Legacy external info handling. + // Do not pushdown if Source node is already more limited than `limit` - (SourceInfo::ExternalInfo(_), Some(existing_source_limit)) + (SourceInfo::ExternalInfo(ExternalInfo::Legacy(_)), Some(existing_source_limit)) if (existing_source_limit <= limit) => { Ok(Transformed::No(plan)) } // Pushdown limit into the Source node as a "local" limit - (SourceInfo::ExternalInfo(_), _) => { + (SourceInfo::ExternalInfo(ExternalInfo::Legacy(_)), _) => { let new_source = LogicalPlan::Source(source.with_limit(Some(limit))).into(); let limit_with_local_limited_source = plan.with_new_children(&[new_source]); Ok(Transformed::Yes(limit_with_local_limited_source)) } + + // Scan operator external info handling. + + // Do not pushdown if Source node is already more limited than `limit` + (SourceInfo::ExternalInfo(ExternalInfo::Scan(ScanExternalInfo { pushdowns: Pushdowns { limit: existing_source_limit, .. }, .. })), _) + if let Some(existing_source_limit) = existing_source_limit && existing_source_limit <= &limit => + { + Ok(Transformed::No(plan)) + } + // Pushdown limit into the Source node as a "local" limit + (SourceInfo::ExternalInfo(ExternalInfo::Scan(ScanExternalInfo { scan_op, .. })), _) => { + let new_source = + LogicalPlan::Source(source.with_limit(Some(limit))).into(); + let out_plan = if scan_op.can_absorb_limit() { + // Scan can fully absorb the limit, so we can drop the Limit op from the logical plan. 
+ new_source + } else { + plan.with_new_children(&[new_source]) + }; + Ok(Transformed::Yes(out_plan)) + } } } _ => Ok(Transformed::No(plan)), diff --git a/src/daft-plan/src/physical_ops/csv.rs b/src/daft-plan/src/physical_ops/csv.rs index cccedd45d1..8cb99846b8 100644 --- a/src/daft-plan/src/physical_ops/csv.rs +++ b/src/daft-plan/src/physical_ops/csv.rs @@ -4,7 +4,7 @@ use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; use crate::{ - physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo, + physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::LegacyExternalInfo, PartitionSpec, }; use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TabularScanCsv { pub projection_schema: SchemaRef, - pub external_info: ExternalInfo, + pub external_info: LegacyExternalInfo, pub partition_spec: Arc, pub limit: Option, pub filters: Vec, @@ -21,7 +21,7 @@ pub struct TabularScanCsv { impl TabularScanCsv { pub(crate) fn new( projection_schema: SchemaRef, - external_info: ExternalInfo, + external_info: LegacyExternalInfo, partition_spec: Arc, limit: Option, filters: Vec, diff --git a/src/daft-plan/src/physical_ops/json.rs b/src/daft-plan/src/physical_ops/json.rs index 0d8fce535f..80249f30b7 100644 --- a/src/daft-plan/src/physical_ops/json.rs +++ b/src/daft-plan/src/physical_ops/json.rs @@ -4,7 +4,7 @@ use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; use crate::{ - physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo, + physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::LegacyExternalInfo, PartitionSpec, }; use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularScanJson { pub projection_schema: SchemaRef, - pub external_info: ExternalInfo, + pub external_info: LegacyExternalInfo, pub partition_spec: Arc, pub limit: Option, pub filters: Vec, @@ -21,7 +21,7 @@ pub struct TabularScanJson { impl TabularScanJson { pub(crate) fn new( projection_schema: SchemaRef, - external_info: ExternalInfo, + external_info: LegacyExternalInfo, partition_spec: Arc, limit: Option, filters: Vec, diff --git a/src/daft-plan/src/physical_ops/mod.rs b/src/daft-plan/src/physical_ops/mod.rs index d1e12d6a69..09dc707511 100644 --- a/src/daft-plan/src/physical_ops/mod.rs +++ b/src/daft-plan/src/physical_ops/mod.rs @@ -14,6 +14,7 @@ mod limit; mod parquet; mod project; mod reduce; +mod scan; mod sort; mod split; @@ -33,5 +34,6 @@ pub use limit::Limit; pub use parquet::{TabularScanParquet, TabularWriteParquet}; pub use project::Project; pub use reduce::ReduceMerge; +pub use scan::TabularScan; pub use sort::Sort; pub use split::Split; diff --git a/src/daft-plan/src/physical_ops/parquet.rs b/src/daft-plan/src/physical_ops/parquet.rs index f461432d3f..fdf1241f64 100644 --- a/src/daft-plan/src/physical_ops/parquet.rs +++ b/src/daft-plan/src/physical_ops/parquet.rs @@ -5,7 +5,7 @@ use daft_dsl::ExprRef; use crate::{ physical_plan::PhysicalPlan, sink_info::OutputFileInfo, - source_info::ExternalInfo as ExternalSourceInfo, PartitionSpec, + source_info::LegacyExternalInfo as ExternalSourceInfo, PartitionSpec, }; use serde::{Deserialize, Serialize}; diff --git a/src/daft-plan/src/physical_ops/scan.rs b/src/daft-plan/src/physical_ops/scan.rs new file mode 100644 index 0000000000..b9f618acc8 --- /dev/null +++ b/src/daft-plan/src/physical_ops/scan.rs @@ -0,0 
+1,21 @@ +use std::sync::Arc; + +use daft_scan::ScanTask; + +use crate::PartitionSpec; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TabularScan { + pub scan_tasks: Vec, + pub partition_spec: Arc, +} + +impl TabularScan { + pub(crate) fn new(scan_tasks: Vec, partition_spec: Arc) -> Self { + Self { + scan_tasks, + partition_spec, + } + } +} diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index eca30d3869..b3bfb45700 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -2,15 +2,17 @@ use { crate::{ sink_info::OutputFileInfo, - source_info::{ - ExternalInfo, FileFormat, FileFormatConfig, FileInfos, InMemoryInfo, - PyFileFormatConfig, PyStorageConfig, StorageConfig, - }, + source_info::{FileInfos, InMemoryInfo, LegacyExternalInfo}, }, daft_core::python::schema::PySchema, daft_core::schema::SchemaRef, daft_dsl::python::PyExpr, daft_dsl::Expr, + daft_scan::{ + file_format::{FileFormat, FileFormatConfig, PyFileFormatConfig}, + python::pylib::PyScanTask, + storage_config::{PyStorageConfig, StorageConfig}, + }, pyo3::{ pyclass, pymethods, types::PyBytes, PyObject, PyRef, PyRefMut, PyResult, PyTypeInfo, Python, ToPyObject, @@ -32,6 +34,7 @@ pub enum PhysicalPlan { TabularScanParquet(TabularScanParquet), TabularScanCsv(TabularScanCsv), TabularScanJson(TabularScanJson), + TabularScan(TabularScan), Project(Project), Filter(Filter), Limit(Limit), @@ -58,6 +61,7 @@ impl PhysicalPlan { match self { #[cfg(feature = "python")] Self::InMemoryScan(InMemoryScan { partition_spec, .. }) => partition_spec.clone(), + Self::TabularScan(TabularScan { partition_spec, .. }) => partition_spec.clone(), Self::TabularScanParquet(TabularScanParquet { partition_spec, .. }) => { partition_spec.clone() } @@ -303,10 +307,21 @@ impl PhysicalPlan { .call1((partition_iter,))?; Ok(py_iter.into()) } + PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => { + let py_scan_tasks = scan_tasks + .iter() + .map(|scan_task| PyScanTask::from(Arc::new(scan_task.clone()))) + .collect::>(); + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "scan_with_tasks"))? 
+ .call1((py_scan_tasks,))?; + Ok(py_iter.into()) + } PhysicalPlan::TabularScanParquet(TabularScanParquet { projection_schema, external_info: - ExternalInfo { + LegacyExternalInfo { source_schema, file_infos, file_format_config, @@ -328,7 +343,7 @@ impl PhysicalPlan { PhysicalPlan::TabularScanCsv(TabularScanCsv { projection_schema, external_info: - ExternalInfo { + LegacyExternalInfo { source_schema, file_infos, file_format_config, @@ -350,7 +365,7 @@ impl PhysicalPlan { PhysicalPlan::TabularScanJson(TabularScanJson { projection_schema, external_info: - ExternalInfo { + LegacyExternalInfo { source_schema, file_infos, file_format_config, diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 8410ad0e1b..af910f9f6e 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -5,6 +5,8 @@ use std::{cmp::max, collections::HashMap}; use common_error::DaftResult; use daft_core::count_mode::CountMode; use daft_dsl::Expr; +use daft_scan::file_format::FileFormatConfig; +use daft_scan::ScanExternalInfo; use crate::logical_ops::{ Aggregate as LogicalAggregate, Concat as LogicalConcat, Distinct as LogicalDistinct, @@ -15,7 +17,7 @@ use crate::logical_ops::{ use crate::logical_plan::LogicalPlan; use crate::physical_plan::PhysicalPlan; use crate::sink_info::{OutputFileInfo, SinkInfo}; -use crate::source_info::{ExternalInfo as ExternalSourceInfo, FileFormatConfig, SourceInfo}; +use crate::source_info::{ExternalInfo as ExternalSourceInfo, LegacyExternalInfo, SourceInfo}; use crate::{physical_ops::*, PartitionSpec}; use crate::{FileFormat, PartitionScheme}; @@ -31,13 +33,13 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { limit, filters, }) => match source_info.as_ref() { - SourceInfo::ExternalInfo( - ext_info @ ExternalSourceInfo { + SourceInfo::ExternalInfo(ExternalSourceInfo::Legacy( + ext_info @ LegacyExternalInfo { file_format_config, file_infos, .. }, - ) => { + )) => { let partition_spec = Arc::new(PartitionSpec::new_internal( PartitionScheme::Unknown, file_infos.len(), @@ -73,6 +75,24 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { } } } + SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo { + pushdowns, + scan_op, + .. + })) => { + let scan_tasks = scan_op + .to_scan_tasks(pushdowns.clone())? 
+ .collect::>>()?; + let partition_spec = Arc::new(PartitionSpec::new_internal( + PartitionScheme::Unknown, + scan_tasks.len(), + None, + )); + Ok(PhysicalPlan::TabularScan(TabularScan::new( + scan_tasks, + partition_spec, + ))) + } #[cfg(feature = "python")] SourceInfo::InMemoryInfo(mem_info) => { let scan = PhysicalPlan::InMemoryScan(InMemoryScan::new( diff --git a/src/daft-plan/src/source_info/mod.rs b/src/daft-plan/src/source_info/mod.rs index d2fc46c329..985940196e 100644 --- a/src/daft-plan/src/source_info/mod.rs +++ b/src/daft-plan/src/source_info/mod.rs @@ -1,23 +1,14 @@ -pub mod file_format; pub mod file_info; -#[cfg(feature = "python")] -mod py_object_serde; -pub mod storage_config; - use daft_core::schema::SchemaRef; -pub use file_format::{ - CsvSourceConfig, FileFormat, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig, - PyFileFormatConfig, -}; +use daft_scan::file_format::FileFormatConfig; +use daft_scan::storage_config::StorageConfig; +use daft_scan::ScanExternalInfo; pub use file_info::{FileInfo, FileInfos}; use serde::{Deserialize, Serialize}; use std::{hash::Hash, sync::Arc}; #[cfg(feature = "python")] -pub use storage_config::PythonStorageConfig; -pub use storage_config::{NativeStorageConfig, PyStorageConfig, StorageConfig}; -#[cfg(feature = "python")] use { - py_object_serde::{deserialize_py_object, serialize_py_object}, + daft_scan::py_object_serde::{deserialize_py_object, serialize_py_object}, pyo3::{PyObject, Python}, std::hash::Hasher, }; @@ -89,15 +80,21 @@ impl Hash for InMemoryInfo { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ExternalInfo { + Scan(ScanExternalInfo), + Legacy(LegacyExternalInfo), +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct ExternalInfo { +pub struct LegacyExternalInfo { pub source_schema: SchemaRef, pub file_infos: Arc, pub file_format_config: Arc, pub storage_config: Arc, } -impl ExternalInfo { +impl LegacyExternalInfo { pub fn new( source_schema: SchemaRef, file_infos: Arc, diff --git a/src/daft-plan/src/test/mod.rs b/src/daft-plan/src/test/mod.rs index 1b7406faaf..9baf63af08 100644 --- a/src/daft-plan/src/test/mod.rs +++ b/src/daft-plan/src/test/mod.rs @@ -1,11 +1,10 @@ use std::sync::Arc; use daft_core::{datatypes::Field, schema::Schema}; +use daft_scan::{file_format::FileFormatConfig, storage_config::StorageConfig}; use crate::{ - builder::LogicalPlanBuilder, - source_info::{FileFormatConfig, FileInfos, StorageConfig}, - JsonSourceConfig, NativeStorageConfig, + builder::LogicalPlanBuilder, source_info::FileInfos, JsonSourceConfig, NativeStorageConfig, }; /// Create a dummy scan node containing the provided fields in its schema. 
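For reference, a minimal sketch (not part of the patch) of how downstream daft-plan code can consume the now two-variant ExternalInfo introduced above; the helper name source_schema is hypothetical, and the field access assumes the variant layouts shown in the hunks above.

use daft_core::schema::SchemaRef;
use daft_scan::ScanExternalInfo;

use crate::source_info::{ExternalInfo, LegacyExternalInfo};

// Hypothetical helper: both variants carry a source_schema, so callers that only need
// the schema can stay agnostic to whether the source is legacy- or scan-operator-based.
fn source_schema(info: &ExternalInfo) -> SchemaRef {
    match info {
        ExternalInfo::Legacy(LegacyExternalInfo { source_schema, .. }) => source_schema.clone(),
        ExternalInfo::Scan(ScanExternalInfo { source_schema, .. }) => source_schema.clone(),
    }
}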
@@ -15,7 +14,7 @@ pub fn dummy_scan_node(fields: Vec) -> LogicalPlanBuilder { FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]), schema, FileFormatConfig::Json(JsonSourceConfig {}).into(), - StorageConfig::Native(NativeStorageConfig::new_internal(None).into()).into(), + StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(), ) .unwrap() } @@ -27,7 +26,7 @@ pub fn dummy_scan_node_with_limit(fields: Vec, limit: Option) -> L FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]), schema, FileFormatConfig::Json(JsonSourceConfig {}).into(), - StorageConfig::Native(NativeStorageConfig::new_internal(None).into()).into(), + StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(), limit, ) .unwrap() diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml index 8603ea3dc6..98e72eca18 100644 --- a/src/daft-scan/Cargo.toml +++ b/src/daft-scan/Cargo.toml @@ -1,5 +1,7 @@ [dependencies] +bincode = {workspace = true} common-error = {path = "../common/error", default-features = false} +common-io-config = {path = "../common/io-config", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-csv = {path = "../daft-csv", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} @@ -10,6 +12,7 @@ daft-table = {path = "../daft-table", default-features = false} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true} serde = {workspace = true} +serde_json = {workspace = true} snafu = {workspace = true} tokio = {workspace = true} diff --git a/src/daft-scan/src/anonymous.rs b/src/daft-scan/src/anonymous.rs index 4b1f31d55b..3fc608daa2 100644 --- a/src/daft-scan/src/anonymous.rs +++ b/src/daft-scan/src/anonymous.rs @@ -1,26 +1,32 @@ -use std::fmt::Display; +use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; +use crate::{ + file_format::FileFormatConfig, storage_config::StorageConfig, DataFileSource, PartitionField, + Pushdowns, ScanOperator, ScanTask, +}; #[derive(Debug)] pub struct AnonymousScanOperator { - schema: SchemaRef, - file_type: FileType, files: Vec, - columns_to_select: Option>, - limit: Option, + schema: SchemaRef, + file_format_config: Arc, + storage_config: Arc, } impl AnonymousScanOperator { - pub fn new(schema: SchemaRef, file_type: FileType, files: Vec) -> Self { + pub fn new( + files: Vec, + schema: SchemaRef, + file_format_config: Arc, + storage_config: Arc, + ) -> Self { Self { - schema, - file_type, files, - columns_to_select: None, - limit: None, + schema, + file_format_config, + storage_config, } } } @@ -40,40 +46,27 @@ impl ScanOperator for AnonymousScanOperator { &[] } - fn num_partitions(&self) -> common_error::DaftResult { - Ok(self.files.len()) + fn can_absorb_filter(&self) -> bool { + false } - - fn select(self: Box, columns: &[&str]) -> common_error::DaftResult { - for c in columns { - if self.schema.get_field(c).is_err() { - return Err(common_error::DaftError::FieldNotFound(format!( - "{c} not found in {:?}", - self.columns_to_select - ))); - } - } - let mut to_rtn = self; - to_rtn.columns_to_select = Some(columns.iter().map(|s| s.to_string()).collect()); - Ok(to_rtn) - } - - fn limit(self: Box, num: usize) -> DaftResult { - let mut to_rtn = self; - to_rtn.limit = Some(num); - Ok(to_rtn) + fn can_absorb_select(&self) -> bool { + false } - - fn filter(self: 
Box, _predicate: &daft_dsl::Expr) -> DaftResult<(bool, ScanOperatorRef)> { - Ok((false, self)) + fn can_absorb_limit(&self) -> bool { + false } fn to_scan_tasks( - self: Box, + &self, + pushdowns: Pushdowns, ) -> DaftResult>>> { + let columns = pushdowns.columns; + let file_format_config = self.file_format_config.clone(); + let storage_config = self.storage_config.clone(); + let limit = pushdowns.limit; + let schema = self.schema.clone(); let iter = self.files.clone().into_iter().map(move |f| { let source = DataFileSource::AnonymousDataFile { - file_type: self.file_type, path: f, metadata: None, partition_spec: None, @@ -81,8 +74,11 @@ impl ScanOperator for AnonymousScanOperator { }; Ok(ScanTask { source, - columns: self.columns_to_select.clone(), - limit: self.limit, + file_format_config: file_format_config.clone(), + schema: schema.clone(), + storage_config: storage_config.clone(), + columns: columns.clone(), + limit, }) }); Ok(Box::new(iter)) diff --git a/src/daft-plan/src/source_info/file_format.rs b/src/daft-scan/src/file_format.rs similarity index 74% rename from src/daft-plan/src/source_info/file_format.rs rename to src/daft-scan/src/file_format.rs index 60f6acc863..efd3e8beaf 100644 --- a/src/daft-plan/src/source_info/file_format.rs +++ b/src/daft-scan/src/file_format.rs @@ -1,11 +1,15 @@ -use daft_core::impl_bincode_py_state_serialization; +use common_error::{DaftError, DaftResult}; +use daft_core::{datatypes::TimeUnit, impl_bincode_py_state_serialization}; use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; #[cfg(feature = "python")] -use pyo3::{ - pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy, PyObject, PyResult, PyTypeInfo, - Python, ToPyObject, +use { + daft_core::python::datatype::PyTimeUnit, + pyo3::{ + exceptions::PyValueError, pyclass, pyclass::CompareOp, pymethods, types::PyBytes, IntoPy, + PyObject, PyResult, PyTypeInfo, Python, ToPyObject, + }, }; /// Format of a file, e.g. Parquet, CSV, JSON. @@ -17,6 +21,27 @@ pub enum FileFormat { Json, } +impl FromStr for FileFormat { + type Err = DaftError; + + fn from_str(file_format: &str) -> DaftResult { + use FileFormat::*; + + if file_format.trim().eq_ignore_ascii_case("parquet") { + Ok(Parquet) + } else if file_format.trim().eq_ignore_ascii_case("csv") { + Ok(Csv) + } else if file_format.trim().eq_ignore_ascii_case("json") { + Ok(Json) + } else { + Err(DaftError::TypeError(format!( + "FileFormat {} not supported!", + file_format + ))) + } + } +} + impl_bincode_py_state_serialization!(FileFormat); impl From<&FileFormatConfig> for FileFormat { @@ -53,7 +78,8 @@ impl FileFormatConfig { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct ParquetSourceConfig { - multithreaded_io: bool, + pub coerce_int96_timestamp_unit: TimeUnit, + pub row_groups: Option>, } #[cfg(feature = "python")] @@ -61,13 +87,23 @@ pub struct ParquetSourceConfig { impl ParquetSourceConfig { /// Create a config for a Parquet data source. 
#[new] - fn new(multithreaded_io: bool) -> Self { - Self { multithreaded_io } + fn new(coerce_int96_timestamp_unit: Option, row_groups: Option>) -> Self { + Self { + coerce_int96_timestamp_unit: coerce_int96_timestamp_unit + .unwrap_or(TimeUnit::Nanoseconds.into()) + .into(), + row_groups, + } } #[getter] - fn multithreaded_io(&self) -> PyResult { - Ok(self.multithreaded_io) + fn row_groups(&self) -> PyResult>> { + Ok(self.row_groups.clone()) + } + + #[getter] + fn coerce_int96_timestamp_unit(&self) -> PyResult { + Ok(self.coerce_int96_timestamp_unit.into()) } } @@ -102,14 +138,20 @@ impl CsvSourceConfig { double_quote: bool, buffer_size: Option, chunk_size: Option, - ) -> Self { - Self { + ) -> PyResult { + if delimiter.as_bytes().len() != 1 { + return Err(PyValueError::new_err(format!( + "Cannot create CsvSourceConfig with delimiter with length: {}", + delimiter.len() + ))); + } + Ok(Self { delimiter, has_headers, double_quote, buffer_size, chunk_size, - } + }) } } diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 1043a76bf4..980ed3618e 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -3,16 +3,19 @@ use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; use daft_io::{get_io_client, get_runtime, IOStatsContext}; +use daft_parquet::read::ParquetSchemaInferenceOptions; -use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; -#[derive(Debug)] +use crate::{ + file_format::{CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig}, + storage_config::StorageConfig, + DataFileSource, PartitionField, Pushdowns, ScanOperator, ScanTask, +}; +#[derive(Debug, PartialEq, Hash)] pub struct GlobScanOperator { glob_path: String, - file_type: FileType, - columns_to_select: Option>, - limit: Option, + file_format_config: Arc, schema: SchemaRef, - io_config: Arc, + storage_config: Arc, } fn run_glob( @@ -39,14 +42,22 @@ fn run_glob( impl GlobScanOperator { pub fn _try_new( glob_path: &str, - file_type: FileType, - io_config: Arc, + file_format_config: FileFormatConfig, + storage_config: Arc, ) -> DaftResult { + let io_config = match storage_config.as_ref() { + StorageConfig::Native(cfg) => Arc::new(cfg.io_config.clone().unwrap_or_default()), + #[cfg(feature = "python")] + StorageConfig::Python(cfg) => Arc::new(cfg.io_config.clone().unwrap_or_default()), + }; let paths = run_glob(glob_path, io_config.clone(), Some(1))?; let first_filepath = paths[0].as_str(); - let schema = match file_type { - FileType::Parquet => { + let schema = match &file_format_config { + FileFormatConfig::Parquet(ParquetSourceConfig { + coerce_int96_timestamp_unit, + row_groups: _, + }) => { let io_client = get_io_client(true, io_config.clone())?; // it appears that read_parquet_schema is hardcoded to use multithreaded_io let io_stats = IOStatsContext::new(format!( "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}" @@ -55,36 +66,45 @@ impl GlobScanOperator { first_filepath, io_client, Some(io_stats), - Default::default(), // TODO: pass-through schema inference options + ParquetSchemaInferenceOptions { + coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, + }, )? 
} - FileType::Csv => { + FileFormatConfig::Csv(CsvSourceConfig { + delimiter, + has_headers, + double_quote, + buffer_size: _, + chunk_size: _, + }) => { let io_client = get_io_client(true, io_config.clone())?; // it appears that read_parquet_schema is hardcoded to use multithreaded_io let io_stats = IOStatsContext::new(format!( "GlobScanOperator constructor read_csv_schema: for uri {first_filepath}" )); let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( first_filepath, - true, // TODO: pass-through schema inference options - None, // TODO: pass-through schema inference options - true, // TODO: pass-through schema inference options - None, // TODO: pass-through schema inference options + *has_headers, + Some(delimiter.as_bytes()[0]), + *double_quote, + None, io_client, Some(io_stats), )?; schema } - FileType::Avro => todo!("Schema inference for Avro not implemented"), - FileType::Orc => todo!("Schema inference for Orc not implemented"), + FileFormatConfig::Json(JsonSourceConfig {}) => { + // NOTE: Native JSON reads not yet implemented, so we have to delegate to Python here or implement + // a daft_json crate that gives us native JSON schema inference + todo!("Implement schema inference from JSON in GlobScanOperator"); + } }; Ok(Self { glob_path: glob_path.to_string(), - file_type, - columns_to_select: None, - limit: None, + file_format_config: Arc::new(file_format_config), schema: Arc::new(schema), - io_config, + storage_config, }) } } @@ -104,41 +124,36 @@ impl ScanOperator for GlobScanOperator { &[] } - fn num_partitions(&self) -> common_error::DaftResult { - unimplemented!("Cannot get number of partitions -- this will not be implemented."); - } - - fn select(self: Box, columns: &[&str]) -> common_error::DaftResult { - for c in columns { - if self.schema.get_field(c).is_err() { - return Err(common_error::DaftError::FieldNotFound(format!( - "{c} not found in {:?}", - self.columns_to_select - ))); - } - } - let mut to_rtn = self; - to_rtn.columns_to_select = Some(columns.iter().map(|s| s.to_string()).collect()); - Ok(to_rtn) + fn can_absorb_filter(&self) -> bool { + false } - - fn limit(self: Box, num: usize) -> DaftResult { - let mut to_rtn = self; - to_rtn.limit = Some(num); - Ok(to_rtn) + fn can_absorb_select(&self) -> bool { + false } - - fn filter(self: Box, _predicate: &daft_dsl::Expr) -> DaftResult<(bool, ScanOperatorRef)> { - Ok((false, self)) + fn can_absorb_limit(&self) -> bool { + false } fn to_scan_tasks( - self: Box, + &self, + pushdowns: Pushdowns, ) -> DaftResult>>> { - let files = run_glob(self.glob_path.as_str(), self.io_config.clone(), None)?; + let io_config = match self.storage_config.as_ref() { + StorageConfig::Native(cfg) => Arc::new(cfg.io_config.clone().unwrap_or_default()), + #[cfg(feature = "python")] + StorageConfig::Python(cfg) => Arc::new(cfg.io_config.clone().unwrap_or_default()), + }; + let columns = pushdowns.columns; + let limit = pushdowns.limit; + + // Clone to move into closure for delayed execution + let storage_config = self.storage_config.clone(); + let schema = self.schema.clone(); + let file_format_config = self.file_format_config.clone(); + + let files = run_glob(self.glob_path.as_str(), io_config, None)?; let iter = files.into_iter().map(move |f| { let source = DataFileSource::AnonymousDataFile { - file_type: self.file_type, path: f, metadata: None, partition_spec: None, @@ -146,8 +161,11 @@ impl ScanOperator for GlobScanOperator { }; Ok(ScanTask { source, - columns: self.columns_to_select.clone(), - limit: self.limit, + 
file_format_config: file_format_config.clone(), + schema: schema.clone(), + storage_config: storage_config.clone(), + columns: columns.clone(), + limit, }) }); Ok(Box::new(iter)) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index f23fbb0cae..274b6205df 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -1,62 +1,38 @@ use std::{ fmt::{Debug, Display}, - str::FromStr, + hash::{Hash, Hasher}, + sync::Arc, }; -use common_error::{DaftError, DaftResult}; +use common_error::DaftResult; use daft_core::{datatypes::Field, schema::SchemaRef}; -use daft_dsl::Expr; +use daft_dsl::{Expr, ExprRef}; use daft_stats::{PartitionSpec, TableMetadata, TableStatistics}; +use file_format::FileFormatConfig; use serde::{Deserialize, Serialize}; mod anonymous; +pub mod file_format; mod glob; +#[cfg(feature = "python")] +pub mod py_object_serde; + #[cfg(feature = "python")] pub mod python; +pub mod storage_config; #[cfg(feature = "python")] pub use python::register_modules; +use storage_config::StorageConfig; -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] -pub enum FileType { - Parquet, - Avro, - Orc, - Csv, -} - -impl FromStr for FileType { - type Err = DaftError; - - fn from_str(file_type: &str) -> DaftResult { - use FileType::*; - if file_type.trim().eq_ignore_ascii_case("parquet") { - Ok(Parquet) - } else if file_type.trim().eq_ignore_ascii_case("avro") { - Ok(Avro) - } else if file_type.trim().eq_ignore_ascii_case("orc") { - Ok(Orc) - } else if file_type.trim().eq_ignore_ascii_case("csv") { - Ok(Csv) - } else { - Err(DaftError::TypeError(format!( - "FileType {} not supported!", - file_type - ))) - } - } -} - -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum DataFileSource { AnonymousDataFile { - file_type: FileType, path: String, metadata: Option, partition_spec: Option, statistics: Option, }, CatalogDataFile { - file_type: FileType, path: String, metadata: TableMetadata, partition_spec: PartitionSpec, @@ -64,31 +40,243 @@ pub enum DataFileSource { }, } -#[derive(Serialize, Deserialize)] +impl DataFileSource { + pub fn get_path(&self) -> &str { + match self { + Self::AnonymousDataFile { path, .. } | Self::CatalogDataFile { path, .. } => path, + } + } + pub fn get_metadata(&self) -> Option<&TableMetadata> { + match self { + Self::AnonymousDataFile { metadata, .. } => metadata.as_ref(), + Self::CatalogDataFile { metadata, .. } => Some(metadata), + } + } + + pub fn get_statistics(&self) -> Option<&TableStatistics> { + match self { + Self::AnonymousDataFile { statistics, .. } + | Self::CatalogDataFile { statistics, .. } => statistics.as_ref(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ScanTask { // Micropartition will take this in as an input - source: DataFileSource, - columns: Option>, - limit: Option, + pub source: DataFileSource, + pub file_format_config: Arc, + pub schema: SchemaRef, + pub storage_config: Arc, + // TODO(Clark): Directly use the Pushdowns struct as part of the ScanTask struct? + pub columns: Option>>, + pub limit: Option, } -#[derive(Serialize, Deserialize)] + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScanTaskBatch { + pub sources: Vec, + pub file_format_config: Arc, + pub schema: SchemaRef, + pub storage_config: Arc, + // TODO(Clark): Directly use the Pushdowns struct as part of the ScanTaskBatch struct? 
+ pub columns: Option>>, + pub limit: Option, + pub metadata: Option, + pub statistics: Option, +} + +impl ScanTaskBatch { + pub fn new( + sources: Vec, + file_format_config: Arc, + schema: SchemaRef, + storage_config: Arc, + columns: Option>>, + limit: Option, + ) -> Self { + assert!(!sources.is_empty()); + let (length, statistics) = sources + .iter() + .map(|s| { + ( + s.get_metadata().map(|m| m.length), + s.get_statistics().cloned(), + ) + }) + .reduce(|(acc_len, acc_stats), (curr_len, curr_stats)| { + ( + acc_len.and_then(|acc_len| curr_len.map(|curr_len| acc_len + curr_len)), + acc_stats.and_then(|acc_stats| { + curr_stats.map(|curr_stats| acc_stats.union(&curr_stats).unwrap()) + }), + ) + }) + .unwrap(); + let metadata = length.map(|l| TableMetadata { length: l }); + Self { + sources, + file_format_config, + schema, + storage_config, + columns, + limit, + metadata, + statistics, + } + } + + pub fn num_rows(&self) -> Option { + self.metadata.as_ref().map(|m| m.length) + } + + pub fn size_bytes(&self) -> Option { + self.statistics.as_ref().and_then(|s| { + self.num_rows() + .and_then(|num_rows| Some(num_rows * s.estimate_row_size().ok()?)) + }) + } +} + +impl From> for ScanTaskBatch { + fn from(value: Vec) -> Self { + if value.is_empty() { + panic!("Must have at least one ScanTask to create a ScanTaskBatch."); + } + let mut scan_task_iter = value.into_iter(); + let first_scan_task = scan_task_iter.next().unwrap(); + let first_scan_task_source = first_scan_task.source; + let sources = vec![first_scan_task_source] + .into_iter() + .chain(scan_task_iter.map(|t| t.source)) + .collect::>(); + Self::new( + sources, + first_scan_task.file_format_config, + first_scan_task.schema, + first_scan_task.storage_config, + first_scan_task.columns, + first_scan_task.limit, + ) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct PartitionField { field: Field, source_field: Option, transform: Option, } -pub trait ScanOperator: Send + Display { +pub trait ScanOperator: Send + Sync + Display + Debug { fn schema(&self) -> SchemaRef; fn partitioning_keys(&self) -> &[PartitionField]; - fn num_partitions(&self) -> DaftResult; + // fn statistics(&self) -> &TableStatistics; + // fn clustering_spec(&self) -> &ClusteringSpec; - // also returns a bool to indicate if the scan operator can "absorb" the predicate - fn filter(self: Box, predicate: &Expr) -> DaftResult<(bool, ScanOperatorRef)>; - fn select(self: Box, columns: &[&str]) -> DaftResult; - fn limit(self: Box, num: usize) -> DaftResult; - fn to_scan_tasks(self: Box) - -> DaftResult>>>; + fn can_absorb_filter(&self) -> bool; + fn can_absorb_select(&self) -> bool; + fn can_absorb_limit(&self) -> bool; + fn to_scan_tasks( + &self, + pushdowns: Pushdowns, + ) -> DaftResult>>>; } -pub type ScanOperatorRef = Box; +pub type ScanOperatorRef = Arc; + +impl PartialEq for Arc { + fn eq(&self, other: &dyn ScanOperator) -> bool { + self.as_ref().eq(other) + } +} + +impl PartialEq for dyn ScanOperator + '_ { + #[allow(clippy::ptr_eq)] + fn eq(&self, other: &dyn ScanOperator) -> bool { + // We don't use std::ptr::eq() since that also includes fat pointer metadata in the comparison; + // for trait objects, vtables are duplicated in multiple codegen units, which could cause false negatives. + // We therefore cast to unit type pointers to ditch the vtables before comparing. 
+ self as *const dyn ScanOperator as *const () + == other as *const dyn ScanOperator as *const () + } +} + +impl Eq for dyn ScanOperator + '_ {} + +impl Hash for dyn ScanOperator + '_ { + fn hash(&self, hasher: &mut H) { + // We prune the fat trait object pointer of the vtable before hashing; see comment for PartialEq implementation. + (self as *const dyn ScanOperator as *const ()).hash(hasher) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ScanExternalInfo { + pub scan_op: Arc, + pub source_schema: SchemaRef, + pub partitioning_keys: Vec, + pub pushdowns: Pushdowns, +} + +impl ScanExternalInfo { + pub fn new( + scan_op: Arc, + source_schema: SchemaRef, + partitioning_keys: Vec, + pushdowns: Pushdowns, + ) -> Self { + Self { + scan_op, + source_schema, + partitioning_keys, + pushdowns, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Pushdowns { + /// Optional filters to apply to the source data. + pub filters: Option>>, + /// Optional columns to select from the source data. + pub columns: Option>>, + /// Optional number of rows to read. + pub limit: Option, +} + +impl Default for Pushdowns { + fn default() -> Self { + Self::new(None, None, None) + } +} + +impl Pushdowns { + pub fn new( + filters: Option>>, + columns: Option>>, + limit: Option, + ) -> Self { + Self { + filters, + columns, + limit, + } + } + + pub fn with_limit(&self, limit: Option) -> Self { + Self { + filters: self.filters.clone(), + columns: self.columns.clone(), + limit, + } + } + + pub fn with_filters(&self, filters: Option>>) -> Self { + Self { + filters, + columns: self.columns.clone(), + limit: self.limit, + } + } +} diff --git a/src/daft-plan/src/source_info/py_object_serde.rs b/src/daft-scan/src/py_object_serde.rs similarity index 88% rename from src/daft-plan/src/source_info/py_object_serde.rs rename to src/daft-scan/src/py_object_serde.rs index f85b1b5678..2dde3fff1b 100644 --- a/src/daft-plan/src/source_info/py_object_serde.rs +++ b/src/daft-scan/src/py_object_serde.rs @@ -4,7 +4,7 @@ use serde::{ }; use std::fmt; -pub(super) fn serialize_py_object(obj: &PyObject, s: S) -> Result +pub fn serialize_py_object(obj: &PyObject, s: S) -> Result where S: Serializer, { @@ -56,7 +56,7 @@ impl<'de> Visitor<'de> for PyObjectVisitor { } #[cfg(feature = "python")] -pub(super) fn deserialize_py_object<'de, D>(d: D) -> Result +pub fn deserialize_py_object<'de, D>(d: D) -> Result where D: Deserializer<'de>, { @@ -69,10 +69,7 @@ where struct PyObjSerdeWrapper<'a>(#[serde(serialize_with = "serialize_py_object")] &'a PyObject); #[cfg(feature = "python")] -pub(super) fn serialize_py_object_optional( - obj: &Option, - s: S, -) -> Result +pub fn serialize_py_object_optional(obj: &Option, s: S) -> Result where S: Serializer, { @@ -108,7 +105,7 @@ impl<'de> Visitor<'de> for OptPyObjectVisitor { } #[cfg(feature = "python")] -pub(super) fn deserialize_py_object_optional<'de, D>(d: D) -> Result, D::Error> +pub fn deserialize_py_object_optional<'de, D>(d: D) -> Result, D::Error> where D: Deserializer<'de>, { diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs index 3a56e6f562..a956fba5dc 100644 --- a/src/daft-scan/src/python.rs +++ b/src/daft-scan/src/python.rs @@ -1,46 +1,215 @@ use pyo3::prelude::*; pub mod pylib { + + use daft_dsl::python::PyExpr; + use pyo3::prelude::*; - use std::str::FromStr; + + use std::fmt::Display; + + use std::sync::Arc; use daft_core::python::schema::PySchema; use pyo3::pyclass; + use serde::{Deserialize, Serialize}; use 
crate::anonymous::AnonymousScanOperator; - use crate::FileType; + use crate::Pushdowns; + use crate::ScanOperator; use crate::ScanOperatorRef; + use crate::ScanTask; + use crate::ScanTaskBatch; + + use crate::file_format::PyFileFormatConfig; + use crate::storage_config::PyStorageConfig; #[pyclass(module = "daft.daft", frozen)] - pub(crate) struct ScanOperator { + #[derive(Debug, Clone)] + pub struct ScanOperatorHandle { scan_op: ScanOperatorRef, } #[pymethods] - impl ScanOperator { + impl ScanOperatorHandle { pub fn __repr__(&self) -> PyResult { Ok(format!("{}", self.scan_op)) } #[staticmethod] pub fn anonymous_scan( - schema: PySchema, - file_type: &str, files: Vec, + schema: PySchema, + file_format_config: PyFileFormatConfig, + storage_config: PyStorageConfig, ) -> PyResult { let schema = schema.schema; - let operator = Box::new(AnonymousScanOperator::new( - schema, - FileType::from_str(file_type)?, + let operator = Arc::new(AnonymousScanOperator::new( files, + schema, + file_format_config.into(), + storage_config.into(), )); - Ok(ScanOperator { scan_op: operator }) + Ok(ScanOperatorHandle { scan_op: operator }) + } + + #[staticmethod] + pub fn from_python_abc(py_scan: PyObject) -> PyResult { + let scan_op: ScanOperatorRef = + Arc::new(PythonScanOperatorBridge::from_python_abc(py_scan)?); + Ok(ScanOperatorHandle { scan_op }) + } + } + #[pyclass(module = "daft.daft")] + #[derive(Debug)] + struct PythonScanOperatorBridge { + operator: PyObject, + } + #[pymethods] + impl PythonScanOperatorBridge { + #[staticmethod] + pub fn from_python_abc(abc: PyObject) -> PyResult { + Ok(Self { operator: abc }) + } + + pub fn _filter(&self, py: Python, predicate: PyExpr) -> PyResult<(bool, Self)> { + let _from_pyexpr = py + .import(pyo3::intern!(py, "daft.expressions"))? + .getattr(pyo3::intern!(py, "Expression"))? 
+ .getattr(pyo3::intern!(py, "_from_pyexpr"))?; + let expr = _from_pyexpr.call1((predicate,))?; + let result = self.operator.call_method(py, "filter", (expr,), None)?; + let (absorb, new_op) = result.extract::<(bool, PyObject)>(py)?; + Ok((absorb, Self { operator: new_op })) + } + } + + impl Display for PythonScanOperatorBridge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:#?}", self) + } + } + + impl ScanOperator for PythonScanOperatorBridge { + // fn filter( + // self: Box, + // predicate: &daft_dsl::Expr, + // ) -> common_error::DaftResult<(bool, ScanOperatorRef)> { + // Python::with_gil(|py| { + // let (can, new_op) = self._filter( + // py, + // PyExpr { + // expr: predicate.clone(), + // }, + // )?; + // Ok((can, Box::new(new_op) as ScanOperatorRef)) + // }) + // } + // fn limit(self: Box, num: usize) -> common_error::DaftResult { + // todo!() + // } + // fn num_partitions(&self) -> common_error::DaftResult { + // todo!() + // } + fn partitioning_keys(&self) -> &[crate::PartitionField] { + todo!() + } + fn schema(&self) -> daft_core::schema::SchemaRef { + todo!() + } + fn can_absorb_filter(&self) -> bool { + todo!() + } + fn can_absorb_limit(&self) -> bool { + todo!() + } + fn can_absorb_select(&self) -> bool { + todo!() + } + + // fn select(self: Box, columns: &[&str]) -> common_error::DaftResult { + // todo!() + // } + fn to_scan_tasks( + &self, + _pushdowns: Pushdowns, + ) -> common_error::DaftResult< + Box>>, + > { + todo!() + } + } + + impl From for ScanOperatorHandle { + fn from(value: ScanOperatorRef) -> Self { + Self { scan_op: value } + } + } + + impl From for ScanOperatorRef { + fn from(value: ScanOperatorHandle) -> Self { + value.scan_op + } + } + + #[pyclass(module = "daft.daft", name = "ScanTask", frozen)] + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct PyScanTask(Arc); + + impl From> for PyScanTask { + fn from(value: Arc) -> Self { + Self(value) + } + } + + impl From for Arc { + fn from(value: PyScanTask) -> Self { + value.0 + } + } + + #[pyclass(module = "daft.daft", name = "ScanTaskBatch", frozen)] + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct PyScanTaskBatch(Arc); + + #[pymethods] + impl PyScanTaskBatch { + #[staticmethod] + pub fn from_scan_tasks(scan_tasks: Vec) -> PyResult { + let scan_tasks: Vec = scan_tasks + .into_iter() + .map(|st| st.0.as_ref().clone()) + .collect(); + let scan_task_batch: ScanTaskBatch = scan_tasks.into(); + Ok(Self(Arc::new(scan_task_batch))) + } + + pub fn num_rows(&self) -> PyResult> { + Ok(self.0.num_rows().map(i64::try_from).transpose()?) + } + + pub fn size_bytes(&self) -> PyResult> { + Ok(self.0.size_bytes().map(i64::try_from).transpose()?) 
+ } + } + + impl From> for PyScanTaskBatch { + fn from(value: Arc) -> Self { + Self(value) + } + } + + impl From for Arc { + fn from(value: PyScanTaskBatch) -> Self { + value.0 } } } pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; Ok(()) } diff --git a/src/daft-plan/src/source_info/storage_config.rs b/src/daft-scan/src/storage_config.rs similarity index 91% rename from src/daft-plan/src/source_info/storage_config.rs rename to src/daft-scan/src/storage_config.rs index 5ef40640bc..fa705cf4d4 100644 --- a/src/daft-plan/src/source_info/storage_config.rs +++ b/src/daft-scan/src/storage_config.rs @@ -29,11 +29,15 @@ pub enum StorageConfig { #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct NativeStorageConfig { pub io_config: Option, + pub multithreaded_io: bool, } impl NativeStorageConfig { - pub fn new_internal(io_config: Option) -> Self { - Self { io_config } + pub fn new_internal(multithreaded_io: bool, io_config: Option) -> Self { + Self { + io_config, + multithreaded_io, + } } } @@ -41,14 +45,19 @@ impl NativeStorageConfig { #[pymethods] impl NativeStorageConfig { #[new] - pub fn new(io_config: Option) -> Self { - Self::new_internal(io_config.map(|c| c.config)) + pub fn new(multithreaded_io: bool, io_config: Option) -> Self { + Self::new_internal(multithreaded_io, io_config.map(|c| c.config)) } #[getter] pub fn io_config(&self) -> Option { self.io_config.clone().map(|c| c.into()) } + + #[getter] + pub fn multithreaded_io(&self) -> bool { + self.multithreaded_io + } } /// Storage configuration for the legacy Python I/O layer. diff --git a/src/daft-stats/src/partition_spec.rs b/src/daft-stats/src/partition_spec.rs index c75db0344d..a5e60a19bb 100644 --- a/src/daft-stats/src/partition_spec.rs +++ b/src/daft-stats/src/partition_spec.rs @@ -1,6 +1,6 @@ use daft_table::Table; -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PartitionSpec { keys: Table, } diff --git a/src/daft-stats/src/table_metadata.rs b/src/daft-stats/src/table_metadata.rs index 8c6d980dae..55112819f4 100644 --- a/src/daft-stats/src/table_metadata.rs +++ b/src/daft-stats/src/table_metadata.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TableMetadata { pub length: usize, } diff --git a/tests/integration/iceberg/docker-compose/docker-compose.yml b/tests/integration/iceberg/docker-compose/docker-compose.yml index 53b96dff23..a805797e89 100644 --- a/tests/integration/iceberg/docker-compose/docker-compose.yml +++ b/tests/integration/iceberg/docker-compose/docker-compose.yml @@ -18,6 +18,8 @@ version: '3' services: spark-iceberg: + depends_on: + - rest image: python-integration container_name: pyiceberg-spark build: . 
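As a quick illustration of the reworked constructor, here is a sketch (not part of the patch) that mirrors the updated call sites; it assumes, as the test changes suggest, that NativeStorageConfig is wrapped in an Arc inside StorageConfig::Native and that the result is handed around as Arc<StorageConfig>.

use std::sync::Arc;

use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};

// multithreaded_io now comes first; passing None for the IOConfig falls back to defaults.
fn default_native_storage_config() -> Arc<StorageConfig> {
    StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into()
}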
@@ -39,6 +41,8 @@ services: - rest:rest - minio:minio rest: + depends_on: + - mc image: tabulario/iceberg-rest container_name: pyiceberg-rest networks: diff --git a/tests/table/table_io/test_csv.py b/tests/table/table_io/test_csv.py index bcf01a03ec..e6c70cef15 100644 --- a/tests/table/table_io/test_csv.py +++ b/tests/table/table_io/test_csv.py @@ -18,7 +18,7 @@ def storage_config_from_use_native_downloader(use_native_downloader: bool) -> StorageConfig: if use_native_downloader: - return StorageConfig.native(NativeStorageConfig(None)) + return StorageConfig.native(NativeStorageConfig(True, None)) else: return StorageConfig.python(PythonStorageConfig(None)) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 83c6e67e2b..84dc5c12c2 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -23,7 +23,7 @@ def storage_config_from_use_native_downloader(use_native_downloader: bool) -> StorageConfig: if use_native_downloader: - return StorageConfig.native(NativeStorageConfig(None)) + return StorageConfig.native(NativeStorageConfig(True, None)) else: return StorageConfig.python(PythonStorageConfig(None))
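Finally, a small sketch (not part of the patch) of how the new Pushdowns struct composes, since it is the hand-off point between the optimizer and ScanOperator::to_scan_tasks; it assumes, as the anonymous and glob operators above suggest, that column pruning is carried as an Arc<Vec<String>>, and the values are illustrative only.

use std::sync::Arc;

use daft_scan::Pushdowns;

fn example_pushdowns() -> Pushdowns {
    // Start from the no-op default, then attach a row limit; filters and columns stay untouched.
    let base = Pushdowns::default().with_limit(Some(1_000));
    // Or construct everything up front: no filter pushdown, prune to one column, keep the limit.
    Pushdowns::new(None, Some(Arc::new(vec!["a".to_string()])), base.limit)
}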