Skip to content

Commit

Permalink
a few more tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Nov 26, 2024
1 parent 08cafc0 commit 9473aba
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 15 deletions.
16 changes: 5 additions & 11 deletions quixstreams/sources/community/blob_store/blob_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,28 @@ def __init__(
self,
blob_client: BlobClient,
blob_format: Union[FormatName, Format],
filepath: Union[str, Path],
blob_compression: Optional[CompressionName] = None,
blob_folder: Optional[Union[str, Path]] = None,
blob_file: Optional[Union[str, Path]] = None,
as_replay: bool = True,
name: Optional[str] = None,
shutdown_timeout: float = 10.0,
):
self._client = blob_client
self._blob_file = Path(blob_file) if blob_file else None
self._blob_folder = Path(blob_folder) if blob_folder else None

super().__init__(
filepath=self._client.location,
filepath=filepath,
file_format=blob_format,
file_compression=blob_compression,
as_replay=as_replay,
name=name or self._client.location,
name=name or self._client.root_location,
shutdown_timeout=shutdown_timeout,
)

def _get_partition_count(self) -> int:
return self._client.get_root_folder_count(self._blob_folder)
return self._client.get_root_folder_count(self._filepath)

def _file_read(self, file: Path) -> Generator[dict, None, None]:
yield from super()._file_read(self._client.get_raw_blob_stream(file))

def _file_list(self) -> Generator[Path, None, None]:
if self._blob_file:
yield self._blob_file
else:
yield from self._client.blob_collector(self._blob_folder)
yield from self._client.blob_collector(self._filepath)
2 changes: 1 addition & 1 deletion quixstreams/sources/community/blob_store/clients/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
to a locally hosted Kinesis.
NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable
"""
self.location = aws_s3_bucket
self.root_location = aws_s3_bucket
self._client: Optional[S3Client] = None
self._credentials = {
"region_name": aws_region,
Expand Down
8 changes: 6 additions & 2 deletions quixstreams/sources/community/blob_store/clients/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@


class AzureBlobClient(BlobClient):
def get_root_folder_count(self, filepath: Path) -> int:
# TODO: implement
...

def __init__(
self,
connection_string: str,
container: str,
):
self._client: Optional[ContainerClient] = None
self.location = container
self.root_location = container
self._credentials = connection_string

@property
def client(self):
if not self._client:
blob_client = BlobServiceClient.from_connection_string(self._credentials)
container_client = blob_client.get_container_client(self.location)
container_client = blob_client.get_container_client(self.root_location)
self._client: ContainerClient = container_client
return self._client

Expand Down
2 changes: 1 addition & 1 deletion quixstreams/sources/community/blob_store/clients/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class BlobClient:
_client: Any
_credentials: Union[dict, str]
location: Union[str, Path]
root_location: Union[str, Path]

@property
@abstractmethod
Expand Down

0 comments on commit 9473aba

Please sign in to comment.