Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 2, 2023
1 parent 66a7245 commit 100c15f
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 4 deletions.
Empty file added daft/iceberg/__init__.py
Empty file.
94 changes: 94 additions & 0 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

Check warning on line 1 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L1

Added line #L1 was not covered by tests

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField as IcebergPartitionField
from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import Table

Check warning on line 8 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L3-L8

Added lines #L3 - L8 were not covered by tests

from daft.datatype import DataType
from daft.expressions.expressions import col
from daft.io.scan import PartitionField, ScanOperator
from daft.logical.schema import Field, Schema

Check warning on line 13 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L10-L13

Added lines #L10 - L13 were not covered by tests


def _iceberg_partition_field_to_daft_partition_field(
    iceberg_schema: IcebergSchema, pfield: IcebergPartitionField
) -> PartitionField:
    """Translate one Iceberg partition field into a Daft ``PartitionField``.

    Args:
        iceberg_schema: Iceberg table schema used to resolve the partition's source column.
        pfield: Iceberg partition field supplying the partition name, the source column id
            and the transform.

    Returns:
        A ``PartitionField`` built from the partition's result field, the source column's
        field, and a Daft expression that applies the transform to the source column.

    Raises:
        NotImplementedError: for ``HourTransform`` and for any transform other than
            identity, year, month or day.
    """
    partition_name = pfield.name
    source_column = iceberg_schema.find_field(pfield.source_id)
    column_name = source_column.name

    # Daft field describing the untransformed source column.
    source_arrow_type = schema_to_pyarrow(iceberg_schema.find_type(column_name))
    source_daft_field = Field.create(column_name, DataType.from_arrow_type(source_arrow_type))

    # Daft field describing the value the transform produces for this partition.
    partition_transform = pfield.transform
    result_arrow_type = schema_to_pyarrow(partition_transform.result_type(source_column.field_type))
    partition_daft_field = Field.create(partition_name, DataType.from_arrow_type(result_arrow_type))

    # Imported inside the function, exactly where the original imported it.
    from pyiceberg.transforms import (
        DayTransform,
        HourTransform,
        IdentityTransform,
        MonthTransform,
        YearTransform,
    )

    # NOTE(review): Iceberg specifies year/month/day transforms as offsets from the 1970
    # epoch, while `.dt.year()` / `.dt.month()` / `.dt.day()` presumably yield calendar
    # components — confirm these expressions match the declared result type.
    if isinstance(partition_transform, IdentityTransform):
        # The alias is only applied when the partition is named differently from its source.
        if column_name == partition_name:
            partition_expr = col(column_name)
        else:
            partition_expr = col(column_name).alias(partition_name)
    elif isinstance(partition_transform, YearTransform):
        partition_expr = col(column_name).dt.year().alias(partition_name)
    elif isinstance(partition_transform, MonthTransform):
        partition_expr = col(column_name).dt.month().alias(partition_name)
    elif isinstance(partition_transform, DayTransform):
        partition_expr = col(column_name).dt.day().alias(partition_name)
    elif isinstance(partition_transform, HourTransform):
        raise NotImplementedError("HourTransform not implemented, Please make an issue!")
    else:
        raise NotImplementedError(f"{partition_transform} not implemented, Please make an issue!")

    return PartitionField(partition_daft_field, source_daft_field, transform=partition_expr)

Check warning on line 57 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L56-L57

Added lines #L56 - L57 were not covered by tests


def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: IcebergPartitionSpec) -> list[PartitionField]:
    """Convert every field of an Iceberg partition spec into a Daft ``PartitionField``.

    Args:
        iceberg_schema: Iceberg table schema passed through to the per-field conversion.
        spec: Iceberg partition spec whose ``fields`` are converted in order.

    Returns:
        One ``PartitionField`` per entry of ``spec.fields``, in the same order.
    """
    daft_partition_fields = []
    for iceberg_field in spec.fields:
        daft_partition_fields.append(
            _iceberg_partition_field_to_daft_partition_field(iceberg_schema, iceberg_field)
        )
    return daft_partition_fields

Check warning on line 61 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L60-L61

Added lines #L60 - L61 were not covered by tests


class IcebergScanOperator(ScanOperator):
    """``ScanOperator`` backed by a pyiceberg ``Table``.

    The Daft schema and the partitioning keys are derived once, at construction time,
    and returned unchanged by the accessor methods.
    """

    def __init__(self, iceberg_table: Table) -> None:
        """Store the table and precompute its Daft schema and partition fields."""
        super().__init__()
        self._table = iceberg_table
        # Iceberg schema -> pyarrow schema -> Daft schema.
        self._schema = Schema.from_pyarrow_schema(schema_to_pyarrow(iceberg_table.schema()))
        self._partition_keys = iceberg_partition_spec_to_fields(
            self._table.schema(),
            self._table.spec(),
        )

    def schema(self) -> Schema:
        """Return the Daft schema computed in ``__init__``."""
        return self._schema

    def partitioning_keys(self) -> list[PartitionField]:
        """Return the partition fields computed in ``__init__``."""
        return self._partition_keys

Check warning on line 76 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L75-L76

Added lines #L75 - L76 were not covered by tests


def catalog() -> Catalog:
    """Load the pyiceberg catalog named ``"local"`` with hard-coded REST and S3 settings.

    NOTE(review): the endpoints and credentials point at localhost services — presumably
    the docker-compose integration environment; confirm before relying on this elsewhere.
    """
    local_catalog_properties = {
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
    return load_catalog("local", **local_catalog_properties)


# NOTE(review): these three statements execute at import time: they build the catalog
# client via catalog(), load the table "default.test_partitioned_by_years" and wrap it
# in an IcebergScanOperator. This looks like leftover manual-testing code — presumably
# importing this module requires the local REST catalog to be reachable; confirm, and
# consider moving this into a test or behind a __main__ guard.
cat = catalog()
tab = cat.load_table("default.test_partitioned_by_years")
ice = IcebergScanOperator(tab)

Check warning on line 94 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L92-L94

Added lines #L92 - L94 were not covered by tests
51 changes: 51 additions & 0 deletions daft/io/scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations

Check warning on line 1 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L1

Added line #L1 was not covered by tests

import abc
from dataclasses import dataclass

Check warning on line 4 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L3-L4

Added lines #L3 - L4 were not covered by tests

from daft.expressions.expressions import Expression
from daft.logical.schema import Field, Schema

Check warning on line 7 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L6-L7

Added lines #L6 - L7 were not covered by tests


@dataclass(frozen=True)
class ScanTask:
    """Immutable description of a single scan task.

    NOTE(review): field meanings below are inferred from names and types only — confirm.
    """

    # Name of the file format to scan.
    file_type: str
    # Column names to read; presumably None means "all columns" — TODO confirm.
    columns: list[str] | None
    # Row limit; presumably None means "no limit" — TODO confirm.
    limit: int | None

Check warning on line 14 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L10-L14

Added lines #L10 - L14 were not covered by tests


@dataclass(frozen=True)
class PartitionField:
    """Immutable description of one partitioning key of a scan source."""

    # Field (name and type) of the partition value itself.
    field: Field
    # Field of the source column the partition value is derived from.
    source_field: Field
    # Expression that derives the partition value from the source column.
    transform: Expression

Check warning on line 21 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L17-L21

Added lines #L17 - L21 were not covered by tests


class ScanOperator(abc.ABC):
    """Abstract interface for a scan source.

    Concrete subclasses must implement every abstract method below; the base
    implementations only raise ``NotImplementedError``.
    """

    @abc.abstractmethod
    def schema(self) -> Schema:
        """Return the schema of the data this operator scans."""
        raise NotImplementedError

    @abc.abstractmethod
    def partitioning_keys(self) -> list[PartitionField]:
        """Return the partition fields of the scanned source."""
        raise NotImplementedError

Check warning on line 31 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L29-L31

Added lines #L29 - L31 were not covered by tests

# @abc.abstractmethod
# def num_partitions(self) -> int:
# raise NotImplementedError()

# @abc.abstractmethod
# def filter(self, predicate: Expression) -> tuple[bool, ScanOperator]:
# raise NotImplementedError()

# @abc.abstractmethod
# def limit(self, num: int) -> ScanOperator:
# raise NotImplementedError()

# @abc.abstractmethod
# def select(self, columns: list[str]) -> ScanOperator:
# raise NotImplementedError()

# @abc.abstractmethod
# def to_scan_tasks(self) -> Iterator[Any]:
# raise NotImplementedError()
90 changes: 86 additions & 4 deletions src/daft-scan/src/python.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use pyo3::prelude::*;

pub mod pylib {
use daft_dsl::col;
use daft_dsl::python::PyExpr;
use pyo3::exceptions::PyNotADirectoryError;
use pyo3::exceptions::PyNotImplementedError;
use pyo3::prelude::*;
use std::borrow::BorrowMut;
use std::fmt::Display;
use std::str::FromStr;

use daft_core::python::schema::PySchema;
Expand All @@ -10,15 +16,16 @@ pub mod pylib {

use crate::anonymous::AnonymousScanOperator;
use crate::FileType;
use crate::ScanOperator;
use crate::ScanOperatorRef;

#[pyclass(module = "daft.daft", frozen)]
pub(crate) struct ScanOperator {
pub(crate) struct ScanOperatorHandle {
scan_op: ScanOperatorRef,
}

#[pymethods]
impl ScanOperator {
impl ScanOperatorHandle {
pub fn __repr__(&self) -> PyResult<String> {
Ok(format!("{}", self.scan_op))
}
Expand All @@ -35,12 +42,87 @@ pub mod pylib {
FileType::from_str(file_type)?,
files,
));
Ok(ScanOperator { scan_op: operator })
Ok(ScanOperatorHandle { scan_op: operator })
}

/// Builds a handle around a Python object by wrapping it in a
/// `PythonScanOperatorBridge` and boxing the bridge as the scan operator.
#[staticmethod]
pub fn from_python_abc(py_scan: PyObject) -> PyResult<Self> {
    let bridge = PythonScanOperatorBridge::from_python_abc(py_scan)?;
    let scan_op: ScanOperatorRef = Box::new(bridge);
    Ok(ScanOperatorHandle { scan_op })
}
}
/// Adapter that holds a Python object and exposes it through the Rust
/// `ScanOperator` trait.
#[pyclass(module = "daft.daft")]
#[derive(Debug)]
struct PythonScanOperatorBridge {
    /// The wrapped Python object; methods are invoked on it by name.
    operator: PyObject,
}
#[pymethods]
impl PythonScanOperatorBridge {
    /// Wraps `abc` in a bridge. The object is stored as-is; no validation is done here.
    #[staticmethod]
    pub fn from_python_abc(abc: PyObject) -> PyResult<Self> {
        Ok(Self { operator: abc })
    }

    /// Calls the wrapped object's Python `filter` method with `predicate`.
    ///
    /// The predicate is first converted with `daft.expressions.Expression._from_pyexpr`.
    /// The Python call must return a `(bool, object)` tuple; the object becomes the
    /// operator of the returned bridge.
    ///
    /// # Errors
    ///
    /// Returns the Python error if the import, attribute lookups, the `filter` call or
    /// the tuple extraction fail.
    pub fn _filter(&self, py: Python, predicate: PyExpr) -> PyResult<(bool, Self)> {
        let expression_cls = py
            .import(pyo3::intern!(py, "daft.expressions"))?
            .getattr(pyo3::intern!(py, "Expression"))?;
        let from_pyexpr = expression_cls.getattr(pyo3::intern!(py, "_from_pyexpr"))?;
        let py_predicate = from_pyexpr.call1((predicate,))?;

        let outcome = self
            .operator
            .call_method(py, "filter", (py_predicate,), None)?;
        let (absorbed, operator): (bool, PyObject) = outcome.extract(py)?;
        Ok((absorbed, Self { operator }))
    }
}

/// `Display` reuses the pretty-printed (`{:#?}`) `Debug` representation.
impl Display for PythonScanOperatorBridge {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:#?}", self))
    }
}

/// `ScanOperator` implementation that forwards to the wrapped Python object.
///
/// Only `filter` is implemented; every other method is a `todo!()` stub and panics
/// when called.
impl ScanOperator for PythonScanOperatorBridge {
    /// Forwards `predicate` to the Python object's `filter` under the GIL and boxes the
    /// bridge it returns.
    fn filter(
        self: Box<Self>,
        predicate: &daft_dsl::Expr,
    ) -> common_error::DaftResult<(bool, ScanOperatorRef)> {
        let py_predicate = PyExpr {
            expr: predicate.clone(),
        };
        Python::with_gil(|py| {
            let (absorbed, bridge) = self._filter(py, py_predicate)?;
            let boxed: ScanOperatorRef = Box::new(bridge);
            Ok((absorbed, boxed))
        })
    }

    /// Not implemented yet; panics.
    fn limit(self: Box<Self>, _num: usize) -> common_error::DaftResult<ScanOperatorRef> {
        todo!()
    }

    /// Not implemented yet; panics.
    fn num_partitions(&self) -> common_error::DaftResult<usize> {
        todo!()
    }

    /// Not implemented yet; panics.
    fn partitioning_keys(&self) -> &[crate::PartitionField] {
        todo!()
    }

    /// Not implemented yet; panics.
    fn schema(&self) -> daft_core::schema::SchemaRef {
        todo!()
    }

    /// Not implemented yet; panics.
    fn select(self: Box<Self>, _columns: &[&str]) -> common_error::DaftResult<ScanOperatorRef> {
        todo!()
    }

    /// Not implemented yet; panics.
    fn to_scan_tasks(
        self: Box<Self>,
    ) -> common_error::DaftResult<
        Box<dyn Iterator<Item = common_error::DaftResult<crate::ScanTask>>>,
    > {
        todo!()
    }
}
}

/// Registers this crate's Python classes on `parent`.
///
/// Only `ScanOperatorHandle` is registered: inside `pylib` the name `ScanOperator` now
/// refers to the imported trait, not a `#[pyclass]`, so it cannot be added as a class.
///
/// # Errors
///
/// Returns the error from `add_class` if registration fails.
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
    parent.add_class::<pylib::ScanOperatorHandle>()?;
    Ok(())
}
4 changes: 4 additions & 0 deletions tests/integration/iceberg/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ version: '3'

services:
spark-iceberg:
depends_on:
- rest
image: python-integration
container_name: pyiceberg-spark
build: .
Expand All @@ -39,6 +41,8 @@ services:
- rest:rest
- minio:minio
rest:
depends_on:
- mc
image: tabulario/iceberg-rest
container_name: pyiceberg-rest
networks:
Expand Down

0 comments on commit 100c15f

Please sign in to comment.