diff --git a/docs/connectors/sources/azure-file-source.md b/docs/connectors/sources/azure-file-source.md
index b3bcb538b..80569cc21 100644
--- a/docs/connectors/sources/azure-file-source.md
+++ b/docs/connectors/sources/azure-file-source.md
@@ -183,3 +183,23 @@ within the provided topic's folder structure.
 
 The default topic name the Application dumps to is based on the last
 folder name of the `FileSource` `filepath` as: `source__`.
+
+
+## Testing Locally
+
+Rather than connecting to Azure, you can test your application against a locally
+emulated Azure host via Docker:
+
+1. Execute in a terminal:
+
+    ```bash
+    docker run --rm -d --name azurite \
+    -p 10000:10000 \
+    mcr.microsoft.com/azure-storage/azurite:latest
+    ```
+
+2. Set `connection_string` for `AzureOrigin` to:
+
+    ```python
+    "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
+    ```
diff --git a/docs/connectors/sources/s3-file-source.md b/docs/connectors/sources/s3-file-source.md
index 09ab62ac4..8674a6159 100644
--- a/docs/connectors/sources/s3-file-source.md
+++ b/docs/connectors/sources/s3-file-source.md
@@ -190,3 +190,25 @@ within the provided topic's folder structure.
 
 The default topic name the Application dumps to is based on the last
 folder name of the `FileSource` `filepath` as: `source__`.
+
+## Testing Locally
+
+Rather than connecting to AWS, you can test your application against a locally
+emulated S3 host via Docker:
+
+1. Execute in a terminal:
+
+    ```bash
+    docker run --rm -d --name s3 \
+    -p 4566:4566 \
+    -e SERVICES=s3 \
+    -e EDGE_PORT=4566 \
+    -e DEBUG=1 \
+    localstack/localstack:latest
+    ```
+
+2. Set `aws_endpoint_url` for `S3Origin` _OR_ the `AWS_ENDPOINT_URL_S3`
+   environment variable to `http://localhost:4566`.
+
+3. Set all other `aws_` parameters for `S3Origin` to _any_ string.
+   They will not be used, but they must still be populated!
diff --git a/quixstreams/sources/community/file/file.py b/quixstreams/sources/community/file/file.py
index df75c8ff7..f1e324313 100644
--- a/quixstreams/sources/community/file/file.py
+++ b/quixstreams/sources/community/file/file.py
@@ -152,7 +152,7 @@ def default_topic(self) -> Topic:
         """
         topic = super().default_topic()
         topic.config = TopicConfig(
-            num_partitions=self._origin.get_folder_count(self._filepath),
+            num_partitions=self._origin.get_folder_count(self._filepath) or 1,
             replication_factor=1,
         )
         return topic
diff --git a/quixstreams/sources/community/file/origins/azure.py b/quixstreams/sources/community/file/origins/azure.py
index 133192adf..4597e9346 100644
--- a/quixstreams/sources/community/file/origins/azure.py
+++ b/quixstreams/sources/community/file/origins/azure.py
@@ -1,6 +1,7 @@
+import os
 from io import BytesIO
 from pathlib import Path
-from typing import Generator, Optional
+from typing import Generator
 
 from .base import ExternalOrigin
 
@@ -17,9 +18,17 @@ class AzureOrigin(ExternalOrigin):
-    def get_folder_count(self, filepath: Path) -> int:
-        data = self._client.list_blobs(name_starts_with=str(filepath))
-        folders = [f for page in data.by_page() for f in page.get("blob_prefixes", [])]
+    def get_folder_count(self, path: Path) -> int:
+        """
+        A simplified version of the folder-name retrieval approach
+        recommended by the Azure SDK docs examples.
+        """
+        path = f"{path}/"
+        folders = set()
+        for blob in self._client.list_blobs(name_starts_with=path):
+            relative_dir = os.path.dirname(os.path.relpath(blob.name, path))
+            if relative_dir and ("/" not in relative_dir):
+                folders.add(relative_dir)
         return len(folders)
 
     def __init__(
@@ -31,15 +40,12 @@ def __init__(
         :param connection_string: Azure client authentication string.
         :param container: Azure container name.
         """
-        self._client: Optional[ContainerClient] = self._get_client(connection_string)
         self.root_location = container
+        self._client = self._get_client(connection_string)
 
-    def _get_client(self, auth: str):
-        if not self._client:
-            blob_client = BlobServiceClient.from_connection_string(auth)
-            container_client = blob_client.get_container_client(self.root_location)
-            self._client: ContainerClient = container_client
-        return self._client
+    def _get_client(self, auth: str) -> ContainerClient:
+        blob_client = BlobServiceClient.from_connection_string(auth)
+        return blob_client.get_container_client(self.root_location)
 
     def file_collector(self, folder: Path) -> Generator[Path, None, None]:
         data = self._client.list_blob_names(name_starts_with=str(folder))
diff --git a/quixstreams/sources/community/file/origins/s3.py b/quixstreams/sources/community/file/origins/s3.py
index 6995e44dc..ae4c9ecb6 100644
--- a/quixstreams/sources/community/file/origins/s3.py
+++ b/quixstreams/sources/community/file/origins/s3.py
@@ -40,7 +40,7 @@ def __init__(
         :param aws_secret_access_key: the AWS secret access key.
             NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
         :param aws_endpoint_url: the endpoint URL to use; only required for connecting
-            to a locally hosted Kinesis.
+            to a locally hosted S3.
             NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable
         """
         self.root_location = bucket
@@ -66,7 +66,7 @@ def get_raw_file_stream(self, filepath: Path) -> BytesIO:
 
     def get_folder_count(self, folder: Path) -> int:
         resp = self._get_client().list_objects(
-            Bucket=self.root_location, Prefix=str(folder), Delimiter="/"
+            Bucket=self.root_location, Prefix=f"{folder}/", Delimiter="/"
         )
-        return len(resp["CommonPrefixes"])
+        return len(resp.get("CommonPrefixes", []))
 
@@ -77,8 +77,8 @@ def file_collector(self, folder: Union[str, Path]) -> Generator[Path, None, None
             Prefix=str(folder),
             Delimiter="/",
         )
-        for folder in resp.get("CommonPrefixes", []):
-            yield from self.file_collector(folder["Prefix"])
+        for _folder in resp.get("CommonPrefixes", []):
+            yield from self.file_collector(_folder["Prefix"])
         for file in resp.get("Contents", []):
            yield Path(file["Key"])
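
As a companion to the "Testing Locally" sections above, here is a minimal end-to-end sketch wiring `FileSource` to the emulated hosts. This is illustrative only: the bucket, container, folder, and broker values are placeholders, the import paths are inferred from the file layout in this diff, and `FileSource`'s exact parameters may differ by quixstreams version.

```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.origins import AzureOrigin, S3Origin

# S3 via LocalStack (see s3-file-source.md): the credentials are not
# validated locally, but the aws_ parameters must still be populated.
s3_origin = S3Origin(
    bucket="my-bucket",                        # placeholder bucket name
    aws_endpoint_url="http://localhost:4566",  # LocalStack edge port
    aws_access_key_id="placeholder",
    aws_secret_access_key="placeholder",
)

# Azure via Azurite (see azure-file-source.md), using Azurite's well-known
# development-only connection string.
azure_origin = AzureOrigin(
    connection_string=(
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
        "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsu"
        "Fq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
    ),
    container="my-container",  # placeholder container name
)

app = Application(broker_address="localhost:9092")  # placeholder broker
source = FileSource(filepath="my-folder", origin=s3_origin)  # or azure_origin

# The source's default topic partition count comes from get_folder_count(),
# now falling back to 1 when no folders are found (see the file.py change).
app.dataframe(source=source)

if __name__ == "__main__":
    app.run()
```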