
read_deltalake does not support Unity catalog tables on Azure storage #3024

Closed
anilmenon14 opened this issue Oct 9, 2024 · 7 comments
Labels: bug (Something isn't working), needs triage

@anilmenon14 (Contributor) commented Oct 9, 2024

Describe the bug

When Databricks Unity Catalog tables are on Azure storage, the daft.read_deltalake method does not support reading from that storage.
It appears to work only for AWS storage, and even then an io_config has to be passed down if the AWS region of the storage is not us-east-1 (see the issue here, where the commands to read data from AWS are shown as an example).

On Azure, the io_config is not populated upstream by the unity.load_table method, which is the primary cause of this issue. Beyond that, the DeltaLakeScanOperator instantiation within daft.read_deltalake is unable to handle non-S3 io_config objects.

A separate question, not covered by this issue, is whether, for AWS storage, the region needs to be passed down at all when instantiating the DeltaLake table object, since this can possibly be done without specifying a region. Happy to help explore this further, since it would mean we no longer have to deal with the us-east-1 default.

To Reproduce

import daft # Using version 0.3.4
from daft.unity_catalog import UnityCatalog
from databricks.sdk import WorkspaceClient # Using version 0.33.0
import os

DATABRICKS_HOST = os.environ.get('DATABRICKS_HOST') # Databricks host URL stored in environment variables
PAT_TOKEN_AZURE = os.environ.get('PAT_TOKEN_AZURE') # i.e., the PAT token stored in environment variables. OAuth is not yet supported as of Daft 0.3.4
METASTORE_ID = os.environ.get('METASTORE_ID')

The following block of code is not actually used below; it is included only in case the region is needed anywhere:

w = WorkspaceClient(host=DATABRICKS_HOST, token=PAT_TOKEN_AZURE)
metastore_summary = w.metastores.summary()
print("Metastore/Storage account cloud is : {}".format(metastore_summary.cloud))
print("Metastore region is : {}".format(metastore_summary.region))

Attempt 1: Passing an instance of unity_catalog.UnityCatalogTable to daft.read_deltalake

unity = UnityCatalog(endpoint=DATABRICKS_HOST,token=PAT_TOKEN_AZURE)
unity_table_mngd = unity.load_table("some_catalog.some_schema.some_table")
df_mngd = daft.read_deltalake(unity_table_mngd)

Error:

failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: error trying to connect: HTTP connect timeout occurred after 1s: HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), connection: Unknown } }) }))
failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: error trying to connect: HTTP connect timeout occurred after 1s: HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), connection: Unknown } }) }))
S3 Credentials not provided or found when making client for us-east-1! Reverting to Anonymous mode. the credential provider was not enabled

OSError: Generic MicrosoftAzure error: Error performing token request: Error after 10 retries in 2.299535584s, max_retries:10, retry_timeout:180s, source:error sending request for url (http://169.254.169.254/metadata/identity/oauth2/token?api-version=2019-08-01&resource=https%3A%2F%2Fstorage.azure.com)

Attempt 2: Passing table_uri and io_config

unity = UnityCatalog(endpoint=DATABRICKS_HOST,token=PAT_TOKEN_AZURE)
unity_table_mngd = unity.load_table("some_catalog.some_schema.some_table")
df_mngd = daft.read_deltalake(unity_table_mngd.table_uri,io_config=unity_table_mngd.io_config)

Error:

failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: error trying to connect: HTTP connect timeout occurred after 1s: HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), connection: Unknown } }) }))
failed to load region from IMDS err=failed to load IMDS session token: dispatch failure: timeout: error trying to connect: HTTP connect timeout occurred after 1s: HTTP connect timeout occurred after 1s: timed out (FailedToLoadToken(FailedToLoadToken { source: DispatchFailure(DispatchFailure { source: ConnectorError { kind: Timeout, source: hyper::Error(Connect, HttpTimeoutError { kind: "HTTP connect", duration: 1s }), connection: Unknown } }) }))
S3 Credentials not provided or found when making client for us-east-1! Reverting to Anonymous mode. the credential provider was not enabled

OSError: Generic MicrosoftAzure error: Error performing token request: Error after 10 retries in 8.117160375s, max_retries:10, retry_timeout:180s, source:error sending request for url (http://169.254.169.254/metadata/identity/oauth2/token?api-version=2019-08-01&resource=https%3A%2F%2Fstorage.azure.com)

Expected behavior

For comparison, this is the behavior seen with AWS storage, where I can successfully retrieve a table:

from daft.io import IOConfig  # needed to rebuild the io_config below

unity = UnityCatalog(endpoint=DATABRICKS_HOST, token=PAT_TOKEN_AWS) # AWS Databricks workspace
unity_table_mngd = unity.load_table("some_catalog.some_schema.some_table")
# The metastore region (from the WorkspaceClient block above) is passed down so the us-east-1 default is not applied.
io_config = IOConfig(s3=unity_table_mngd.io_config.s3.replace(region_name=metastore_summary.region))
df_mngd = daft.read_deltalake(unity_table_mngd.table_uri, io_config=io_config)

df_mngd.show()

Output:

Dataframe is displayed without issues.

Component(s)

Python Runner

Additional context

No response

@jordandakota commented:
Can confirm: io_config isn't populated at all; it has a value of None when reading from Azure Databricks using unity.load_table(). The current workaround is to use your own credentials for the storage directly, which bypasses Unity's permissions model (and which most of my clients have essentially locked down and disabled).
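
For reference, a minimal sketch of what that direct-credential workaround might look like; the storage account name, key, and table URI below are placeholders, and this assumes daft's AzureConfig accepts storage_account and access_key:

import daft
from daft.io import IOConfig, AzureConfig

# Bring-your-own Azure credentials, bypassing the Unity-vended ones entirely.
# Both values below are placeholders you would supply yourself.
io_config = IOConfig(
    azure=AzureConfig(
        storage_account="mystorageaccount",  # hypothetical storage account name
        access_key="<storage-account-key>",  # hypothetical account key
    )
)
df = daft.read_deltalake(
    "abfss://container@mystorageaccount.dfs.core.windows.net/path/to/table",  # hypothetical table URI
    io_config=io_config,
)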

@anilmenon14 (Contributor, Author) commented:
Thanks @jordandakota for confirming this too.
I am working on a fix and will log a PR later this week, since I think I have a fair understanding of where the fixes need to be applied.

@jaychia (Contributor) commented Oct 9, 2024

Thanks @anilmenon14 for the really well-fleshed-out issue! Great to have someone with more Databricks knowledge than us working on this.

Here is the logic where we create our io_config from the information provided to us by the Unity table:
https://github.com/Eventual-Inc/Daft/blob/main/daft/unity_catalog/unity_catalog.py#L97-L110

As you can see, it currently populates only AWS credentials (and does not handle regions, because we could not figure out how Unity would vend region information, given the early stage of the project).
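
For what it's worth, a rough sketch of what an Azure branch alongside that AWS-only logic could look like; the azure_user_delegation_sas field name and the AzureConfig sas_token parameter are assumptions here, not confirmed against the actual APIs:

from daft.io import AzureConfig, IOConfig, S3Config

def io_config_from_temp_credentials(creds) -> IOConfig:
    # Sketch only: `creds` is the temporary-credentials response vended by Unity.
    if getattr(creds, "aws_temp_credentials", None) is not None:
        # Existing behaviour: populate S3 credentials (region handling aside).
        return IOConfig(
            s3=S3Config(
                key_id=creds.aws_temp_credentials.access_key_id,
                access_key=creds.aws_temp_credentials.secret_access_key,
                session_token=creds.aws_temp_credentials.session_token,
            )
        )
    if getattr(creds, "azure_user_delegation_sas", None) is not None:
        # Assumed field name; pass the vended SAS token through to AzureConfig.
        return IOConfig(azure=AzureConfig(sas_token=creds.azure_user_delegation_sas.sas_token))
    raise NotImplementedError("Unsupported cloud for Unity temporary table credentials")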

@anilmenon14 (Contributor, Author) commented Oct 9, 2024

Thank you, @jaychia.
As you rightly pointed out, we will have to fix how the io_config is created in load_table() in the UnityCatalog class.

Apart from that, this block also needs to be fixed to handle Azure storage credentials being passed down in the io_config (a rough sketch of the scheme dispatch follows the link):
https://github.com/Eventual-Inc/Daft/blob/main/daft/delta_lake/delta_lake_scan.py#L43-L67
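
For illustration, the kind of scheme dispatch I have in mind is roughly the following; the import path of io_config_to_storage_options is my guess and may differ by version:

from urllib.parse import urlparse

# Assumed import path for the helper referenced further down in this thread.
from daft.io.object_store_options import io_config_to_storage_options

def storage_options_for_table(table_uri: str, io_config):
    # Sketch only: choose deltalake storage options by URI scheme instead of assuming S3.
    scheme = urlparse(table_uri).scheme
    if scheme in ("s3", "s3a"):
        # Existing path: S3 credentials, plus whatever region handling daft already applies.
        return io_config_to_storage_options(io_config, table_uri)
    if scheme in ("az", "abfs", "abfss"):
        # Azure has no region concept here; pass the Azure credentials through unchanged.
        return io_config_to_storage_options(io_config, table_uri)
    raise NotImplementedError(f"Delta Lake scan not yet wired up for scheme: {scheme}")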

As for the AWS credential-vending region: an interesting observation, when I stepped into the DeltaLakeScanOperator code, is that even if I didn't pass a region in the io_config, I received a valid table object back from deltalake:

DeltaTable(table_uri, storage_options=io_config_to_storage_options(io_config, table_uri))

The io_config I used looked something like this for S3, and it was region-independent in the payload:

from daft.io import IOConfig, S3Config  # imports needed for this snippet

temp_table_credentials = unity._client.temporary_table_credentials.create(operation="READ", table_id=table_id)

io_config = IOConfig(
    s3=S3Config(
        key_id=temp_table_credentials.aws_temp_credentials.access_key_id,
        access_key=temp_table_credentials.aws_temp_credentials.secret_access_key,
        session_token=temp_table_credentials.aws_temp_credentials.session_token,
    )
)

I'd certainly love to look further into that and try to contribute on that issue as well :)
For now, I will work on the Azure fix as a PR and keep the AWS region fix as a separate PR shortly afterwards, so as not to bring too many variables into one change.

@anilmenon14 (Contributor, Author) commented:

@jaychia, just a heads-up that I have logged PR #3025 for the Azure storage compatibility issue.

@kevinzwang (Member) commented:

I believe this is now resolved with #3025, and will be available in Daft v0.3.9

@anilmenon14 (Contributor, Author) commented:

Cross-posting information, originally mentioned in issue #2903, regarding testing on v0.3.9.

The issue is fixed, but it still needs a workaround, which I hope we can avoid in the future. I will look into this, since it was a bit of an oversight on my part not to test the .show() method, which appears to make API calls that expect the exact name of the storage account.

What works for Azure now on Daft v0.3.9

import re

import daft
from daft.io import IOConfig
from daft.unity_catalog import UnityCatalog

unity = UnityCatalog(endpoint=DATABRICKS_HOST, token=PAT_TOKEN_AZURE)
unity_table_ext = unity.load_table("some_catalog.some_schema.some_table")

regex_match_az_storage_acc = re.search(r'@([^\.]+)\.', unity_table_ext.table_uri) # Extract the storage account name from the table URI
if regex_match_az_storage_acc:
    storage_account_parsed = regex_match_az_storage_acc.group(1)
else:
    raise ValueError("{} does not appear to be a valid Azure Storage URI".format(unity_table_ext.table_uri))

io_config = IOConfig(azure=unity_table_ext.io_config.azure.replace(storage_account=storage_account_parsed))
df_ext = daft.read_deltalake(unity_table_ext.table_uri, io_config=io_config)
df_ext.show()

What does not work for Azure:

unity = UnityCatalog(endpoint=DATABRICKS_HOST,token=PAT_TOKEN_AZURE)
unity_table_ext = unity.load_table("some_catalog.some_schema.some_table") 
df_ext = daft.read_deltalake(unity_table_ext)
df_ext.show()

Error:

DaftCoreException: DaftError::External Generic AzureBlob error: Azure Storage Account not set and is required.
 Set either `AzureConfig.storage_account` or the `AZURE_STORAGE_ACCOUNT` environment variable.
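
For completeness, the error text itself points at an alternative to the regex workaround above; a minimal, untested sketch (the account name is a placeholder):

import os
import daft

# Untested alternative suggested by the error message: set AZURE_STORAGE_ACCOUNT before reading,
# instead of parsing the account name out of the table URI. The value below is a placeholder.
os.environ["AZURE_STORAGE_ACCOUNT"] = "mystorageaccount"

df_ext = daft.read_deltalake(unity_table_ext)
df_ext.show()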
