[FEAT] Delta Lake Writer (non-public API) (#2304)
Continuation of work by @siddharth-gulia in #2073 on our Delta Lake writer

---------

Co-authored-by: siddharth kumar <[email protected]>
kevinzwang and sherlockbeard authored May 24, 2024
1 parent f5b8dfa commit 06b00a6
Showing 23 changed files with 748 additions and 15 deletions.
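For orientation, the new writer added here is exercised roughly as follows — a minimal sketch assuming a local table path and deltalake>=0.14.0 installed (the API is marked non-public and may change):

import daft

df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})

# Append the DataFrame to the Delta Lake table at this path, creating the
# table on the first write; only mode="append" is supported in this commit.
df.write_delta("/tmp/example_delta_table", mode="append")
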
2 changes: 2 additions & 0 deletions daft/context.py
@@ -292,6 +292,8 @@ def set_execution_config(
ctx = get_context()
with ctx._lock:
old_daft_execution_config = ctx._daft_execution_config if config is None else config

# TODO: Re-add Parquet configs when we are ready to support Delta Lake writes
new_daft_execution_config = old_daft_execution_config.with_config_values(
scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
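For reference, execution-config values are applied through the set_execution_config helper this hunk touches — a minimal sketch using an option already visible above; per the TODO, the newly added parquet_* write options are not yet exposed through it:

import daft

# Tune how scan tasks are coalesced; the byte value here is purely illustrative.
daft.set_execution_config(scan_tasks_max_size_bytes=384 * 1024 * 1024)
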
21 changes: 21 additions & 0 deletions daft/daft.pyi
@@ -1406,6 +1406,15 @@ class LogicalPlanBuilder:
catalog_columns: list[str],
io_config: IOConfig | None = None,
) -> LogicalPlanBuilder: ...
def delta_write(
self,
path: str,
columns_name: list[str],
mode: str,
current_version: int,
large_dtypes: bool,
io_config: IOConfig | None = None,
) -> LogicalPlanBuilder: ...
def schema(self) -> PySchema: ...
def optimize(self) -> LogicalPlanBuilder: ...
def to_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> PhysicalPlanScheduler: ...
@@ -1426,6 +1435,10 @@ class PyDaftExecutionConfig:
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
parquet_target_row_group_size: int | None = None,
parquet_max_open_files: int | None = None,
parquet_max_rows_per_file: int | None = None,
parquet_min_rows_per_group: int | None = None,
parquet_max_rows_per_group: int | None = None,
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
@@ -1450,6 +1463,14 @@ class PyDaftExecutionConfig:
@property
def parquet_target_row_group_size(self) -> int: ...
@property
def parquet_max_open_files(self) -> int: ...
@property
def parquet_max_rows_per_file(self) -> int: ...
@property
def parquet_min_rows_per_group(self) -> int: ...
@property
def parquet_max_rows_per_group(self) -> int: ...
@property
def parquet_inflation_factor(self) -> float: ...
@property
def csv_target_filesize(self) -> int: ...
111 changes: 110 additions & 1 deletion daft/dataframe/dataframe.py
@@ -27,9 +27,20 @@
from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
from daft.convert import InputListType
from daft.daft import FileFormat, IOConfig, JoinStrategy, JoinType, ResourceRequest
from daft.daft import (
FileFormat,
IOConfig,
JoinStrategy,
JoinType,
NativeStorageConfig,
ResourceRequest,
StorageConfig,
)
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.delta_lake.delta_lake_storage_function import (
_storage_config_to_storage_options,
)
from daft.errors import ExpressionTypeError
from daft.expressions import Expression, ExpressionsProjection, col, lit
from daft.logical.builder import LogicalPlanBuilder
@@ -534,6 +545,104 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
# This is due to the fact that the logical plan of the write_iceberg returns datafiles but we want to return the above data
return with_operations

def write_delta(
self,
path: str,
mode: str = "append",
io_config: Optional[IOConfig] = None,
) -> None:
import deltalake
import pyarrow as pa
from deltalake.schema import _convert_pa_schema_to_delta
from deltalake.writer import (
try_get_table_and_table_uri,
write_deltalake_pyarrow,
)
from packaging.version import parse

if mode not in ["append"]:
raise ValueError(f"Mode {mode} is not supported. Only 'append' mode is supported")

if parse(deltalake.__version__) < parse("0.14.0"):
raise ValueError(f"Write delta lake is only supported on deltalake>=0.14.0, found {deltalake.__version__}")

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_config = StorageConfig.native(NativeStorageConfig(False, io_config))
storage_options = _storage_config_to_storage_options(storage_config, path)
table, table_uri = try_get_table_and_table_uri(path, storage_options)
if table is not None:
# Merge the table's stored storage options with the options derived from the IOConfig above.
table_storage_options = table._storage_options or {}
table_storage_options.update(storage_options or {})
storage_options = table_storage_options

table.update_incremental()

fields = [f for f in self.schema()]
pyarrow_fields = [pa.field(f.name, f.dtype.to_arrow_dtype()) for f in fields]
pyarrow_schema = pa.schema(pyarrow_fields)

delta_schema = _convert_pa_schema_to_delta(pyarrow_schema, large_dtypes=True)
if table:
if delta_schema != table.schema().to_pyarrow(as_large_types=True):
raise ValueError(
"Schema of data does not match table schema\n"
f"Data schema:\n{delta_schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=True)}"
)
if mode == "error":
raise AssertionError("DeltaTable already exists.")
elif mode == "ignore":
return

current_version = table.version()

else:
current_version = -1

builder = self._builder.write_delta(
path=path,
mode=mode,
current_version=current_version,
large_dtypes=True,
io_config=io_config,
)
write_df = DataFrame(builder)
write_df.collect()

write_result = write_df.to_pydict()
assert "data_file" in write_result
data_files = write_result["data_file"]
add_action = []

operations = []
respath = []
size = []

for data_file in data_files:
operations.append("ADD")
respath.append(data_file.path)
size.append(data_file.size)
add_action.append(data_file)

if table is None:
write_deltalake_pyarrow(
table_uri,
delta_schema,
add_action,
mode,
[],
storage_options=storage_options,
)
else:
table._table.create_write_transaction(
add_action,
mode,
[],
delta_schema,
None,
)
table.update_incremental()

return None

###
# DataFrame operations
###
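A quick way to sanity-check a write_delta call is to reopen the table with deltalake directly — a sketch reusing the hypothetical path from the example near the top of this page:

from deltalake import DeltaTable

dt = DeltaTable("/tmp/example_delta_table")
print(dt.version())           # 0 after the first append, incrementing with each commit
print(dt.to_pyarrow_table())  # the rows written by Daft
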
78 changes: 78 additions & 0 deletions daft/delta_lake/delta_lake_storage_function.py
@@ -0,0 +1,78 @@
from __future__ import annotations

from typing import Any
from urllib.parse import urlparse

from daft.daft import (
AzureConfig,
GCSConfig,
IOConfig,
NativeStorageConfig,
S3Config,
StorageConfig,
)


def _storage_config_to_storage_options(storage_config: StorageConfig, table_uri: str) -> dict[str, str] | None:
"""
Converts the Daft storage config to a storage options dict that deltalake/object_store
understands.
"""
config = storage_config.config
assert isinstance(config, NativeStorageConfig)
io_config = config.io_config
return _io_config_to_storage_options(io_config, table_uri)


def _io_config_to_storage_options(io_config: IOConfig, table_uri: str) -> dict[str, str] | None:
scheme = urlparse(table_uri).scheme
if scheme == "s3" or scheme == "s3a":
return _s3_config_to_storage_options(io_config.s3)
elif scheme == "gcs" or scheme == "gs":
return _gcs_config_to_storage_options(io_config.gcs)
elif scheme == "az" or scheme == "abfs":
return _azure_config_to_storage_options(io_config.azure)
else:
return None


def _s3_config_to_storage_options(s3_config: S3Config) -> dict[str, str]:
storage_options: dict[str, Any] = {}
if s3_config.region_name is not None:
storage_options["region"] = s3_config.region_name
if s3_config.endpoint_url is not None:
storage_options["endpoint_url"] = s3_config.endpoint_url
if s3_config.key_id is not None:
storage_options["access_key_id"] = s3_config.key_id
if s3_config.session_token is not None:
storage_options["session_token"] = s3_config.session_token
if s3_config.access_key is not None:
storage_options["secret_access_key"] = s3_config.access_key
if s3_config.use_ssl is not None:
storage_options["allow_http"] = "false" if s3_config.use_ssl else "true"
if s3_config.verify_ssl is not None:
storage_options["allow_invalid_certificates"] = "false" if s3_config.verify_ssl else "true"
if s3_config.connect_timeout_ms is not None:
storage_options["connect_timeout"] = str(s3_config.connect_timeout_ms) + "ms"
if s3_config.anonymous:
raise ValueError(
"Reading from DeltaLake does not support anonymous mode! Please supply credentials via your S3Config."
)
return storage_options


def _azure_config_to_storage_options(azure_config: AzureConfig) -> dict[str, str]:
storage_options = {}
if azure_config.storage_account is not None:
storage_options["account_name"] = azure_config.storage_account
if azure_config.access_key is not None:
storage_options["access_key"] = azure_config.access_key
if azure_config.endpoint_url is not None:
storage_options["endpoint"] = azure_config.endpoint_url
if azure_config.use_ssl is not None:
storage_options["allow_http"] = "false" if azure_config.use_ssl else "true"
return storage_options


def _gcs_config_to_storage_options(_: GCSConfig) -> dict[str, str]:
return {}
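As a rough illustration of the mapping above — a sketch with a made-up bucket and credentials; the exact keys in the result follow _s3_config_to_storage_options, including the SSL/timeout defaults carried by S3Config:

from daft.delta_lake.delta_lake_storage_function import _io_config_to_storage_options
from daft.io import IOConfig, S3Config

io_config = IOConfig(s3=S3Config(region_name="us-west-2", key_id="AKIAEXAMPLE", access_key="supersecret"))

# For an s3:// URI this yields object_store-style options, roughly:
# {"region": "us-west-2", "access_key_id": "AKIAEXAMPLE", "secret_access_key": "supersecret", ...}
print(_io_config_to_storage_options(io_config, "s3://my-bucket/my-delta-table"))
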
36 changes: 36 additions & 0 deletions daft/execution/execution_step.py
@@ -408,6 +408,42 @@ def _handle_file_write(self, input: MicroPartition) -> MicroPartition:
)


@dataclass(frozen=True)
class WriteDeltaLake(SingleOutputInstruction):
base_path: str
large_dtypes: bool
current_version: int
io_config: IOConfig | None

def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
return self._write_deltalake(inputs)

def _write_deltalake(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
[input] = inputs
partition = self._handle_file_write(
input=input,
)
return [partition]

def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
assert len(input_metadatas) == 1
return [
PartialPartitionMetadata(
num_rows=None, # we can write more than 1 file per partition
size_bytes=None,
)
]

def _handle_file_write(self, input: MicroPartition) -> MicroPartition:
return table_io.write_deltalake(
input,
large_dtypes=self.large_dtypes,
base_path=self.base_path,
current_version=self.current_version,
io_config=self.io_config,
)


@dataclass(frozen=True)
class Filter(SingleOutputInstruction):
predicate: ExpressionsProjection
34 changes: 33 additions & 1 deletion daft/execution/physical_plan.py
@@ -19,7 +19,15 @@
import math
import pathlib
from collections import deque
from typing import TYPE_CHECKING, Generator, Generic, Iterable, Iterator, TypeVar, Union
from typing import (
TYPE_CHECKING,
Generator,
Generic,
Iterable,
Iterator,
TypeVar,
Union,
)

from daft.context import get_context
from daft.daft import FileFormat, IOConfig, JoinType, ResourceRequest
@@ -130,6 +138,30 @@ def iceberg_write(
)


def deltalake_write(
child_plan: InProgressPhysicalPlan[PartitionT],
base_path: str,
large_dtypes: bool,
current_version: int,
io_config: IOConfig | None,
) -> InProgressPhysicalPlan[PartitionT]:
"""Write the results of `child_plan` into pyiceberg data files described by `write_info`."""

yield from (
step.add_instruction(
execution_step.WriteDeltaLake(
base_path=base_path,
large_dtypes=large_dtypes,
current_version=current_version,
io_config=io_config,
),
)
if isinstance(step, PartitionTaskBuilder)
else step
for step in child_plan
)


def pipeline_instruction(
child_plan: InProgressPhysicalPlan[PartitionT],
pipeable_instruction: Instruction,
16 changes: 16 additions & 0 deletions daft/execution/rust_physical_plan_shim.py
@@ -328,3 +328,19 @@ def write_iceberg(
spec_id=spec_id,
io_config=io_config,
)


def write_deltalake(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
path: str,
large_dtypes: bool,
current_version: int,
io_config: IOConfig | None,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
return physical_plan.deltalake_write(
input,
path,
large_dtypes,
current_version,
io_config,
)
19 changes: 19 additions & 0 deletions daft/logical/builder.py
@@ -244,3 +244,22 @@ def write_iceberg(self, table: IcebergTable) -> LogicalPlanBuilder:
io_config = _convert_iceberg_file_io_properties_to_io_config(table.io.properties)
builder = self._builder.iceberg_write(name, location, spec_id, schema, props, columns, io_config)
return LogicalPlanBuilder(builder)

def write_delta(
self,
path: str | pathlib.Path,
mode: str,
current_version: int,
large_dtypes: bool,
io_config: IOConfig,
) -> LogicalPlanBuilder:
columns_name = self.schema().column_names()
builder = self._builder.delta_write(
str(path),
columns_name,
mode,
current_version,
large_dtypes,
io_config,
)
return LogicalPlanBuilder(builder)