diff --git a/dlt/common/configuration/specs/azure_credentials.py b/dlt/common/configuration/specs/azure_credentials.py index c2071e2188..63f42056fa 100644 --- a/dlt/common/configuration/specs/azure_credentials.py +++ b/dlt/common/configuration/specs/azure_credentials.py @@ -22,6 +22,8 @@ class AzureCredentialsWithoutDefaults(CredentialsConfiguration): azure_storage_sas_token: TSecretStrValue = None azure_sas_token_permissions: str = "racwdl" """Permissions to use when generating a SAS token. Ignored when sas token is provided directly""" + azure_account_host: Optional[str] = None + """Alternative host when accessing blob storage endpoint, i.e. my_account.dfs.core.windows.net""" def to_adlfs_credentials(self) -> Dict[str, Any]: """Return a dict that can be passed as kwargs to adlfs""" @@ -29,6 +31,7 @@ def to_adlfs_credentials(self) -> Dict[str, Any]: account_name=self.azure_storage_account_name, account_key=self.azure_storage_account_key, sas_token=self.azure_storage_sas_token, + account_host=self.azure_account_host, ) def to_object_store_rs_credentials(self) -> Dict[str, str]: @@ -68,10 +71,13 @@ class AzureServicePrincipalCredentialsWithoutDefaults(CredentialsConfiguration): azure_tenant_id: str = None azure_client_id: str = None azure_client_secret: TSecretStrValue = None + azure_account_host: Optional[str] = None + """Alternative host when accessing blob storage endpoint, i.e. my_account.dfs.core.windows.net""" def to_adlfs_credentials(self) -> Dict[str, Any]: return dict( account_name=self.azure_storage_account_name, + account_host=self.azure_account_host, tenant_id=self.azure_tenant_id, client_id=self.azure_client_id, client_secret=self.azure_client_secret, diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index 777b51a488..4220716706 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -14,6 +14,7 @@ BaseConfiguration, SFTPCredentials, ) +from dlt.common.exceptions import TerminalValueError from dlt.common.typing import DictStrAny from dlt.common.utils import digest128 @@ -57,6 +58,40 @@ class LoadStorageConfiguration(BaseConfiguration): ] +def ensure_canonical_az_url( + bucket_url: str, target_scheme: str, storage_account_name: str = None, account_host: str = None +) -> str: + """Converts any of the forms of azure blob storage into canonical form of {target_scheme}://<container_name>@<account_name>.{account_host}/<path> + + `storage_account_name` is required only if not already present in bucket_url, `account_host` assumes "<storage_account_name>.dfs.core.windows.net" by default + """ + parsed_bucket_url = urlparse(bucket_url) + # Converts an az://<container_name>/<path> to abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path> + if parsed_bucket_url.username: + # has the right form, ensure abfss schema + return urlunparse(parsed_bucket_url._replace(scheme=target_scheme)) + + if not storage_account_name and not account_host: + raise TerminalValueError( + f"Could not convert azure blob storage url {bucket_url} into canonical form " + f" ({target_scheme}://<container_name>@<account_name>.dfs.core.windows.net/<path>)" + f" because storage account name is not known.
Please use {target_scheme}:// canonical" + " url as bucket_url in filesystem credentials" + ) + + account_host = account_host or f"{storage_account_name}.dfs.core.windows.net" + + # as required by databricks + _path = parsed_bucket_url.path + return urlunparse( + parsed_bucket_url._replace( + scheme=target_scheme, + netloc=f"{parsed_bucket_url.netloc}@{account_host}", + path=_path, + ) + ) + + def _make_sftp_url(scheme: str, fs_path: str, bucket_url: str) -> str: parsed_bucket_url = urlparse(bucket_url) return f"{scheme}://{parsed_bucket_url.hostname}{fs_path}" diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 6ac5f31007..fb929031a1 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -40,6 +40,7 @@ ) from dlt.common.time import ensure_pendulum_datetime from dlt.common.typing import DictStrAny +from dlt.common.utils import without_none class FileItem(TypedDict, total=False): @@ -97,6 +98,10 @@ class FileItem(TypedDict, total=False): DEFAULT_KWARGS["azure"] = DEFAULT_KWARGS["az"] DEFAULT_KWARGS["abfss"] = DEFAULT_KWARGS["az"] +AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "azure", "adl", "abfss", "abfs"] +S3_PROTOCOLS = ["s3", "s3a"] +GCS_PROTOCOLS = ["gs", "gcs"] + def fsspec_filesystem( protocol: str, @@ -130,7 +135,11 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny: """ protocol = config.protocol # never use listing caches - fs_kwargs: DictStrAny = {"use_listings_cache": False, "listings_expiry_time": 60.0} + fs_kwargs: DictStrAny = { + "use_listings_cache": False, + "listings_expiry_time": 60.0, + "skip_instance_cache": True, + } credentials = CREDENTIALS_DISPATCH.get(protocol, lambda _: {})(config) if protocol == "gdrive": @@ -151,7 +160,7 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny: if "client_kwargs" in fs_kwargs and "client_kwargs" in credentials: fs_kwargs["client_kwargs"].update(credentials.pop("client_kwargs")) - fs_kwargs.update(credentials) + fs_kwargs.update(without_none(credentials)) return fs_kwargs @@ -174,8 +183,13 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys # first get the class to check the protocol fs_cls = get_filesystem_class(config.protocol) if fs_cls.protocol == "abfs": + url = urlparse(config.bucket_url) # if storage account is present in bucket_url and in credentials, az fsspec will fail - if urlparse(config.bucket_url).username: + # account name is detected only for blob.core.windows.net host + if url.username and ( + url.hostname.endswith("blob.core.windows.net") + or url.hostname.endswith("dfs.core.windows.net") + ): fs_kwargs.pop("account_name") return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore except ImportError as e: diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index dba0a8667d..6e320dba8a 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -29,7 +29,8 @@ ) from dlt.common.schema.utils import is_nullable_column from dlt.common.storages import FileStorage -from dlt.common.storages.configuration import FilesystemConfiguration +from dlt.common.storages.configuration import FilesystemConfiguration, ensure_canonical_az_url +from dlt.common.storages.fsspec_filesystem import AZURE_BLOB_STORAGE_PROTOCOLS from dlt.destinations.exceptions import LoadJobTerminalException from dlt.destinations.impl.clickhouse.configuration import ( 
ClickHouseClientConfiguration, @@ -140,7 +141,7 @@ def run(self) -> None: f"s3('{bucket_http_url}',{auth},'{clickhouse_format}','auto','{compression}')" ) - elif bucket_scheme in ("az", "abfs"): + elif bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS: if not isinstance(self._staging_credentials, AzureCredentialsWithoutDefaults): raise LoadJobTerminalException( self._file_path, @@ -149,7 +150,10 @@ def run(self) -> None: # Authenticated access. account_name = self._staging_credentials.azure_storage_account_name - storage_account_url = f"https://{self._staging_credentials.azure_storage_account_name}.blob.core.windows.net" + account_host = self._staging_credentials.azure_account_host + storage_account_url = ensure_canonical_az_url( + bucket_path, "https", account_name, account_host + ) account_key = self._staging_credentials.azure_storage_account_key # build table func diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index 718427af87..2bb68a607e 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -18,12 +18,17 @@ AzureCredentialsWithoutDefaults, ) from dlt.common.exceptions import TerminalValueError +from dlt.common.storages.configuration import ensure_canonical_az_url from dlt.common.storages.file_storage import FileStorage +from dlt.common.storages.fsspec_filesystem import ( + AZURE_BLOB_STORAGE_PROTOCOLS, + S3_PROTOCOLS, + GCS_PROTOCOLS, +) from dlt.common.schema import TColumnSchema, Schema from dlt.common.schema.typing import TColumnType from dlt.common.storages import FilesystemConfiguration, fsspec_from_config - from dlt.destinations.insert_job_client import InsertValuesJobClient from dlt.destinations.exceptions import LoadJobTerminalException from dlt.destinations.impl.databricks.configuration import DatabricksClientConfiguration @@ -32,8 +37,8 @@ from dlt.destinations.job_impl import ReferenceFollowupJobRequest from dlt.destinations.utils import is_compression_disabled -AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"] -SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"] + +SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + S3_PROTOCOLS + GCS_PROTOCOLS class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs): @@ -106,7 +111,9 @@ def run(self) -> None: # Explicit azure credentials are needed to load from bucket without a named stage credentials_clause = f"""WITH(CREDENTIAL(AZURE_SAS_TOKEN='{staging_credentials.azure_storage_sas_token}'))""" bucket_path = self.ensure_databricks_abfss_url( - bucket_path, staging_credentials.azure_storage_account_name + bucket_path, + staging_credentials.azure_storage_account_name, + staging_credentials.azure_account_host, ) else: raise LoadJobTerminalException( @@ -124,7 +131,9 @@ def run(self) -> None: ), ) bucket_path = self.ensure_databricks_abfss_url( - bucket_path, staging_credentials.azure_storage_account_name + bucket_path, + staging_credentials.azure_storage_account_name, + staging_credentials.azure_account_host, ) # always add FROM clause @@ -165,30 +174,10 @@ def run(self) -> None: @staticmethod def ensure_databricks_abfss_url( - bucket_path: str, azure_storage_account_name: str = None + bucket_path: str, azure_storage_account_name: str = None, account_host: str = None ) -> str: - bucket_url = urlparse(bucket_path) - # Converts an az:/// to abfss://@.dfs.core.windows.net/ - if bucket_url.username: - # has the right form, ensure abfss schema - return 
urlunparse(bucket_url._replace(scheme="abfss")) - - if not azure_storage_account_name: - raise TerminalValueError( - f"Could not convert azure blob storage url {bucket_path} into form required by" - " Databricks" - " (abfss://@.dfs.core.windows.net/)" - " because storage account name is not known. Please use Databricks abfss://" - " canonical url as bucket_url in staging credentials" - ) - # as required by databricks - _path = bucket_url.path - return urlunparse( - bucket_url._replace( - scheme="abfss", - netloc=f"{bucket_url.netloc}@{azure_storage_account_name}.dfs.core.windows.net", - path=_path, - ) + return ensure_canonical_az_url( + bucket_path, "abfss", azure_storage_account_name, account_host ) diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 41a8384754..b66923002e 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -15,12 +15,13 @@ AwsCredentialsWithoutDefaults, AzureCredentialsWithoutDefaults, ) -from dlt.common.storages.configuration import FilesystemConfiguration +from dlt.common.storages.configuration import FilesystemConfiguration, ensure_canonical_az_url from dlt.common.storages.file_storage import FileStorage from dlt.common.schema import TColumnSchema, Schema from dlt.common.schema.typing import TColumnType from dlt.common.exceptions import TerminalValueError +from dlt.common.storages.fsspec_filesystem import AZURE_BLOB_STORAGE_PROTOCOLS, S3_PROTOCOLS from dlt.common.typing import TLoaderFileFormat from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset from dlt.destinations.exceptions import LoadJobTerminalException @@ -124,33 +125,29 @@ def gen_copy_sql( if not is_local: bucket_scheme = parsed_file_url.scheme # referencing an external s3/azure stage does not require explicit AWS credentials - if bucket_scheme in ["s3", "az", "abfs"] and stage_name: + if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS + S3_PROTOCOLS and stage_name: from_clause = f"FROM '@{stage_name}'" files_clause = f"FILES = ('{parsed_file_url.path.lstrip('/')}')" # referencing an staged files via a bucket URL requires explicit AWS credentials elif ( - bucket_scheme == "s3" + bucket_scheme in S3_PROTOCOLS and staging_credentials and isinstance(staging_credentials, AwsCredentialsWithoutDefaults) ): credentials_clause = f"""CREDENTIALS=(AWS_KEY_ID='{staging_credentials.aws_access_key_id}' AWS_SECRET_KEY='{staging_credentials.aws_secret_access_key}')""" from_clause = f"FROM '{file_url}'" elif ( - bucket_scheme in ["az", "abfs"] + bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS and staging_credentials and isinstance(staging_credentials, AzureCredentialsWithoutDefaults) ): # Explicit azure credentials are needed to load from bucket without a named stage credentials_clause = f"CREDENTIALS=(AZURE_SAS_TOKEN='?{staging_credentials.azure_storage_sas_token}')" - # Converts an az:/// to azure://.blob.core.windows.net// - # as required by snowflake - _path = "/" + parsed_file_url.netloc + parsed_file_url.path - file_url = urlunparse( - parsed_file_url._replace( - scheme="azure", - netloc=f"{staging_credentials.azure_storage_account_name}.blob.core.windows.net", - path=_path, - ) + file_url = ensure_canonical_az_url( + file_url, + "azure", + staging_credentials.azure_storage_account_name, + staging_credentials.azure_account_host, ) from_clause = f"FROM '{file_url}'" else: diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md 
b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 4e9bf1068e..86970fe49d 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -122,14 +122,21 @@ endpoint_url = "https://<account_id>.r2.cloudflarestorage.com" # copy your endpo #### Adding additional configuration -To pass any additional arguments to `fsspec`, you may supply `kwargs` and `client_kwargs` in the config as a **stringified dictionary**: +To pass any additional arguments to `fsspec`, you may supply `kwargs` and `client_kwargs` in the toml config: ```toml -[destination.filesystem] -kwargs = '{"use_ssl": true, "auto_mkdir": true}' -client_kwargs = '{"verify": "public.crt"}' +[destination.filesystem.kwargs] +use_ssl=true +auto_mkdir=true + +[destination.filesystem.client_kwargs] +verify="public.crt" ``` +To pass additional arguments via env variables, use a **stringified dictionary**: +`DESTINATION__FILESYSTEM__KWARGS='{"use_ssl": true, "auto_mkdir": true}'` + + ### Google storage Run `pip install "dlt[gs]"` which will install the `gcfs` package. @@ -159,6 +166,30 @@ Run `pip install "dlt[az]"` which will install the `adlfs` package to interface Edit the credentials in `.dlt/secrets.toml`, you'll see AWS credentials by default; replace them with your Azure credentials. +`dlt` supports both forms of blob storage urls: +```toml +[destination.filesystem] +bucket_url = "az://<container_name>/path" # replace with your container name and path +``` + +and + +```toml +[destination.filesystem] +bucket_url = "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/path" +``` + +You can use `az`, `abfss`, `azure` and `abfs` url schemes. + +If you need to use a custom host for your storage account, you can set it up like below: +```toml +[destination.filesystem.credentials] +# The storage account name is always required +azure_account_host = "<account_name>.<base_host>" +``` +Remember to include the `storage_account_name` with your base host, e.g. `dlt_ci.blob.core.usgovcloudapi.net`. + + Two forms of Azure credentials are supported: #### SAS token credentials @@ -166,9 +197,6 @@ Two forms of Azure credentials are supported: Supply storage account name and either SAS token or storage account key ```toml -[destination.filesystem] -bucket_url = "az://[your_container name]" # replace with your container name - [destination.filesystem.credentials] # The storage account name is always required azure_storage_account_name = "account_name" # please set me up! @@ -181,14 +209,13 @@ If you have the correct Azure credentials set up on your machine (e.g., via Azur you can omit both `azure_storage_account_key` and `azure_storage_sas_token` and `dlt` will fall back to the available default. Note that `azure_storage_account_name` is still required as it can't be inferred from the environment. +`dlt` supports the + #### Service principal credentials Supply a client ID, client secret, and a tenant ID for a service principal authorized to access your container. ```toml -[destination.filesystem] -bucket_url = "az://[your_container name]" # replace with your container name - [destination.filesystem.credentials] azure_client_id = "client_id" # please set me up!
azure_client_secret = "client_secret" @@ -204,6 +231,8 @@ max_concurrency=3 ``` ::: + + ### Local file system If for any reason you want to have those files in a local folder, set up the `bucket_url` as follows (you are free to use `config.toml` for that as there are no secrets required): diff --git a/mypy.ini b/mypy.ini index 089fde35aa..46fcc7bb75 100644 --- a/mypy.ini +++ b/mypy.ini @@ -119,4 +119,10 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-pytz.*] +ignore_missing_imports = True + +[mypy-tornado.*] +ignore_missing_imports = True + +[mypy-adlfs.*] ignore_missing_imports = True \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 00980992fa..ad77a357a8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -13,13 +13,13 @@ files = [ [[package]] name = "adlfs" -version = "2024.4.1" +version = "2024.7.0" description = "Access Azure Datalake Gen1 with fsspec and dask" optional = true python-versions = ">=3.8" files = [ - {file = "adlfs-2024.4.1-py3-none-any.whl", hash = "sha256:acea94612ddacaa34ea8c6babcc95b8da6982f930cdade7a86fbd17382403e16"}, - {file = "adlfs-2024.4.1.tar.gz", hash = "sha256:75530a45447f358ae53c5c39c298b8d966dae684be84db899f63b94cd96fc000"}, + {file = "adlfs-2024.7.0-py3-none-any.whl", hash = "sha256:2005c8e124fda3948f2a6abb2dbebb2c936d2d821acaca6afd61932edfa9bc07"}, + {file = "adlfs-2024.7.0.tar.gz", hash = "sha256:106995b91f0eb5e775bcd5957d180d9a14faef3271a063b1f65c66fd5ab05ddf"}, ] [package.dependencies] diff --git a/tests/load/filesystem/test_azure_credentials.py b/tests/load/filesystem/test_azure_credentials.py index 64da35d9be..002e256cff 100644 --- a/tests/load/filesystem/test_azure_credentials.py +++ b/tests/load/filesystem/test_azure_credentials.py @@ -3,6 +3,7 @@ from uuid import uuid4 import pytest +from pytest_mock import MockerFixture import dlt from dlt.common import pendulum @@ -17,7 +18,6 @@ from dlt.common.storages.configuration import FilesystemConfiguration from tests.load.utils import ALL_FILESYSTEM_DRIVERS, AZ_BUCKET from tests.common.configuration.utils import environment -from tests.utils import autouse_test_storage from dlt.common.storages.fsspec_filesystem import fsspec_from_config # mark all tests as essential, do not remove @@ -82,6 +82,7 @@ def test_azure_credentials_from_sas_token(environment: Dict[str, str]) -> None: environment["CREDENTIALS__AZURE_STORAGE_SAS_TOKEN"] = ( "sp=rwdlacx&se=2021-01-01T00:00:00Z&sv=2019-12-12&sr=c&sig=1234567890" ) + environment["CREDENTIALS__AZURE_ACCOUNT_HOST"] = "blob.core.usgovcloudapi.net" config = resolve_configuration(AzureCredentials()) @@ -95,6 +96,7 @@ def test_azure_credentials_from_sas_token(environment: Dict[str, str]) -> None: "account_name": environment["CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME"], "account_key": None, "sas_token": environment["CREDENTIALS__AZURE_STORAGE_SAS_TOKEN"], + "account_host": "blob.core.usgovcloudapi.net", } @@ -199,3 +201,69 @@ def test_azure_service_principal_fs_operations( assert f"{bucket}/{fn}/{fn}" in files fs.delete(f"{bucket}/{fn}/{fn}") fs.rmdir(f"{bucket}/{fn}") + + +def test_account_host_kwargs(environment: Dict[str, str], mocker: MockerFixture) -> None: + environment["CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME"] = "fake_account_name" + environment["CREDENTIALS__AZURE_STORAGE_ACCOUNT_KEY"] = "QWERTYUIOPASDFGHJKLZXCVBNM1234567890" + environment["CREDENTIALS__AZURE_SAS_TOKEN_PERMISSIONS"] = "rl" + + # [destination.filesystem] + # bucket_url="..." 
+ # [destination.filesystem.kwargs] + # account_host="blob.core.usgovcloudapi.net" + # [destination.filesystem.credentials] + # ... + + config = resolve_configuration(FilesystemConfiguration(bucket_url="az://dlt-ci-test-bucket")) + config.kwargs = {"account_host": "dlt_ci.blob.core.usgovcloudapi.net"} + + from adlfs import AzureBlobFileSystem + + connect_mock = mocker.spy(AzureBlobFileSystem, "do_connect") + fsspec_from_config(config) + + connect_mock.assert_called_once() + assert connect_mock.call_args[0][0].account_host == "dlt_ci.blob.core.usgovcloudapi.net" + + config = resolve_configuration( + FilesystemConfiguration( + bucket_url="abfss://dlt-ci-test-bucket@dlt_ci.blob.core.usgovcloudapi.net" + ) + ) + connect_mock.reset_mock() + + assert isinstance(config.credentials, AzureCredentialsWithoutDefaults) + assert config.credentials.azure_storage_account_name == "fake_account_name" + # ignores the url from the bucket_url 🤷 + fs, _ = fsspec_from_config(config) + connect_mock.assert_called_once() + assert connect_mock.call_args[0][0].account_url.endswith( + "fake_account_name.blob.core.windows.net" + ) + + # use host + environment["KWARGS"] = '{"account_host": "fake_account_name.blob.core.usgovcloudapi.net"}' + config = resolve_configuration( + FilesystemConfiguration( + bucket_url="abfss://dlt-ci-test-bucket@fake_account_name.blob.core.usgovcloudapi.net" + ) + ) + connect_mock.reset_mock() + + # NOTE: fsspec is caching instances created in the same thread: skip_instance_cache + fs, _ = fsspec_from_config(config) + connect_mock.assert_called_once() + # assert connect_mock.call_args[0][0].account_url.endswith("fake_account_name.blob.core.usgovcloudapi.net") + assert fs.account_url.endswith("fake_account_name.blob.core.usgovcloudapi.net") + + +def test_azure_account_host(environment: Dict[str, str]) -> None: + environment["CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME"] = "fake_account_name" + environment["CREDENTIALS__AZURE_STORAGE_ACCOUNT_KEY"] = "QWERTYUIOPASDFGHJKLZXCVBNM1234567890" + environment["CREDENTIALS__AZURE_SAS_TOKEN_PERMISSIONS"] = "rl" + environment["CREDENTIALS__AZURE_ACCOUNT_HOST"] = "dlt_ci.blob.core.usgovcloudapi.net" + + config = resolve_configuration(FilesystemConfiguration(bucket_url="az://dlt-ci-test-bucket")) + fs, _ = fsspec_from_config(config) + assert fs.account_host == "dlt_ci.blob.core.usgovcloudapi.net" diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 0db93410e5..d0a29d03d0 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -14,8 +14,9 @@ from dlt.common.configuration import resolve from dlt.common.configuration.inject import with_config from dlt.common.configuration.specs import AnyAzureCredentials +from dlt.common.exceptions import TerminalValueError from dlt.common.storages import fsspec_from_config, FilesystemConfiguration -from dlt.common.storages.configuration import make_fsspec_url +from dlt.common.storages.configuration import ensure_canonical_az_url, make_fsspec_url from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH, glob_files from dlt.common.utils import custom_environ, uniq_id from dlt.destinations import filesystem @@ -74,6 +75,36 @@ def test_remote_url(bucket_url: str) -> None: assert make_fsspec_url(scheme, fs_path, bucket_url) == bucket_url +def test_make_az_url() -> None: + url = make_fsspec_url( + "azure", "/dlt-ci/path", "abfss://dlt-ci-test-bucket@dlt_ci.blob.core.usgovcloudapi.net" + ) + assert url == 
"azure://@dlt_ci.blob.core.usgovcloudapi.net/dlt-ci/path" + + +def test_ensure_az_canonical_url() -> None: + url = ensure_canonical_az_url( + "abfss://dlt-ci-test-bucket@dlt_ci.blob.core.usgovcloudapi.net", "https" + ) + assert url == "https://dlt-ci-test-bucket@dlt_ci.blob.core.usgovcloudapi.net" + + with pytest.raises(TerminalValueError): + ensure_canonical_az_url("abfss://dlt-ci-test-bucket/path/path", "https") + + url = ensure_canonical_az_url( + "abfss://dlt-ci-test-bucket/path/path", "https", storage_account_name="fake_dlt" + ) + assert url == "https://dlt-ci-test-bucket@fake_dlt.dfs.core.windows.net/path/path" + + url = ensure_canonical_az_url( + "abfss://dlt-ci-test-bucket/path/path", + "https", + storage_account_name="fake_dlt", + account_host="dlt_ci.blob.core.usgovcloudapi.net", + ) + assert url == "https://dlt-ci-test-bucket@dlt_ci.blob.core.usgovcloudapi.net/path/path" + + def test_filesystem_instance(with_gdrive_buckets_env: str) -> None: @retry(stop=stop_after_attempt(10), wait=wait_fixed(1), reraise=True) def check_file_exists(filedir_: str, file_url_: str):