Skip to content

Commit

Permalink
Support for Azure storage for Unity Catalog read_deltalake
Browse files Browse the repository at this point in the history
  • Loading branch information
anilmenon14 committed Oct 10, 2024
1 parent c2397bf commit eac67c3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 34 deletions.
55 changes: 32 additions & 23 deletions daft/delta_lake/delta_lake_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
from typing import TYPE_CHECKING
from urllib.parse import urlparse

from deltalake.table import DeltaTable

Expand Down Expand Up @@ -37,31 +38,39 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
#
# See: https://github.com/delta-io/delta-rs/issues/2117
deltalake_sdk_io_config = storage_config.config.io_config
if any([deltalake_sdk_io_config.s3.key_id is None, deltalake_sdk_io_config.s3.region_name is None]):
try:
s3_config_from_env = S3Config.from_env()
# Sometimes S3Config.from_env throws an error, for example on CI machines with weird metadata servers.
except daft.exceptions.DaftCoreException:
pass
else:
if (
deltalake_sdk_io_config.s3.key_id is None
and deltalake_sdk_io_config.s3.access_key is None
and deltalake_sdk_io_config.s3.session_token is None
):
deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
s3=deltalake_sdk_io_config.s3.replace(
key_id=s3_config_from_env.key_id,
access_key=s3_config_from_env.access_key,
session_token=s3_config_from_env.session_token,
scheme = urlparse(table_uri).scheme
if scheme == "s3" or scheme == "s3a":
if any([deltalake_sdk_io_config.s3.key_id is None, deltalake_sdk_io_config.s3.region_name is None]):
try:
s3_config_from_env = S3Config.from_env()
# Sometimes S3Config.from_env throws an error, for example on CI machines with weird metadata servers.
except daft.exceptions.DaftCoreException:
pass
else:
if (
deltalake_sdk_io_config.s3.key_id is None
and deltalake_sdk_io_config.s3.access_key is None
and deltalake_sdk_io_config.s3.session_token is None
):
deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
s3=deltalake_sdk_io_config.s3.replace(
key_id=s3_config_from_env.key_id,
access_key=s3_config_from_env.access_key,
session_token=s3_config_from_env.session_token,
)
)
)
if deltalake_sdk_io_config.s3.region_name is None:
deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
s3=deltalake_sdk_io_config.s3.replace(
region_name=s3_config_from_env.region_name,
if deltalake_sdk_io_config.s3.region_name is None:
deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
s3=deltalake_sdk_io_config.s3.replace(
region_name=s3_config_from_env.region_name,
)
)
)
elif scheme == "gcs" or scheme == "gs":
# TO-DO: Handle any key-value replacements in `io_config` if there are missing elements
pass
elif scheme == "az" or scheme == "abfs" or scheme == "abfss":
# TO-DO: Handle any key-value replacements in `io_config` if there are missing elements
pass

self._table = DeltaTable(
table_uri, storage_options=io_config_to_storage_options(deltalake_sdk_io_config, table_uri)
Expand Down
35 changes: 24 additions & 11 deletions daft/unity_catalog/unity_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import dataclasses
from typing import Callable
from urllib.parse import urlparse

import unitycatalog

from daft.io import IOConfig, S3Config
from daft.io import IOConfig, S3Config, AzureConfig


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -96,18 +97,30 @@ def load_table(self, table_name: str) -> UnityCatalogTable:

# Grab credentials from Unity catalog and place it into the Table
temp_table_credentials = self._client.temporary_table_credentials.create(operation="READ", table_id=table_id)
aws_temp_credentials = temp_table_credentials.aws_temp_credentials
io_config = (
IOConfig(
s3=S3Config(
key_id=aws_temp_credentials.access_key_id,
access_key=aws_temp_credentials.secret_access_key,
session_token=aws_temp_credentials.session_token,

scheme = urlparse(storage_location).scheme
if scheme == "s3" or scheme == "s3a":
aws_temp_credentials = temp_table_credentials.aws_temp_credentials
io_config = (
IOConfig(
s3=S3Config(
key_id=aws_temp_credentials.access_key_id,
access_key=aws_temp_credentials.secret_access_key,
session_token=aws_temp_credentials.session_token,
)
)
if aws_temp_credentials is not None
else None
)
if aws_temp_credentials is not None
else None
)
elif scheme == "gcs" or scheme == "gs":
# TO-DO: gather GCS credential vending assets from Unity and construct 'io_config``
pass
elif scheme == "az" or scheme == "abfs" or scheme == "abfss":
io_config = IOConfig(
azure=AzureConfig(
sas_token = temp_table_credentials.azure_user_delegation_sas.get('sas_token')
)
)

return UnityCatalogTable(
table_uri=storage_location,
Expand Down

0 comments on commit eac67c3

Please sign in to comment.