[FEAT] Iceberg partitioned writes (#2842)
This also changes the behavior of some of the partitioning functions in
ways that I would consider bug fixes. However, @samster25, maybe you
should take a look at them to make sure their new behavior is correct.
The changes (see the sketch after this list):
- the truncate function now renames columns to `{}_trunc` instead of
`{}_truncate`, to match Spark behavior
- day partitioning now returns an Int32Array instead of a DateArray. I
believe the past behavior was there to match pyiceberg, but after talking
to the pyiceberg team, this actually seems to be a bug. I plan on making a
PR to pyiceberg to fix this, but I have also moved the buggy logic into our
codebase and fixed it, so that it works with past versions of pyiceberg as
well.
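
Here's a minimal sketch of the two changes above (not part of this diff): it assumes a daft build that includes this commit and uses the partitioning expression API (`partitioning.iceberg_truncate`, `partitioning.days`); the data and expected naming are just for illustration.

import datetime

import daft
from daft import col

df = daft.from_pydict(
    {
        "s": ["abcdef", "ghijkl"],
        "d": [datetime.date(1970, 1, 2), datetime.date(1970, 1, 3)],
    }
)

# Truncate partitioning should now produce a column named "s_trunc"
# (previously "s_truncate"), matching Spark's naming.
df.select(col("s").partitioning.iceberg_truncate(3)).show()

# Day partitioning should now produce an Int32 column of days since epoch
# (1 and 2 here) instead of a Date column.
df.select(col("d").partitioning.days()).show()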
kevinzwang authored Sep 20, 2024
1 parent c5b7062 commit 2c13f17
Showing 21 changed files with 873 additions and 298 deletions.
3 changes: 2 additions & 1 deletion daft/daft/__init__.pyi
@@ -13,6 +13,7 @@ from daft.udf import PartialStatefulUDF, PartialStatelessUDF

 if TYPE_CHECKING:
     import pyarrow as pa
+    from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
     from pyiceberg.schema import Schema as IcebergSchema
     from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -1699,7 +1700,7 @@ class LogicalPlanBuilder:
         self,
         table_name: str,
         table_location: str,
-        spec_id: int,
+        partition_spec: IcebergPartitionSpec,
         iceberg_schema: IcebergSchema,
         iceberg_properties: IcebergTableProperties,
         catalog_columns: list[str],
37 changes: 25 additions & 12 deletions daft/dataframe/dataframe.py
@@ -647,13 +647,13 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
             DataFrame: The operations that occurred with this write.
         """
 
-        if len(table.spec().fields) > 0:
-            raise ValueError("Cannot write to partitioned Iceberg tables")
-
         import pyarrow as pa
         import pyiceberg
         from packaging.version import parse
 
+        if len(table.spec().fields) > 0 and parse(pyiceberg.__version__) < parse("0.7.0"):
+            raise ValueError("pyiceberg>=0.7.0 is required to write to a partitioned table")
+
         if parse(pyiceberg.__version__) < parse("0.6.0"):
             raise ValueError(f"Write Iceberg is only supported on pyiceberg>=0.6.0, found {pyiceberg.__version__}")

@@ -683,19 +683,28 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
         else:
             deleted_files = []
 
+        schema = table.schema()
+        partitioning: Dict[str, list] = {schema.find_field(field.source_id).name: [] for field in table.spec().fields}
+
         for data_file in data_files:
             operations.append("ADD")
             path.append(data_file.file_path)
             rows.append(data_file.record_count)
             size.append(data_file.file_size_in_bytes)
 
+            for field in partitioning.keys():
+                partitioning[field].append(getattr(data_file.partition, field, None))
+
         for pf in deleted_files:
             data_file = pf.file
             operations.append("DELETE")
             path.append(data_file.file_path)
             rows.append(data_file.record_count)
             size.append(data_file.file_size_in_bytes)
 
+            for field in partitioning.keys():
+                partitioning[field].append(getattr(data_file.partition, field, None))
+
         if parse(pyiceberg.__version__) >= parse("0.7.0"):
             from pyiceberg.table import ALWAYS_TRUE, PropertyUtil, TableProperties

@@ -735,19 +744,23 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->

         merge.commit()
 
+        with_operations = {
+            "operation": pa.array(operations, type=pa.string()),
+            "rows": pa.array(rows, type=pa.int64()),
+            "file_size": pa.array(size, type=pa.int64()),
+            "file_name": pa.array([fp for fp in path], type=pa.string()),
+        }
+
+        if partitioning:
+            with_operations["partitioning"] = pa.StructArray.from_arrays(
+                partitioning.values(), names=partitioning.keys()
+            )
+
         from daft import from_pydict
 
-        with_operations = from_pydict(
-            {
-                "operation": pa.array(operations, type=pa.string()),
-                "rows": pa.array(rows, type=pa.int64()),
-                "file_size": pa.array(size, type=pa.int64()),
-                "file_name": pa.array([os.path.basename(fp) for fp in path], type=pa.string()),
-            }
-        )
         # NOTE: We are losing the history of the plan here.
         # This is due to the fact that the logical plan of the write_iceberg returns datafiles but we want to return the above data
-        return with_operations
+        return from_pydict(with_operations)
 
     @DataframePublicAPI
     def write_deltalake(
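For context on the hunks above, a hedged usage sketch of a partitioned write; the catalog name, table identifier, and data are hypothetical, and pyiceberg>=0.7.0 is required per the version check.

import datetime

import daft
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # assumes a configured catalog
table = catalog.load_table("db.events")  # assumes a day-partitioned table

df = daft.from_pydict({"ts": [datetime.datetime(2024, 9, 20, 12, 0)], "value": [42]})
result = df.write_iceberg(table, mode="append")

# The returned DataFrame has columns: operation, rows, file_size, file_name,
# plus (for partitioned tables) a "partitioning" struct with one field per
# partition source column.
result.show()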
5 changes: 3 additions & 2 deletions daft/execution/execution_step.py
@@ -19,6 +19,7 @@
 if TYPE_CHECKING:
     import pathlib
 
+    from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
     from pyiceberg.schema import Schema as IcebergSchema
     from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -390,7 +391,7 @@ class WriteIceberg(SingleOutputInstruction):
     base_path: str
     iceberg_schema: IcebergSchema
     iceberg_properties: IcebergTableProperties
-    spec_id: int
+    partition_spec: IcebergPartitionSpec
     io_config: IOConfig | None
 
     def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
@@ -418,7 +419,7 @@ def _handle_file_write(self, input: MicroPartition) -> MicroPartition:
             base_path=self.base_path,
             schema=self.iceberg_schema,
             properties=self.iceberg_properties,
-            spec_id=self.spec_id,
+            partition_spec=self.partition_spec,
             io_config=self.io_config,
         )

5 changes: 3 additions & 2 deletions daft/execution/physical_plan.py
@@ -53,6 +53,7 @@
 if TYPE_CHECKING:
     import pathlib
 
+    from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
     from pyiceberg.schema import Schema as IcebergSchema
     from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -120,7 +121,7 @@ def iceberg_write(
     base_path: str,
     iceberg_schema: IcebergSchema,
     iceberg_properties: IcebergTableProperties,
-    spec_id: int,
+    partition_spec: IcebergPartitionSpec,
     io_config: IOConfig | None,
 ) -> InProgressPhysicalPlan[PartitionT]:
     """Write the results of `child_plan` into pyiceberg data files described by `write_info`."""
@@ -131,7 +132,7 @@ def iceberg_write(
             base_path=base_path,
             iceberg_schema=iceberg_schema,
             iceberg_properties=iceberg_properties,
-            spec_id=spec_id,
+            partition_spec=partition_spec,
             io_config=io_config,
         ),
     )
5 changes: 3 additions & 2 deletions daft/execution/rust_physical_plan_shim.py
@@ -19,6 +19,7 @@
 from daft.runners.partitioning import PartitionT
 
 if TYPE_CHECKING:
+    from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
     from pyiceberg.schema import Schema as IcebergSchema
     from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -344,15 +345,15 @@ def write_iceberg(
     base_path: str,
     iceberg_schema: IcebergSchema,
     iceberg_properties: IcebergTableProperties,
-    spec_id: int,
+    partition_spec: IcebergPartitionSpec,
     io_config: IOConfig | None,
 ) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
     return physical_plan.iceberg_write(
         input,
         base_path=base_path,
         iceberg_schema=iceberg_schema,
         iceberg_properties=iceberg_properties,
-        spec_id=spec_id,
+        partition_spec=partition_spec,
         io_config=io_config,
     )

2 changes: 1 addition & 1 deletion daft/expressions/expressions.py
@@ -3307,7 +3307,7 @@ def days(self) -> Expression:
"""Partitioning Transform that returns the number of days since epoch (1970-01-01)
Returns:
Expression: Date Type Expression
Expression: Int32 Expression in days
"""
return Expression._from_pyexpr(self._expr.partitioning_days())

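A quick sanity check of the corrected docstring using plain-Python date arithmetic (a sketch, not daft output):

import datetime

# The day transform's value for a date is its day count since 1970-01-01,
# now surfaced as Int32 rather than Date.
epoch = datetime.date(1970, 1, 1)
print((datetime.date(2024, 9, 20) - epoch).days)  # 19986 -> the int32 partition value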
17 changes: 13 additions & 4 deletions daft/iceberg/iceberg_scan.py
@@ -45,10 +45,8 @@ def _iceberg_partition_field_to_daft_partition_field(
         source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name)))
     )
     transform = pfield.transform
-    iceberg_result_type = transform.result_type(source_field.field_type)
-    arrow_result_type = schema_to_pyarrow(iceberg_result_type)
-    daft_result_type = DataType.from_arrow_type(arrow_result_type)
-    result_field = Field.create(name, daft_result_type)
+    source_type = DataType.from_arrow_type(schema_to_pyarrow(source_field.field_type))
 
     from pyiceberg.transforms import (
         BucketTransform,
         DayTransform,
@@ -62,22 +60,33 @@ def _iceberg_partition_field_to_daft_partition_field(
     tfm = None
     if isinstance(transform, IdentityTransform):
         tfm = PartitionTransform.identity()
+        result_type = source_type
     elif isinstance(transform, YearTransform):
         tfm = PartitionTransform.year()
+        result_type = DataType.int32()
     elif isinstance(transform, MonthTransform):
         tfm = PartitionTransform.month()
+        result_type = DataType.int32()
     elif isinstance(transform, DayTransform):
         tfm = PartitionTransform.day()
+        # pyiceberg uses date as the result type of a day transform, which is incorrect
+        # so we cannot use transform.result_type() here
+        result_type = DataType.int32()
     elif isinstance(transform, HourTransform):
         tfm = PartitionTransform.hour()
+        result_type = DataType.int32()
     elif isinstance(transform, BucketTransform):
         n = transform.num_buckets
         tfm = PartitionTransform.iceberg_bucket(n)
+        result_type = DataType.int32()
     elif isinstance(transform, TruncateTransform):
         w = transform.width
         tfm = PartitionTransform.iceberg_truncate(w)
+        result_type = source_type
     else:
         warnings.warn(f"{transform} not implemented, Please make an issue!")
+        result_type = source_type
+    result_field = Field.create(name, result_type)
     return make_partition_field(result_field, daft_field, transform=tfm)


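To illustrate the comment about `transform.result_type()` above, a sketch against the affected pyiceberg versions (per the Iceberg spec, year/month/day/hour transforms all produce int):

from pyiceberg.transforms import DayTransform
from pyiceberg.types import TimestampType

# In the affected pyiceberg versions this reports a date type, which is why
# the mapping above hardcodes DataType.int32() for DayTransform instead.
print(DayTransform().result_type(TimestampType()))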

