From 644259e86e7812d5e44ef3f907acd14331dcbe3a Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 20 Dec 2024 20:54:45 +0800 Subject: [PATCH] fix: unity catalog import from write_deltalake --- daft/dataframe/dataframe.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index ca23c73cbb..10f79a018b 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -874,7 +874,12 @@ def write_deltalake( from daft.io import DataCatalogTable from daft.io._deltalake import large_dtypes_kwargs from daft.io.object_store_options import io_config_to_storage_options - from daft.unity_catalog import UnityCatalogTable + + _UNITY_AVAILABLE = True + try: + from daft.unity_catalog import UnityCatalogTable + except ImportError: + _UNITY_AVAILABLE = False if schema_mode == "merge": raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.") @@ -884,30 +889,35 @@ def write_deltalake( io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config - if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)): + # Retrieve table_uri and storage_options from various backends + table_uri: str + storage_options: dict + + if isinstance(table, deltalake.DeltaTable): + table_uri = table.table_uri + storage_options = table._storage_options or {} + new_storage_options = io_config_to_storage_options(io_config, table_uri) + storage_options.update(new_storage_options or {}) + else: if isinstance(table, str): table_uri = table elif isinstance(table, pathlib.Path): table_uri = str(table) - elif isinstance(table, UnityCatalogTable): + elif _UNITY_AVAILABLE and isinstance(table, UnityCatalogTable): table_uri = table.table_uri io_config = table.io_config - else: + elif isinstance(table, DataCatalogTable): table_uri = table.table_uri(io_config) + else: + raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}") if io_config is None: raise ValueError( - "io_config was not provided to write_deltalake and could not be retrieved from the default configuration." + "io_config was not provided to write_deltalake and could not be retrieved from defaults." ) + storage_options = io_config_to_storage_options(io_config, table_uri) or {} table = try_get_deltatable(table_uri, storage_options=storage_options) - elif isinstance(table, deltalake.DeltaTable): - table_uri = table.table_uri - storage_options = table._storage_options or {} - new_storage_options = io_config_to_storage_options(io_config, table_uri) - storage_options.update(new_storage_options or {}) - else: - raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}") # see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/ scheme = get_protocol_from_path(table_uri)