Skip to content

Commit

Permalink
feat: Unity Catalog writes using daft.DataFrame.write_deltalake() (#3522)
Browse files Browse the repository at this point in the history

This pull request covers the below 4 workflows that were tested
internally (on Databricks on Azure and AWS) after building the package
in a local environment:

- Load existing table in Unity Catalog and append to it without schema
change: `df.write_deltalake(uc_table, mode='append')` to existing
table in UC retrieved using `unity.load_table(table_name)`
- Load existing table in Unity Catalog and overwrite it without schema
change: `df.write_deltalake(uc_table, mode='overwrite')` overwrite
existing table in UC retrieved using `unity.load_table(table_name)`
- Load existing table in Unity Catalog and overwrite it with schema
change: `df.write_deltalake(uc_table, mode='overwrite', schema_mode='overwrite')`
overwrite existing table, with schema change, in UC
retrieved using `unity.load_table(table_name)`
- Create new table in Unity Catalog using Daft engine and populate it
with data: Register a new table in UC without any schema using
`unity.load_table(table_name,
new_table_storage_path="<some_valid_cloud_storage_path>")` and
`df.write_deltalake(uc_table, mode='overwrite', schema_mode='overwrite')`

A few notes:
- `deltalake` (0.22.3) does not support writes to table with Deletion
vectors enabled. For appends to existing table, to avoid
`CommitFailedError: Unsupported reader features required:
[DeletionVectors]`, ensure the tables being written to do not have
Deletion vector enabled.

- `httpx==0.27.2` pinned dependency is due to a defect with
unitycatalog-python, which is affecting Daft as well for all the
previous versions. Fixing it from this PR.

- If schema updates are performed by Daft, readers will immediately see
the new schema since the Delta log is self-contained. However, for the
schema to update in the Unity Catalog UI, you will need to run `REPAIR TABLE
catalog.schema.table SYNC METADATA;` from Databricks compute to update
UC metadata to match what is in the Delta log.

- In this version, append to an existing table after changing schema is
not supported. Only overwrites are supported.

- For AWS, needed to set environment variable using `export
AWS_S3_ALLOW_UNSAFE_RENAME=true`.
- There appears to be a defect with the `allow_unsafe_rename` parameter
of `df.write_deltalake`, as it did not work during internal testing. This
could be a new issue to log, once confirmed.

---------

Co-authored-by: Kev Wang <[email protected]>
  • Loading branch information
anilmenon14 and kevinzwang authored Dec 11, 2024
1 parent c057493 commit dd2dc23
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
15 changes: 12 additions & 3 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import torch

from daft.io import DataCatalogTable
from daft.unity_catalog import UnityCatalogTable

from daft.logical.schema import Schema

Expand Down Expand Up @@ -826,7 +827,7 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
@DataframePublicAPI
def write_deltalake(
self,
table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable"],
table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable", "UnityCatalogTable"],
partition_cols: Optional[List[str]] = None,
mode: Literal["append", "overwrite", "error", "ignore"] = "append",
schema_mode: Optional[Literal["merge", "overwrite"]] = None,
Expand All @@ -844,7 +845,7 @@ def write_deltalake(
This call is **blocking** and will execute the DataFrame when called
Args:
table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable]): Destination `Delta Lake Table <https://delta-io.github.io/delta-rs/api/delta_table/>`__ or table URI to write dataframe to.
table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable, UnityCatalogTable]): Destination `Delta Lake Table <https://delta-io.github.io/delta-rs/api/delta_table/>`__ or table URI to write dataframe to.
partition_cols (List[str], optional): How to subpartition each partition further. If table exists, expected to match table's existing partitioning scheme, otherwise creates the table with specified partition columns. Defaults to None.
mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data, `error` will raise an error if table already exists, and `ignore` will not write anything if table already exists. Defaults to "append".
schema_mode (str, optional): Schema mode of the write. If set to `overwrite`, allows replacing the schema of the table when doing `mode=overwrite`. Schema mode `merge` is currently not supported.
Expand Down Expand Up @@ -872,6 +873,7 @@ def write_deltalake(
from daft.io import DataCatalogTable
from daft.io._deltalake import large_dtypes_kwargs
from daft.io.object_store_options import io_config_to_storage_options
from daft.unity_catalog import UnityCatalogTable

if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
Expand All @@ -881,14 +883,21 @@ def write_deltalake(

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

if isinstance(table, (str, pathlib.Path, DataCatalogTable)):
if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
elif isinstance(table, UnityCatalogTable):
table_uri = table.table_uri
io_config = table.io_config
else:
table_uri = table.table_uri(io_config)

if io_config is None:
raise ValueError(
"io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
)
storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
elif isinstance(table, deltalake.DeltaTable):
Expand Down
50 changes: 46 additions & 4 deletions daft/unity_catalog/unity_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,56 @@ def _paginated_list_tables(client: unitycatalog.Unitycatalog, page_token: str |

return self._paginate_to_completion(_paginated_list_tables)

def load_table(self, table_name: str) -> UnityCatalogTable:
def load_table(self, table_name: str, new_table_storage_path: str | None = None) -> UnityCatalogTable:
"""Loads an existing Unity Catalog table. If the table is not found, and information is provided in the method to create a new table, a new table will be attempted to be registered.
Args:
table_name (str): Name of the table in Unity Catalog in the form of dot-separated, 3-level namespace
new_table_storage_path (str, optional): Cloud storage path URI to register a new external table using this path. Unity Catalog will validate if the path is valid and authorized for the principal, else will raise an exception.
Returns:
UnityCatalogTable
"""
# Load the table ID
table_info = self._client.tables.retrieve(table_name)
try:
table_info = self._client.tables.retrieve(table_name)
if new_table_storage_path:
warnings.warn(
f"Table {table_name} is an existing storage table with a valid storage path. The 'new_table_storage_path' argument provided will be ignored."
)
except unitycatalog.NotFoundError:
if not new_table_storage_path:
raise ValueError(
f"Table {table_name} is not an existing table. If a new table needs to be created, provide 'new_table_storage_path' value."
)
try:
three_part_namesplit = table_name.split(".")
if len(three_part_namesplit) != 3 or not all(three_part_namesplit):
raise ValueError(
f"Expected table name to be in the format of 'catalog.schema.table', received: {table_name}"
)

params = {
"catalog_name": three_part_namesplit[0],
"schema_name": three_part_namesplit[1],
"name": three_part_namesplit[2],
"columns": None,
"data_source_format": "DELTA",
"table_type": "EXTERNAL",
"storage_location": new_table_storage_path,
"comment": None,
}

table_info = self._client.tables.create(**params)
except Exception as e:
raise Exception(f"An error occurred while registering the table in Unity Catalog: {e}")

table_id = table_info.table_id
storage_location = table_info.storage_location

# Grab credentials from Unity catalog and place it into the Table
temp_table_credentials = self._client.temporary_table_credentials.create(operation="READ", table_id=table_id)
temp_table_credentials = self._client.temporary_table_credentials.create(
operation="READ_WRITE", table_id=table_id
)

scheme = urlparse(storage_location).scheme
if scheme == "s3" or scheme == "s3a":
Expand Down
3 changes: 3 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ docker
# Pinned requests due to docker-py issue: https://github.com/docker/docker-py/issues/3256
requests<2.32.0

# Pinned httpx due to unitycatalog-python issue: https://github.com/unitycatalog/unitycatalog-python/issues/9
httpx==0.27.2

# Tracing
orjson==3.10.12 # orjson recommended for viztracer
py-spy==0.3.14
Expand Down

0 comments on commit dd2dc23

Please sign in to comment.