Skip to content

Commit

Permalink
iceberg partition keys
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Oct 26, 2023
1 parent 211bb57 commit 351fc91
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 6 deletions.
61 changes: 59 additions & 2 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,63 @@

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField as IcebergPartitionField
from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import Table

Check warning on line 8 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L3-L8

Added lines #L3 - L8 were not covered by tests

from daft.io.scan import ScanOperator
from daft.logical.schema import Schema
from daft.datatype import DataType
from daft.expressions.expressions import col
from daft.io.scan import PartitionField, ScanOperator
from daft.logical.schema import Field, Schema

Check warning on line 13 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L10-L13

Added lines #L10 - L13 were not covered by tests


def _iceberg_partition_field_to_daft_partition_field(

Check warning on line 16 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L16

Added line #L16 was not covered by tests
iceberg_schema: IcebergSchema, pfield: IcebergPartitionField
) -> PartitionField:
name = pfield.name
source_id = pfield.source_id
source_field = iceberg_schema.find_field(source_id)
source_name = source_field.name
daft_field = Field.create(

Check warning on line 23 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L19-L23

Added lines #L19 - L23 were not covered by tests
source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name)))
)
transform = pfield.transform
iceberg_result_type = transform.result_type(source_field.field_type)
arrow_result_type = schema_to_pyarrow(iceberg_result_type)
daft_result_type = DataType.from_arrow_type(arrow_result_type)
result_field = Field.create(name, daft_result_type)

Check warning on line 30 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L26-L30

Added lines #L26 - L30 were not covered by tests

from pyiceberg.transforms import (

Check warning on line 32 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L32

Added line #L32 was not covered by tests
DayTransform,
HourTransform,
IdentityTransform,
MonthTransform,
YearTransform,
)

expr = None
if isinstance(transform, IdentityTransform):
expr = col(source_name)
if source_name != name:
expr = expr.alias(name)
elif isinstance(transform, YearTransform):
expr = col(source_name).dt.year().alias(name)
elif isinstance(transform, MonthTransform):
expr = col(source_name).dt.month().alias(name)
elif isinstance(transform, DayTransform):
expr = col(source_name).dt.day().alias(name)
elif isinstance(transform, HourTransform):
raise NotImplementedError("HourTransform not implemented, Please make an issue!")

Check warning on line 52 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L40-L52

Added lines #L40 - L52 were not covered by tests
else:
raise NotImplementedError(f"{transform} not implemented, Please make an issue!")

Check warning on line 54 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L54

Added line #L54 was not covered by tests

assert expr is not None
return PartitionField(result_field, daft_field, transform=expr)

Check warning on line 57 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L56-L57

Added lines #L56 - L57 were not covered by tests


def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: IcebergPartitionSpec) -> list[PartitionField]:
return [_iceberg_partition_field_to_daft_partition_field(iceberg_schema, field) for field in spec.fields]

Check warning on line 61 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L60-L61

Added lines #L60 - L61 were not covered by tests


class IcebergScanOperator(ScanOperator):
Expand All @@ -14,10 +67,14 @@ def __init__(self, iceberg_table: Table) -> None:
self._table = iceberg_table
arrow_schema = schema_to_pyarrow(iceberg_table.schema())
self._schema = Schema.from_pyarrow_schema(arrow_schema)
self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec())

Check warning on line 70 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L64-L70

Added lines #L64 - L70 were not covered by tests

def schema(self) -> Schema:
return self._schema

Check warning on line 73 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L72-L73

Added lines #L72 - L73 were not covered by tests

def partitioning_keys(self) -> list[PartitionField]:
return self._partition_keys

Check warning on line 76 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L75-L76

Added lines #L75 - L76 were not covered by tests


def catalog() -> Catalog:
return load_catalog(

Check warning on line 80 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L79-L80

Added lines #L79 - L80 were not covered by tests
Expand Down
16 changes: 12 additions & 4 deletions daft/io/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import abc
from dataclasses import dataclass

Check warning on line 4 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L3-L4

Added lines #L3 - L4 were not covered by tests

from daft.logical.schema import Schema
from daft.expressions.expressions import Expression
from daft.logical.schema import Field, Schema

Check warning on line 7 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L6-L7

Added lines #L6 - L7 were not covered by tests


@dataclass(frozen=True)
Expand All @@ -13,14 +14,21 @@ class ScanTask:
limit: int | None

Check warning on line 14 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L10-L14

Added lines #L10 - L14 were not covered by tests


@dataclass(frozen=True)
class PartitionField:
field: Field
source_field: Field
transform: Expression

Check warning on line 21 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L17-L21

Added lines #L17 - L21 were not covered by tests


class ScanOperator(abc.ABC):
@abc.abstractmethod
def schema(self) -> Schema:
raise NotImplementedError()

Check warning on line 27 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L24-L27

Added lines #L24 - L27 were not covered by tests

# @abc.abstractmethod
# def partitioning_keys(self) -> list[Field]:
# raise NotImplementedError()
@abc.abstractmethod
def partitioning_keys(self) -> list[PartitionField]:
raise NotImplementedError()

Check warning on line 31 in daft/io/scan.py

View check run for this annotation

Codecov / codecov/patch

daft/io/scan.py#L29-L31

Added lines #L29 - L31 were not covered by tests

# @abc.abstractmethod
# def num_partitions(self) -> int:
Expand Down
5 changes: 5 additions & 0 deletions daft/logical/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def _from_pyfield(field: _PyField) -> Field:
f._field = field
return f

@staticmethod
def create(name: str, dtype: DataType) -> Field:
pyfield = _PyField.create(name, dtype._dtype)
return Field._from_pyfield(pyfield)

Check warning on line 39 in daft/logical/schema.py

View check run for this annotation

Codecov / codecov/patch

daft/logical/schema.py#L38-L39

Added lines #L38 - L39 were not covered by tests

@property
def name(self):
return self._field.name()
Expand Down
5 changes: 5 additions & 0 deletions src/daft-core/src/python/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ pub struct PyField {

#[pymethods]
impl PyField {
#[staticmethod]
pub fn create(name: &str, data_type: PyDataType) -> PyResult<Self> {
Ok(datatypes::Field::new(name, data_type.dtype).into())
}

pub fn name(&self) -> PyResult<String> {
Ok(self.field.name.clone())
}
Expand Down

0 comments on commit 351fc91

Please sign in to comment.