Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unity catalog import from write_deltalake #3630

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,11 +870,11 @@
from packaging.version import parse

from daft import from_pydict
from daft.dependencies import unity_catalog
from daft.filesystem import get_protocol_from_path
from daft.io import DataCatalogTable
from daft.io._deltalake import large_dtypes_kwargs
from daft.io.object_store_options import io_config_to_storage_options
from daft.unity_catalog import UnityCatalogTable

if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
Expand All @@ -884,30 +884,35 @@

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
# Retrieve table_uri and storage_options from various backends
table_uri: str
storage_options: dict

if isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})

Check warning on line 895 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L892-L895

Added lines #L892 - L895 were not covered by tests
else:
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
elif isinstance(table, UnityCatalogTable):
elif unity_catalog.module_available() and isinstance(table, unity_catalog.UnityCatalogTable):

Check warning on line 901 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L901

Added line #L901 was not covered by tests
table_uri = table.table_uri
io_config = table.io_config
else:
elif isinstance(table, DataCatalogTable):

Check warning on line 904 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L904

Added line #L904 was not covered by tests
table_uri = table.table_uri(io_config)
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

Check warning on line 907 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L907

Added line #L907 was not covered by tests

if io_config is None:
raise ValueError(
"io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
"io_config was not provided to write_deltalake and could not be retrieved from defaults."
)

storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
elif isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

# see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
scheme = get_protocol_from_path(table_uri)
Expand Down
Loading