clean up azure stuff and other small tweaks
tim-quix committed Nov 28, 2024
1 parent e3d9f63 commit bf9666b
Showing 5 changed files with 64 additions and 16 deletions.
20 changes: 20 additions & 0 deletions docs/connectors/sources/azure-file-source.md
@@ -183,3 +183,23 @@ within the provided topic's folder structure.

The default topic name the Application dumps to is based on the last folder name of
the `FileSource` `filepath` as: `source__<last folder name>`.


## Testing Locally

Rather than connecting to Azure, you can test your application against a locally
emulated Azure host running in Docker:

1. Execute in a terminal:

```bash
docker run --rm -d --name azurite \
-p 10000:10000 \
mcr.microsoft.com/azure-storage/azurite:latest
```

2. Set `connection_string` for `AzureOrigin` to the following (a smoke-test sketch follows this list):

```python
"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
```
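
As a quick smoke test of the emulator, you can create a container and upload a
blob directly with the `azure-storage-blob` SDK. This is a hedged sketch: the
container name and blob path below are hypothetical.

```python
from azure.storage.blob import BlobServiceClient

# The standard Azurite development connection string from step 2 above.
connection_string = (
    "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
    "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6"
    "tq/K1SZFPTOtr/KBHBeksoGMGw==;"
    "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
)

service = BlobServiceClient.from_connection_string(connection_string)
container = service.create_container("my-container")  # hypothetical name
container.upload_blob("data/sample/0.jsonl", b'{"hello": "world"}\n')
print([blob.name for blob in container.list_blobs()])
```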
22 changes: 22 additions & 0 deletions docs/connectors/sources/s3-file-source.md
@@ -190,3 +190,25 @@ within the provided topic's folder structure.

The default topic name the Application dumps to is based on the last folder name of
the `FileSource` `filepath` as: `source__<last folder name>`.

## Testing Locally

Rather than connecting to AWS, you can test your application against a locally
emulated S3 host running in Docker:

1. Execute in a terminal:

```bash
docker run --rm -d --name s3 \
-p 4566:4566 \
-e SERVICES=s3 \
-e EDGE_PORT=4566 \
-e DEBUG=1 \
localstack/localstack:latest
```

2. Set `aws_endpoint_url` for `S3Origin` _OR_ the `AWS_ENDPOINT_URL_S3`
   environment variable to `http://localhost:4566`.

3. Set all other `aws_` parameters for `S3Origin` to _any_ string.
   They will not be used, but they must still be populated (see the sketch below).
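
For example, a minimal `S3Origin` pointed at the emulator might look like the
sketch below. Only `bucket`, `aws_secret_access_key`, and `aws_endpoint_url`
are confirmed by this commit; the import path and the remaining parameter
names are assumptions, so check the class signature in your installed version.

```python
# Import path inferred from this repository's layout; it may differ.
from quixstreams.sources.community.file.origins.s3 import S3Origin

origin = S3Origin(
    bucket="my-bucket",                        # hypothetical bucket name
    aws_endpoint_url="http://localhost:4566",  # or set AWS_ENDPOINT_URL_S3
    aws_access_key_id="any-string",            # assumed parameter name
    aws_secret_access_key="any-string",        # unused locally, must be set
)
```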
2 changes: 1 addition & 1 deletion quixstreams/sources/community/file/file.py
@@ -152,7 +152,7 @@ def default_topic(self) -> Topic:
"""
topic = super().default_topic()
topic.config = TopicConfig(
num_partitions=self._origin.get_folder_count(self._filepath),
num_partitions=self._origin.get_folder_count(self._filepath) or 1,
replication_factor=1,
)
return topic
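
The `or 1` fallback guards the case where the source path contains no
subfolders: `get_folder_count()` then returns `0`, which is not a valid
partition count, so the topic falls back to a single partition. A minimal
illustration:

```python
# get_folder_count() can return 0 for a flat path with no subfolders;
# a Kafka topic cannot have 0 partitions, so fall back to 1.
folder_count = 0
num_partitions = folder_count or 1
assert num_partitions == 1
```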
28 changes: 17 additions & 11 deletions quixstreams/sources/community/file/origins/azure.py
@@ -1,6 +1,7 @@
+import os
from io import BytesIO
from pathlib import Path
-from typing import Generator, Optional
+from typing import Generator

from .base import ExternalOrigin

@@ -17,9 +18,17 @@


class AzureOrigin(ExternalOrigin):
-    def get_folder_count(self, filepath: Path) -> int:
-        data = self._client.list_blobs(name_starts_with=str(filepath))
-        folders = [f for page in data.by_page() for f in page.get("blob_prefixes", [])]
+    def get_folder_count(self, path: Path) -> int:
+        """
+        A simplified version of the folder-name retrieval approach
+        recommended in the Azure SDK docs examples.
+        """
+        path = f"{path}/"
+        folders = set()
+        for blob in self._client.list_blobs(name_starts_with=path):
+            relative_dir = os.path.dirname(os.path.relpath(blob.name, path))
+            if relative_dir and ("/" not in relative_dir):
+                folders.add(relative_dir)
+        return len(folders)
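
The new counting logic treats the first path segment below the prefix as a
folder name. A minimal sketch of the same logic on hypothetical blob names,
with no Azure client required:

```python
import os

# Hypothetical blob names under the prefix "data/":
blob_names = ["data/a/1.txt", "data/a/2.txt", "data/b/1.txt", "data/c.txt"]
prefix = "data/"

folders = set()
for name in blob_names:
    # "data/a/1.txt" relative to "data/" is "a/1.txt"; its dirname is "a".
    relative_dir = os.path.dirname(os.path.relpath(name, prefix))
    # An empty dirname means a file sitting directly under the prefix;
    # a dirname containing "/" means a nested path, not an immediate folder.
    if relative_dir and "/" not in relative_dir:
        folders.add(relative_dir)

print(len(folders))  # 2 -- "a" and "b"; "data/c.txt" is not in a subfolder
```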

def __init__(
@@ -31,15 +40,12 @@ def __init__(
:param connection_string: Azure client authentication string.
:param container: Azure container name.
"""
-        self._client: Optional[ContainerClient] = self._get_client(connection_string)
        self.root_location = container
+        self._client = self._get_client(connection_string)

-    def _get_client(self, auth: str):
-        if not self._client:
-            blob_client = BlobServiceClient.from_connection_string(auth)
-            container_client = blob_client.get_container_client(self.root_location)
-            self._client: ContainerClient = container_client
-        return self._client
+    def _get_client(self, auth: str) -> ContainerClient:
+        blob_client = BlobServiceClient.from_connection_string(auth)
+        return blob_client.get_container_client(self.root_location)

def file_collector(self, folder: Path) -> Generator[Path, None, None]:
data = self._client.list_blob_names(name_starts_with=str(folder))
8 changes: 4 additions & 4 deletions quixstreams/sources/community/file/origins/s3.py
@@ -40,7 +40,7 @@ def __init__(
:param aws_secret_access_key: the AWS secret access key.
NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
:param aws_endpoint_url: the endpoint URL to use; only required for connecting
-            to a locally hosted Kinesis.
+            to a locally hosted S3.
NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable
"""
self.root_location = bucket
@@ -66,7 +66,7 @@ def get_raw_file_stream(self, filepath: Path) -> BytesIO:

    def get_folder_count(self, folder: Path) -> int:
        resp = self._get_client().list_objects(
-            Bucket=self.root_location, Prefix=str(folder), Delimiter="/"
+            Bucket=self.root_location, Prefix=f"{folder}/", Delimiter="/"
        )
        return len(resp["CommonPrefixes"])

@@ -77,8 +77,8 @@ def file_collector(self, folder: Union[str, Path]) -> Generator[Path, None, None]:
            Prefix=str(folder),
            Delimiter="/",
        )
-        for folder in resp.get("CommonPrefixes", []):
-            yield from self.file_collector(folder["Prefix"])
+        for _folder in resp.get("CommonPrefixes", []):
+            yield from self.file_collector(_folder["Prefix"])

        for file in resp.get("Contents", []):
            yield Path(file["Key"])
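
With `Delimiter="/"`, S3 collapses keys that share a sub-prefix into
`CommonPrefixes`, one entry per immediate "subfolder"; the trailing slash added
to the `Prefix` above makes S3 report the folders *inside* the path rather
than the path itself. A hedged boto3 sketch against the LocalStack endpoint
from the docs above, with a hypothetical bucket and prefix:

```python
import boto3

# Any credential strings work against LocalStack; they are required but unused.
client = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="any-string",
    aws_secret_access_key="any-string",
    region_name="us-east-1",
)

resp = client.list_objects(Bucket="my-bucket", Prefix="data/", Delimiter="/")
# One CommonPrefixes entry per immediate subfolder of "data/".
print(len(resp.get("CommonPrefixes", [])))
```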
