diff --git a/quixstreams/sources/community/blob_store/blob_source.py b/quixstreams/sources/community/blob_store/blob_source.py index 914a9e40a..d540937eb 100644 --- a/quixstreams/sources/community/blob_store/blob_source.py +++ b/quixstreams/sources/community/blob_store/blob_source.py @@ -19,34 +19,28 @@ def __init__( self, blob_client: BlobClient, blob_format: Union[FormatName, Format], + filepath: Union[str, Path], blob_compression: Optional[CompressionName] = None, - blob_folder: Optional[Union[str, Path]] = None, - blob_file: Optional[Union[str, Path]] = None, as_replay: bool = True, name: Optional[str] = None, shutdown_timeout: float = 10.0, ): self._client = blob_client - self._blob_file = Path(blob_file) if blob_file else None - self._blob_folder = Path(blob_folder) if blob_folder else None super().__init__( - filepath=self._client.location, + filepath=filepath, file_format=blob_format, file_compression=blob_compression, as_replay=as_replay, - name=name or self._client.location, + name=name or self._client.root_location, shutdown_timeout=shutdown_timeout, ) def _get_partition_count(self) -> int: - return self._client.get_root_folder_count(self._blob_folder) + return self._client.get_root_folder_count(self._filepath) def _file_read(self, file: Path) -> Generator[dict, None, None]: yield from super()._file_read(self._client.get_raw_blob_stream(file)) def _file_list(self) -> Generator[Path, None, None]: - if self._blob_file: - yield self._blob_file - else: - yield from self._client.blob_collector(self._blob_folder) + yield from self._client.blob_collector(self._filepath) diff --git a/quixstreams/sources/community/blob_store/clients/aws.py b/quixstreams/sources/community/blob_store/clients/aws.py index 6f747d8f9..6e58fc1d4 100644 --- a/quixstreams/sources/community/blob_store/clients/aws.py +++ b/quixstreams/sources/community/blob_store/clients/aws.py @@ -43,7 +43,7 @@ def __init__( to a locally hosted Kinesis. NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable """ - self.location = aws_s3_bucket + self.root_location = aws_s3_bucket self._client: Optional[S3Client] = None self._credentials = { "region_name": aws_region, diff --git a/quixstreams/sources/community/blob_store/clients/azure.py b/quixstreams/sources/community/blob_store/clients/azure.py index 2830135a6..9626d2c7d 100644 --- a/quixstreams/sources/community/blob_store/clients/azure.py +++ b/quixstreams/sources/community/blob_store/clients/azure.py @@ -14,20 +14,24 @@ class AzureBlobClient(BlobClient): + def get_root_folder_count(self, filepath: Path) -> int: + # TODO: implement + ... + def __init__( self, connection_string: str, container: str, ): self._client: Optional[ContainerClient] = None - self.location = container + self.root_location = container self._credentials = connection_string @property def client(self): if not self._client: blob_client = BlobServiceClient.from_connection_string(self._credentials) - container_client = blob_client.get_container_client(self.location) + container_client = blob_client.get_container_client(self.root_location) self._client: ContainerClient = container_client return self._client diff --git a/quixstreams/sources/community/blob_store/clients/base.py b/quixstreams/sources/community/blob_store/clients/base.py index 3cc085ef1..f8e54688e 100644 --- a/quixstreams/sources/community/blob_store/clients/base.py +++ b/quixstreams/sources/community/blob_store/clients/base.py @@ -11,7 +11,7 @@ class BlobClient: _client: Any _credentials: Union[dict, str] - location: Union[str, Path] + root_location: Union[str, Path] @property @abstractmethod