[BUG] Use Daft s3 credentials chain for deltalake reads (#2486)

**Before:** Without user intervention (i.e. passing in explicit
credentials), Daft would pass `None` for credentials when
instantiating `DeltaTable` objects, causing auth issues with S3.

**After:** We detect and special-case when no credentials are being
passed in, and try to use Daft's credentials chain to override the
empty credentials before instantiating the `DeltaTable`.
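
A rough illustration of the user-facing effect, assuming the `daft.read_deltalake` entry point and ambient AWS credentials (e.g. environment variables or an instance profile); the bucket path is hypothetical:

```python
import daft

# Hypothetical table path. Previously this failed to authenticate unless an
# explicit io_config was supplied; with this change, Daft's credentials chain
# (S3Config.from_env()) fills in the missing S3 credentials automatically.
df = daft.read_deltalake("s3://my-bucket/my-delta-table")
df.show()
```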


---------

Co-authored-by: Jay Chia <[email protected]>
jaychia and Jay Chia authored Jul 8, 2024
1 parent 83532e2 commit 510786b
Showing 1 changed file with 40 additions and 2 deletions.
daft/delta_lake/delta_lake_scan.py

```diff
@@ -7,10 +7,12 @@
 from deltalake.table import DeltaTable
 
 import daft
+import daft.exceptions
 from daft.daft import (
     FileFormatConfig,
     ParquetSourceConfig,
     Pushdowns,
+    S3Config,
     ScanTask,
     StorageConfig,
 )
@@ -24,8 +26,44 @@
 class DeltaLakeScanOperator(ScanOperator):
     def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
         super().__init__()
-        storage_options = io_config_to_storage_options(storage_config.config.io_config, table_uri)
-        self._table = DeltaTable(table_uri, storage_options=storage_options)
+
+        # Unfortunately, delta-rs doesn't do very good inference of credentials for S3. The current Daft behavior of
+        # passing in `None` for credentials will therefore cause issues when instantiating the DeltaTable without
+        # credentials.
+        #
+        # Thus, if we don't detect any credentials being available, we attempt to detect them from the environment
+        # using our Daft credentials chain.
+        #
+        # See: https://github.com/delta-io/delta-rs/issues/2117
+        deltalake_sdk_io_config = storage_config.config.io_config
+        if any([deltalake_sdk_io_config.s3.key_id is None, deltalake_sdk_io_config.s3.region_name is None]):
+            try:
+                s3_config_from_env = S3Config.from_env()
+            # Sometimes S3Config.from_env throws an error, for example on CI machines with unusual metadata servers.
+            except daft.exceptions.DaftCoreException:
+                pass
+            else:
+                if (
+                    deltalake_sdk_io_config.s3.key_id is None
+                    and deltalake_sdk_io_config.s3.access_key is None
+                    and deltalake_sdk_io_config.s3.session_token is None
+                ):
+                    deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
+                        s3=deltalake_sdk_io_config.s3.replace(
+                            key_id=s3_config_from_env.key_id,
+                            access_key=s3_config_from_env.access_key,
+                            session_token=s3_config_from_env.session_token,
+                        )
+                    )
+                if deltalake_sdk_io_config.s3.region_name is None:
+                    deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
+                        s3=deltalake_sdk_io_config.s3.replace(
+                            region_name=s3_config_from_env.region_name,
+                        )
+                    )
+
+        self._table = DeltaTable(
+            table_uri, storage_options=io_config_to_storage_options(deltalake_sdk_io_config, table_uri)
+        )
+
         self._storage_config = storage_config
         self._schema = Schema.from_pyarrow_schema(self._table.schema().to_pyarrow())
         partition_columns = set(self._table.metadata().partition_columns)
```
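Note the design choice in the hunk above: the environment-derived credentials are applied only when `key_id`, `access_key`, and `session_token` are all `None`, and the region is filled in independently, so any partial user-supplied configuration wins over the fallback. A minimal sketch of supplying explicit credentials, which bypasses the `S3Config.from_env()` path entirely (all values below are hypothetical placeholders):

```python
import daft
from daft.io import IOConfig, S3Config

# Placeholder credentials. Because key_id and region_name are both set,
# the fallback branch above never runs, and these user-supplied values
# are passed through to DeltaTable unchanged.
io_config = IOConfig(
    s3=S3Config(
        key_id="AKIA...",
        access_key="...",
        region_name="us-west-2",
    )
)

df = daft.read_deltalake("s3://my-bucket/my-delta-table", io_config=io_config)
```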
