From 100c15f1a4afae5d090d49048a630e9e2898e39e Mon Sep 17 00:00:00 2001
From: Sammy Sidhu
Date: Thu, 2 Nov 2023 09:41:35 +0530
Subject: [PATCH] squash

---
 daft/iceberg/__init__.py                           |  0
 daft/iceberg/iceberg_scan.py                       | 97 ++++++++++++++++++++
 daft/io/scan.py                                    | 51 +++++++++
 src/daft-scan/src/python.rs                        | 94 +++++++++++++++++++-
 .../iceberg/docker-compose/docker-compose.yml      |  4 +
 5 files changed, 242 insertions(+), 4 deletions(-)
 create mode 100644 daft/iceberg/__init__.py
 create mode 100644 daft/iceberg/iceberg_scan.py
 create mode 100644 daft/io/scan.py

diff --git a/daft/iceberg/__init__.py b/daft/iceberg/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py
new file mode 100644
index 0000000000..eb96364d13
--- /dev/null
+++ b/daft/iceberg/iceberg_scan.py
@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.io.pyarrow import schema_to_pyarrow
+from pyiceberg.partitioning import PartitionField as IcebergPartitionField
+from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
+from pyiceberg.schema import Schema as IcebergSchema
+from pyiceberg.table import Table
+from pyiceberg.transforms import (
+    DayTransform,
+    HourTransform,
+    IdentityTransform,
+    MonthTransform,
+    YearTransform,
+)
+
+from daft.datatype import DataType
+from daft.expressions.expressions import col
+from daft.io.scan import PartitionField, ScanOperator
+from daft.logical.schema import Field, Schema
+
+
+def _iceberg_partition_field_to_daft_partition_field(
+    iceberg_schema: IcebergSchema, pfield: IcebergPartitionField
+) -> PartitionField:
+    name = pfield.name
+    source_id = pfield.source_id
+    source_field = iceberg_schema.find_field(source_id)
+    source_name = source_field.name
+    daft_field = Field.create(
+        source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name)))
+    )
+    transform = pfield.transform
+    iceberg_result_type = transform.result_type(source_field.field_type)
+    arrow_result_type = schema_to_pyarrow(iceberg_result_type)
+    daft_result_type = DataType.from_arrow_type(arrow_result_type)
+    result_field = Field.create(name, daft_result_type)
+
+    # Map the Iceberg partition transform onto a Daft expression over the source column.
+    expr = None
+    if isinstance(transform, IdentityTransform):
+        expr = col(source_name)
+        if source_name != name:
+            expr = expr.alias(name)
+    elif isinstance(transform, YearTransform):
+        expr = col(source_name).dt.year().alias(name)
+    elif isinstance(transform, MonthTransform):
+        expr = col(source_name).dt.month().alias(name)
+    elif isinstance(transform, DayTransform):
+        expr = col(source_name).dt.day().alias(name)
+    elif isinstance(transform, HourTransform):
+        raise NotImplementedError("HourTransform is not implemented yet. Please file an issue!")
+    else:
+        raise NotImplementedError(f"{transform} is not implemented yet. Please file an issue!")
+
+    assert expr is not None
+    return PartitionField(result_field, daft_field, transform=expr)
+
+
+def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: IcebergPartitionSpec) -> list[PartitionField]:
+    return [_iceberg_partition_field_to_daft_partition_field(iceberg_schema, field) for field in spec.fields]
+
+
+class IcebergScanOperator(ScanOperator):
+    def __init__(self, iceberg_table: Table) -> None:
+        super().__init__()
+        self._table = iceberg_table
+        arrow_schema = schema_to_pyarrow(iceberg_table.schema())
+        self._schema = Schema.from_pyarrow_schema(arrow_schema)
+        self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec())
+
+    def schema(self) -> Schema:
+        return self._schema
+
+    def partitioning_keys(self) -> list[PartitionField]:
+        return self._partition_keys
+
+
+def catalog() -> Catalog:
+    # Connects to the REST catalog + MinIO stack from tests/integration/iceberg/docker-compose.
+    return load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
+if __name__ == "__main__":
+    # Smoke test against the local integration stack; guarded so importing the module has no side effects.
+    cat = catalog()
+    tab = cat.load_table("default.test_partitioned_by_years")
+    ice = IcebergScanOperator(tab)
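
The translation above rewrites each Iceberg partition field as a Daft expression
over its source column: an identity partition on "x" becomes col("x") (aliased only
if the spec field is named differently), and a years(ts) partition becomes
col("ts").dt.year() aliased to the spec field's name. A rough usage sketch,
assuming the docker-compose stack below is up and the smoke-test table exists:

    from daft.iceberg.iceberg_scan import catalog, iceberg_partition_spec_to_fields

    tab = catalog().load_table("default.test_partitioned_by_years")
    for pf in iceberg_partition_spec_to_fields(tab.schema(), tab.spec()):
        print(pf.field, pf.source_field, pf.transform)
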
diff --git a/daft/io/scan.py b/daft/io/scan.py
new file mode 100644
index 0000000000..afaaf7e08f
--- /dev/null
+++ b/daft/io/scan.py
@@ -0,0 +1,51 @@
+from __future__ import annotations
+
+import abc
+from dataclasses import dataclass
+
+from daft.expressions.expressions import Expression
+from daft.logical.schema import Field, Schema
+
+
+@dataclass(frozen=True)
+class ScanTask:
+    file_type: str
+    columns: list[str] | None
+    limit: int | None
+
+
+@dataclass(frozen=True)
+class PartitionField:
+    field: Field
+    source_field: Field
+    transform: Expression
+
+
+class ScanOperator(abc.ABC):
+    @abc.abstractmethod
+    def schema(self) -> Schema:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def partitioning_keys(self) -> list[PartitionField]:
+        raise NotImplementedError()
+
+    # @abc.abstractmethod
+    # def num_partitions(self) -> int:
+    #     raise NotImplementedError()
+
+    # @abc.abstractmethod
+    # def filter(self, predicate: Expression) -> tuple[bool, ScanOperator]:
+    #     raise NotImplementedError()
+
+    # @abc.abstractmethod
+    # def limit(self, num: int) -> ScanOperator:
+    #     raise NotImplementedError()
+
+    # @abc.abstractmethod
+    # def select(self, columns: list[str]) -> ScanOperator:
+    #     raise NotImplementedError()
+
+    # @abc.abstractmethod
+    # def to_scan_tasks(self) -> Iterator[Any]:
+    #     raise NotImplementedError()
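
For context, only schema() and partitioning_keys() are live on the ABC so far; the
commented-out methods sketch where pushdowns will land. A minimal sketch of a
conforming implementation (InMemoryScanOperator is invented for illustration):

    from __future__ import annotations

    from daft.io.scan import PartitionField, ScanOperator
    from daft.logical.schema import Schema

    class InMemoryScanOperator(ScanOperator):
        def __init__(self, schema: Schema) -> None:
            self._schema = schema

        def schema(self) -> Schema:
            return self._schema

        def partitioning_keys(self) -> list[PartitionField]:
            # Unpartitioned source.
            return []
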
diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs
index 3a56e6f562..91036aef9b 100644
--- a/src/daft-scan/src/python.rs
+++ b/src/daft-scan/src/python.rs
@@ -1,7 +1,13 @@
 use pyo3::prelude::*;
 
 pub mod pylib {
+    use daft_dsl::col;
+    use daft_dsl::python::PyExpr;
+    use pyo3::exceptions::PyNotADirectoryError;
+    use pyo3::exceptions::PyNotImplementedError;
     use pyo3::prelude::*;
+    use std::borrow::BorrowMut;
+    use std::fmt::Display;
     use std::str::FromStr;
 
     use daft_core::python::schema::PySchema;
@@ -10,15 +16,16 @@
     use crate::anonymous::AnonymousScanOperator;
     use crate::FileType;
+    use crate::ScanOperator;
     use crate::ScanOperatorRef;
 
     #[pyclass(module = "daft.daft", frozen)]
-    pub(crate) struct ScanOperator {
+    pub(crate) struct ScanOperatorHandle {
         scan_op: ScanOperatorRef,
     }
 
     #[pymethods]
-    impl ScanOperator {
+    impl ScanOperatorHandle {
         pub fn __repr__(&self) -> PyResult<String> {
             Ok(format!("{}", self.scan_op))
         }
 
@@ -35,12 +42,91 @@
             FileType::from_str(file_type)?,
             files,
         ));
-        Ok(ScanOperator { scan_op: operator })
+        Ok(ScanOperatorHandle { scan_op: operator })
     }
+
+        #[staticmethod]
+        pub fn from_python_abc(py_scan: PyObject) -> PyResult<Self> {
+            let scan_op: ScanOperatorRef =
+                Box::new(PythonScanOperatorBridge::from_python_abc(py_scan)?);
+            Ok(ScanOperatorHandle { scan_op })
+        }
     }
+
+    // Wraps a Python ScanOperator ABC instance so the Rust side can drive it.
+    #[pyclass(module = "daft.daft")]
+    #[derive(Debug)]
+    pub(self) struct PythonScanOperatorBridge {
+        operator: PyObject,
+    }
+    #[pymethods]
+    impl PythonScanOperatorBridge {
+        #[staticmethod]
+        pub fn from_python_abc(abc: PyObject) -> PyResult<Self> {
+            Ok(Self { operator: abc })
+        }
+
+        pub fn _filter(&self, py: Python, predicate: PyExpr) -> PyResult<(bool, Self)> {
+            // Rehydrate the incoming PyExpr into a Python-level daft Expression.
+            let _from_pyexpr = py
+                .import(pyo3::intern!(py, "daft.expressions"))?
+                .getattr(pyo3::intern!(py, "Expression"))?
+                .getattr(pyo3::intern!(py, "_from_pyexpr"))?;
+            let expr = _from_pyexpr.call1((predicate,))?;
+            // Delegate to the ABC's filter(), which returns (pushed_down, new_operator).
+            let result = self.operator.call_method(py, "filter", (expr,), None)?;
+            let (absorb, new_op) = result.extract::<(bool, PyObject)>(py)?;
+            Ok((absorb, Self { operator: new_op }))
+        }
+    }
+
+    impl Display for PythonScanOperatorBridge {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "{:#?}", self)
+        }
+    }
+
+    impl ScanOperator for PythonScanOperatorBridge {
+        fn filter(
+            self: Box<Self>,
+            predicate: &daft_dsl::Expr,
+        ) -> common_error::DaftResult<(bool, ScanOperatorRef)> {
+            Python::with_gil(|py| {
+                let (can, new_op) = self._filter(
+                    py,
+                    PyExpr {
+                        expr: predicate.clone(),
+                    },
+                )?;
+                Ok((can, Box::new(new_op) as ScanOperatorRef))
+            })
+        }
+        fn limit(self: Box<Self>, num: usize) -> common_error::DaftResult<ScanOperatorRef> {
+            todo!()
+        }
+        fn num_partitions(&self) -> common_error::DaftResult<usize> {
+            todo!()
+        }
+        fn partitioning_keys(&self) -> &[crate::PartitionField] {
+            todo!()
+        }
+        fn schema(&self) -> daft_core::schema::SchemaRef {
+            todo!()
+        }
+        fn select(self: Box<Self>, columns: &[&str]) -> common_error::DaftResult<ScanOperatorRef> {
+            todo!()
+        }
+        fn to_scan_tasks(
+            self: Box<Self>,
+        ) -> common_error::DaftResult<
+            Box<dyn Iterator<Item = common_error::DaftResult<crate::ScanTask>>>,
+        > {
+            todo!()
+        }
+    }
 }
 
 pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
-    parent.add_class::<pylib::ScanOperator>()?;
+    parent.add_class::<pylib::ScanOperatorHandle>()?;
     Ok(())
 }
diff --git a/tests/integration/iceberg/docker-compose/docker-compose.yml b/tests/integration/iceberg/docker-compose/docker-compose.yml
index 53b96dff23..a805797e89 100644
--- a/tests/integration/iceberg/docker-compose/docker-compose.yml
+++ b/tests/integration/iceberg/docker-compose/docker-compose.yml
@@ -18,6 +18,8 @@
 version: '3'
 services:
   spark-iceberg:
+    depends_on:
+      - rest
     image: python-integration
     container_name: pyiceberg-spark
     build: .
@@ -39,6 +41,8 @@
       - rest:rest
      - minio:minio
   rest:
+    depends_on:
+      - mc
     image: tabulario/iceberg-rest
     container_name: pyiceberg-rest
     networks:
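
End to end, the intent is that any Python ScanOperator can be handed across the
FFI boundary via ScanOperatorHandle.from_python_abc. A sketch, assuming the
daft.daft native module exposes ScanOperatorHandle as registered above and the
integration stack from the compose file is running:

    from daft.daft import ScanOperatorHandle
    from daft.iceberg.iceberg_scan import IcebergScanOperator, catalog

    tab = catalog().load_table("default.test_partitioned_by_years")
    handle = ScanOperatorHandle.from_python_abc(IcebergScanOperator(tab))
    print(repr(handle))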