[FEAT] Python Scan Op #1536

Closed

4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

57 changes: 52 additions & 5 deletions daft/daft.pyi
@@ -178,10 +178,7 @@ class ParquetSourceConfig:
Configuration of a Parquet data source.
"""

# Whether or not to use a multithreaded tokio runtime for processing I/O
multithreaded_io: bool

def __init__(self, multithreaded_io: bool): ...
def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ...

class CsvSourceConfig:
"""
@@ -339,9 +336,11 @@ class NativeStorageConfig:
Storage configuration for the Rust-native I/O layer.
"""

# Whether or not to use a multithreaded tokio runtime for processing I/O
multithreaded_io: bool
io_config: IOConfig

def __init__(self, io_config: IOConfig | None = None): ...
def __init__(self, multithreaded_io: bool, io_config: IOConfig | None = None): ...

class PythonStorageConfig:
"""
@@ -374,6 +373,48 @@ class StorageConfig:
@property
def config(self) -> NativeStorageConfig | PythonStorageConfig: ...

class ScanTask:
    """
    A scan task for reading data from an external source.
    """

    ...

class ScanTaskBatch:
    """
    A batch of scan tasks for reading data from an external source.
    """

    @staticmethod
    def from_scan_tasks(scan_tasks: list[ScanTask]) -> ScanTaskBatch:
        """
        Create a scan task batch from a list of scan tasks.
        """
        ...
    def num_rows(self) -> int:
        """
        Get number of rows that will be scanned by all tasks in this batch.
        """
        ...
    def size_bytes(self) -> int:
        """
        Get number of bytes that will be scanned by all tasks in this batch.
        """
        ...

class ScanOperatorHandle:
    """
    A handle to a scan operator.
    """

    @staticmethod
    def anonymous_scan(
        files: list[str],
        schema: PySchema,
        file_format_config: FileFormatConfig,
        storage_config: StorageConfig,
    ) -> ScanOperatorHandle: ...

def read_parquet(
uri: str,
columns: list[str] | None = None,
@@ -722,6 +763,8 @@ class PyMicroPartition:
@staticmethod
def empty(schema: PySchema | None = None) -> PyMicroPartition: ...
@staticmethod
def from_scan_task_batch(scan_task_batch: ScanTaskBatch) -> PyMicroPartition: ...
@staticmethod
def from_tables(tables: list[PyTable]) -> PyMicroPartition: ...
@staticmethod
def from_arrow_record_batches(record_batches: list[pyarrow.RecordBatch], schema: PySchema) -> PyMicroPartition: ...
@@ -814,6 +857,10 @@ class LogicalPlanBuilder:
partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan_with_scan_operator(
scan_operator: ScanOperatorHandle, schema_hint: PySchema | None
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(
file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
) -> LogicalPlanBuilder: ...
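Taken together, the new stubs above suggest how a scan plan could be wired end to end. The following is a minimal sketch based only on the signatures in this file, not code from the PR: it uses the low-level daft.daft.LogicalPlanBuilder binding directly, assumes `files` and a daft.logical.schema.Schema named `schema` are already in hand, and leans on the private `_schema` attribute to get the PySchema, mirroring the usage in daft/io/common.py later in this diff.

from __future__ import annotations

from daft.daft import (
    FileFormatConfig,
    LogicalPlanBuilder,
    NativeStorageConfig,
    ParquetSourceConfig,
    ScanOperatorHandle,
    StorageConfig,
)


def anonymous_scan_builder(files: list[str], schema) -> LogicalPlanBuilder:
    # Parquet source with default options; multithreaded native I/O, no explicit IOConfig.
    file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
    storage_config = StorageConfig.native(NativeStorageConfig(True, None))
    scan_op = ScanOperatorHandle.anonymous_scan(
        files, schema._schema, file_format_config, storage_config
    )
    # Pass the known schema as the hint; None presumably defers to the operator's own schema.
    return LogicalPlanBuilder.table_scan_with_scan_operator(scan_op, schema._schema)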
1 change: 0 additions & 1 deletion daft/execution/execution_step.py
@@ -404,7 +404,6 @@ def _handle_tabular_files_scan(
schema=self.schema,
storage_config=self.storage_config,
read_options=read_options,
multithreaded_io=format_config.multithreaded_io,
)
for fp in filepaths
]
45 changes: 45 additions & 0 deletions daft/execution/rust_physical_plan_shim.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator, TypeVar, cast

from daft.daft import (
@@ -10,17 +11,61 @@
PySchema,
PyTable,
ResourceRequest,
ScanTask,
ScanTaskBatch,
StorageConfig,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata
from daft.table import Table

PartitionT = TypeVar("PartitionT")


def scan_with_tasks(
    scan_tasks: Iterator[ScanTask],
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
    """Yield a plan to read the data described by the given scan tasks."""
    # TODO(Clark): Bundle scan tasks into single-instruction bulk reads.
    for scan_task in scan_tasks:
        scan_task_batch = ScanTaskBatch.from_scan_tasks([scan_task])
        scan_step = execution_step.PartitionTaskBuilder[PartitionT](inputs=[], partial_metadatas=None,).add_instruction(
            instruction=ScanWithTask(scan_task_batch),
            # Set the filesize as the memory request.
            # (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
            resource_request=ResourceRequest(memory_bytes=scan_task_batch.size_bytes()),
        )
        yield scan_step


@dataclass(frozen=True)
class ScanWithTask(execution_step.SingleOutputInstruction):
    scan_task_batch: ScanTaskBatch

    def run(self, inputs: list[Table]) -> list[Table]:
        return self._scan(inputs)

    def _scan(self, inputs: list[Table]) -> list[Table]:
        assert len(inputs) == 0
        return [Table._from_scan_task_batch(self.scan_task_batch)]

    def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
        assert len(input_metadatas) == 0

        return [
            PartialPartitionMetadata(
                num_rows=self.scan_task_batch.num_rows(),
                size_bytes=None,
            )
        ]


def tabular_scan(
schema: PySchema,
columns_to_read: list[str] | None,
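As a sanity check on the instruction above, here is a hedged, illustrative snippet that is not part of the PR: given an already constructed ScanTask (the `some_scan_task` name below is a placeholder), ScanWithTask takes no input partitions and emits exactly one Table plus matching partial metadata.

# Illustrative only: exercise ScanWithTask in isolation.
batch = ScanTaskBatch.from_scan_tasks([some_scan_task])  # some_scan_task: placeholder ScanTask
instruction = ScanWithTask(batch)
[table] = instruction.run([])                  # no input partitions; yields a single Table
[meta] = instruction.run_partial_metadata([])  # row count is known up front, byte size is not
assert meta.num_rows == batch.num_rows()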
Empty file added daft/iceberg/__init__.py
94 changes: 94 additions & 0 deletions daft/iceberg/iceberg_scan.py
@@ -0,0 +1,94 @@
from __future__ import annotations

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField as IcebergPartitionField
from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import Table

from daft.datatype import DataType
from daft.expressions.expressions import col
from daft.io.scan import PartitionField, ScanOperator
from daft.logical.schema import Field, Schema


def _iceberg_partition_field_to_daft_partition_field(
    iceberg_schema: IcebergSchema, pfield: IcebergPartitionField
) -> PartitionField:
    name = pfield.name
    source_id = pfield.source_id
    source_field = iceberg_schema.find_field(source_id)
    source_name = source_field.name
    daft_field = Field.create(
        source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name)))
    )
    transform = pfield.transform
    iceberg_result_type = transform.result_type(source_field.field_type)
    arrow_result_type = schema_to_pyarrow(iceberg_result_type)
    daft_result_type = DataType.from_arrow_type(arrow_result_type)
    result_field = Field.create(name, daft_result_type)

    from pyiceberg.transforms import (
        DayTransform,
        HourTransform,
        IdentityTransform,
        MonthTransform,
        YearTransform,
    )

    expr = None
    if isinstance(transform, IdentityTransform):
        expr = col(source_name)
        if source_name != name:
            expr = expr.alias(name)
    elif isinstance(transform, YearTransform):
        expr = col(source_name).dt.year().alias(name)
    elif isinstance(transform, MonthTransform):
        expr = col(source_name).dt.month().alias(name)
    elif isinstance(transform, DayTransform):
        expr = col(source_name).dt.day().alias(name)
    elif isinstance(transform, HourTransform):
        raise NotImplementedError("HourTransform not implemented, please make an issue!")
    else:
        raise NotImplementedError(f"{transform} not implemented, please make an issue!")

    assert expr is not None
    return PartitionField(result_field, daft_field, transform=expr)


def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: IcebergPartitionSpec) -> list[PartitionField]:
    return [_iceberg_partition_field_to_daft_partition_field(iceberg_schema, field) for field in spec.fields]


class IcebergScanOperator(ScanOperator):
    def __init__(self, iceberg_table: Table) -> None:
        super().__init__()
        self._table = iceberg_table
        arrow_schema = schema_to_pyarrow(iceberg_table.schema())
        self._schema = Schema.from_pyarrow_schema(arrow_schema)
        self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec())

    def schema(self) -> Schema:
        return self._schema

    def partitioning_keys(self) -> list[PartitionField]:
        return self._partition_keys


def catalog() -> Catalog:
    return load_catalog(
        "local",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )


cat = catalog()
tab = cat.load_table("default.test_partitioned_by_years")
ice = IcebergScanOperator(tab)
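For concreteness, a brief illustration (not part of the PR) of what the transform mapping above produces; the column name "ts" and spec field name "ts_year" are made up.

# Illustrative only: for an Iceberg table partitioned as years(ts) with a spec
# field named "ts_year", _iceberg_partition_field_to_daft_partition_field
# builds a PartitionField whose expression is equivalent to:
expected_expr = col("ts").dt.year().alias("ts_year")
# IdentityTransform maps to a bare col(...) (aliased only when the spec field
# name differs from the source column); Month/Day transforms map to
# col(...).dt.month() / col(...).dt.day() respectively.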
2 changes: 1 addition & 1 deletion daft/io/_csv.py
@@ -70,7 +70,7 @@ def read_csv(
)
file_format_config = FileFormatConfig.from_csv_config(csv_config)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
8 changes: 2 additions & 6 deletions daft/io/_parquet.py
@@ -54,13 +54,9 @@ def read_parquet(
# This is because each Ray worker process receives its own pool of thread workers and connections
multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io

file_format_config = FileFormatConfig.from_parquet_config(
ParquetSourceConfig(
multithreaded_io=multithreaded_io,
)
)
file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))

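The net effect of the two I/O changes above is that the multithreaded-I/O flag now lives on NativeStorageConfig rather than ParquetSourceConfig, so it applies to the native storage layer for any file format. A sketch of the before/after construction, assuming an `io_config` and a computed `multithreaded_io` flag as in read_parquet:

# Before (flag carried by the Parquet source config):
#   FileFormatConfig.from_parquet_config(ParquetSourceConfig(multithreaded_io=multithreaded_io))
#   StorageConfig.native(NativeStorageConfig(io_config))
# After (flag carried by the native storage config, for any format):
file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))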
56 changes: 39 additions & 17 deletions daft/io/common.py
@@ -1,12 +1,14 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from daft.context import get_context
from daft.daft import (
FileFormatConfig,
NativeStorageConfig,
PythonStorageConfig,
ScanOperatorHandle,
StorageConfig,
)
from daft.datatype import DataType
@@ -43,21 +45,41 @@ def _get_tabular_files_scan(
io_config = storage_config.config.io_config
else:
raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}")
# TODO(Clark): Move this flag check to the global Daft context.
if os.getenv("DAFT_MICROPARTITIONS", "0") == "1":
# TODO(Clark): Add globbing scan, once implemented.
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)
# Construct plan
builder = LogicalPlanBuilder.from_tabular_scan(
file_infos=file_infos,
schema=inferred_or_provided_schema,
file_format_config=file_format_config,
storage_config=storage_config,
)
return builder
# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)
scan_op = ScanOperatorHandle.anonymous_scan(
file_infos.file_paths, inferred_or_provided_schema._schema, file_format_config, storage_config
)
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
scan_operator=scan_op,
schema_hint=inferred_or_provided_schema,
)
return builder
else:
runner_io = get_context().runner().runner_io()
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

# Infer schema if no hints provided
inferred_or_provided_schema = (
schema_hint
if schema_hint is not None
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)
# Construct plan
builder = LogicalPlanBuilder.from_tabular_scan(
file_infos=file_infos,
schema=inferred_or_provided_schema,
file_format_config=file_format_config,
storage_config=storage_config,
)
return builder
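The scan-operator planning path above is opt-in via the DAFT_MICROPARTITIONS environment variable; with the flag unset (the default "0"), the existing tabular-scan path is used. A hedged usage sketch, where the bucket path is a placeholder:

# Opt in to the new scan-operator planning path; the flag is read at plan-build time.
import os

os.environ["DAFT_MICROPARTITIONS"] = "1"

import daft

df = daft.read_parquet("s3://my-bucket/data/*.parquet")  # placeholder path; planned via ScanOperatorHandle.anonymous_scan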