Skip to content

Commit

Permalink
fix: unity catalog import from write_deltalake
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jay Chia committed Dec 20, 2024
1 parent 5d4db4f commit 644259e
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions daft/dataframe/dataframe.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -874,7 +874,12 @@ def write_deltalake(
from daft.io import DataCatalogTable
from daft.io._deltalake import large_dtypes_kwargs
from daft.io.object_store_options import io_config_to_storage_options
- from daft.unity_catalog import UnityCatalogTable
+
+ _UNITY_AVAILABLE = True
+ try:
+     from daft.unity_catalog import UnityCatalogTable
+ except ImportError:
+     _UNITY_AVAILABLE = False

Check warning on line 882 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L881-L882

Added lines #L881 - L882 were not covered by tests

if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
Expand All @@ -884,30 +889,35 @@ def write_deltalake(

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

- if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
+ # Retrieve table_uri and storage_options from various backends
+ table_uri: str
+ storage_options: dict
+
+ if isinstance(table, deltalake.DeltaTable):
+ table_uri = table.table_uri
+ storage_options = table._storage_options or {}
+ new_storage_options = io_config_to_storage_options(io_config, table_uri)
+ storage_options.update(new_storage_options or {})

Check warning on line 900 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L897-L900

Added lines #L897 - L900 were not covered by tests
+ else:
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
- elif isinstance(table, UnityCatalogTable):
+ elif _UNITY_AVAILABLE and isinstance(table, UnityCatalogTable):

Check warning on line 906 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L906

Added line #L906 was not covered by tests
table_uri = table.table_uri
io_config = table.io_config
- else:
+ elif isinstance(table, DataCatalogTable):

Check warning on line 909 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L909

Added line #L909 was not covered by tests
table_uri = table.table_uri(io_config)
+ else:
+ raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

Check warning on line 912 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L912

Added line #L912 was not covered by tests

if io_config is None:
raise ValueError(
- "io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
+ "io_config was not provided to write_deltalake and could not be retrieved from defaults."
)

storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
- elif isinstance(table, deltalake.DeltaTable):
- table_uri = table.table_uri
- storage_options = table._storage_options or {}
- new_storage_options = io_config_to_storage_options(io_config, table_uri)
- storage_options.update(new_storage_options or {})
- else:
- raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

# see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
scheme = get_protocol_from_path(table_uri)
Expand Down

0 comments on commit 644259e

Please sign in to comment.