From 3bc5b1d2a215145e97b89039417486e3cf03b924 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 2 Dec 2024 13:35:49 -0500 Subject: [PATCH] clean up after second round of code review --- docs/connectors/sources/azure-file-source.md | 24 +++++----- docs/connectors/sources/local-file-source.md | 22 +++++----- docs/connectors/sources/s3-file-source.md | 24 +++++----- quixstreams/sources/community/file/file.py | 44 +++++++++---------- .../sources/community/file/origins/azure.py | 40 ++++++++--------- .../sources/community/file/origins/base.py | 20 ++------- .../sources/community/file/origins/local.py | 15 +------ .../sources/community/file/origins/s3.py | 32 +++++++------- 8 files changed, 99 insertions(+), 122 deletions(-) diff --git a/docs/connectors/sources/azure-file-source.md b/docs/connectors/sources/azure-file-source.md index 80569cc21..08fc4c7c7 100644 --- a/docs/connectors/sources/azure-file-source.md +++ b/docs/connectors/sources/azure-file-source.md @@ -52,15 +52,15 @@ from quixstreams.sources.community.file.origins import AzureOrigin app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") -file_origin = AzureOrigin( +origin = AzureOrigin( container="", connection_string="", ) source = FileSource( - filepath="path/to/your/topic_folder/", - file_origin=file_origin, - file_format="json", - file_compression="gzip", + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", ) sdf = app.dataframe(source=source).print(metadata=True) # YOUR LOGIC HERE! @@ -83,19 +83,19 @@ Here are some important configurations to be aware of (see [File Source API](../ `FileSource`: -- `filepath`: a filepath to recursively read through (exclude bucket name). +- `directory`: a directory to recursively read through (exclude container name). **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`). -- `file_origin`: An `AzureOrigin` instance. +- `origin`: An `AzureOrigin` instance. ### Optional: `FileSource`: -- `file_format`: what format the message files are in (ex: `"json"`, `"parquet"`). - **Advanced**: can optionally provide a `Format` instance (`file_compression` will then be ignored). +- `format`: what format the message files are in (ex: `"json"`, `"parquet"`). + **Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored). **Default**: `"json"` -- `file_compression`: what compression is used on the given files, if any (ex: `"gzip"`) +- `compression`: what compression is used on the given files, if any (ex: `"gzip"`) **Default**: `None` - `as_replay`: Produce the messages with the original time delay between them, else as fast as possible. **Note**: Time delay will only be accurate _per partition_, NOT overall. @@ -163,7 +163,7 @@ This will result in the following Kafka message format for `Application`: ### Custom Schemas (Advanced) If the original files are not formatted as expected, custom loaders can be configured -on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(file_format=)`. +on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=)`. Formats can be imported from `quixstreams.sources.community.file.formats`. @@ -182,7 +182,7 @@ The default topic will have a partition count that reflects the partition count within the provided topic's folder structure. The default topic name the Application dumps to is based on the last folder name of -the `FileSource` `filepath` as: `source__`. 
+the `FileSource` `directory` as: `source__`. ## Testing Locally diff --git a/docs/connectors/sources/local-file-source.md b/docs/connectors/sources/local-file-source.md index d7ec77a18..89a48d7d3 100644 --- a/docs/connectors/sources/local-file-source.md +++ b/docs/connectors/sources/local-file-source.md @@ -6,7 +6,7 @@ To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page. -This source reads records from files at a local filepath and produces +This source reads records from files at a local directory and produces them as messages to a kafka topic using any desired `StreamingDataFrame`-based transformations. The resulting messages can be produced in "replay" mode, where the time between record @@ -39,7 +39,7 @@ You can learn more details about the [expected kafka message format](#message-da Local File Source is the default configuration of the `FileSource` connector. -Simply hand the configured `FileSource` (without a `file_origin`) to your `SDF` +Simply hand the configured `FileSource` (without a `origin`) to your `SDF` (`app.dataframe(source=)`). For more details around various settings, see [configuration](#configuration). @@ -50,9 +50,9 @@ from quixstreams.sources.community.file import FileSource app = Application(broker_address="localhost:9092") source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", + directory="/path/to/my/topic_folder", + format="json", + compression="gzip", as_replay=True, ) sdf = app.dataframe(source=source).print(metadata=True) @@ -68,15 +68,15 @@ Here are some important configurations to be aware of (see [File Source API](../ ### Required: -- `filepath`: a filepath to recursively read through (exclude bucket name). +- `directory`: a directory to recursively read through (exclude bucket name). **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`). ### Optional: -- `file_format`: what format the message files are in (ex: `"json"`, `"parquet"`). - **Advanced**: can optionally provide a `Format` instance (`file_compression` will then be ignored). +- `format`: what format the message files are in (ex: `"json"`, `"parquet"`). + **Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored). **Default**: `"json"` -- `file_compression`: what compression is used on the given files, if any (ex: `"gzip"`) +- `compression`: what compression is used on the given files, if any (ex: `"gzip"`) **Default**: `None` - `as_replay`: Produce the messages with the original time delay between them, else as fast as possible. **Note**: Time delay will only be accurate _per partition_, NOT overall. @@ -147,7 +147,7 @@ This will result in the following Kafka message format for `Application`: ### Custom Schemas (Advanced) If the original files are not formatted as expected, custom loaders can be configured -on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(file_format=)`. +on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=)`. Formats can be imported from `quixstreams.sources.community.file.formats`. @@ -166,4 +166,4 @@ The default topic will have a partition count that reflects the partition count within the provided topic's folder structure. The default topic name the Application dumps to is based on the last folder name of -the `FileSource` `filepath` as: `source__`. 
+the `FileSource` `directory` as: `source__`. diff --git a/docs/connectors/sources/s3-file-source.md b/docs/connectors/sources/s3-file-source.md index 8674a6159..acb75d6da 100644 --- a/docs/connectors/sources/s3-file-source.md +++ b/docs/connectors/sources/s3-file-source.md @@ -52,17 +52,17 @@ from quixstreams.sources.community.file.origins import S3Origin app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") -file_origin = S3Origin( +origin = S3Origin( bucket="", aws_access_key_id="", aws_secret_access_key="", aws_region="", ) source = FileSource( - filepath="path/to/your/topic_folder/", - file_origin=file_origin, - file_format="json", - file_compression="gzip", + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", ) sdf = app.dataframe(source=source).print(metadata=True) # YOUR LOGIC HERE! @@ -90,19 +90,19 @@ Here are some important configurations to be aware of (see [File Source API](../ `FileSource`: -- `filepath`: a filepath to recursively read through (exclude bucket name). +- `directory`: a directory to recursively read through (exclude bucket name). **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`). -- `file_origin`: An `S3Origin` instance. +- `origin`: An `S3Origin` instance. ### Optional: `FileSource`: -- `file_format`: what format the message files are in (ex: `"json"`, `"parquet"`). - **Advanced**: can optionally provide a `Format` instance (`file_compression` will then be ignored). +- `format`: what format the message files are in (ex: `"json"`, `"parquet"`). + **Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored). **Default**: `"json"` -- `file_compression`: what compression is used on the given files, if any (ex: `"gzip"`) +- `compression`: what compression is used on the given files, if any (ex: `"gzip"`) **Default**: `None` - `as_replay`: Produce the messages with the original time delay between them, else as fast as possible. **Note**: Time delay will only be accurate _per partition_, NOT overall. @@ -170,7 +170,7 @@ This will result in the following Kafka message format for `Application`: ### Custom Schemas (Advanced) If the original files are not formatted as expected, custom loaders can be configured -on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(file_format=)`. +on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=)`. Formats can be imported from `quixstreams.sources.community.file.formats`. @@ -189,7 +189,7 @@ The default topic will have a partition count that reflects the partition count within the provided topic's folder structure. The default topic name the Application dumps to is based on the last folder name of -the `FileSource` `filepath` as: `source__`. +the `FileSource` `directory` as: `source__`. 
## Testing Locally diff --git a/quixstreams/sources/community/file/file.py b/quixstreams/sources/community/file/file.py index f1e324313..f53772ebf 100644 --- a/quixstreams/sources/community/file/file.py +++ b/quixstreams/sources/community/file/file.py @@ -56,17 +56,17 @@ class FileSource(Source): app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") - file_origin = S3Origin( + origin = S3Origin( bucket="", aws_access_key_id="", aws_secret_access_key="", aws_region="", ) source = FileSource( - filepath="path/to/your/topic_folder/", - file_origin=file_origin, - file_format="json", - file_compression="gzip", + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", ) sdf = app.dataframe(source=source).print(metadata=True) # YOUR LOGIC HERE! @@ -78,22 +78,22 @@ class FileSource(Source): def __init__( self, - filepath: Union[str, Path], - file_format: Union[Format, FormatName] = "json", - file_origin: Origin = LocalOrigin(), - file_compression: Optional[CompressionName] = None, + directory: Union[str, Path], + format: Union[Format, FormatName] = "json", + origin: Origin = LocalOrigin(), + compression: Optional[CompressionName] = None, as_replay: bool = True, name: Optional[str] = None, shutdown_timeout: float = 10, ): """ - :param filepath: a filepath to recursively read through; it is recommended to + :param directory: a directory to recursively read through; it is recommended to provide the path to a given topic folder (ex: `/path/to/topic_a`). - :param file_format: what format the message files are in (ex: json, parquet). - Optionally, can provide a `Format` instance if more than file_compression - is necessary to define (file_compression will then be ignored). - :param file_origin: an Origin type (defaults to reading local files). - :param file_compression: what compression is used on the given files, if any. + :param format: what format the message files are in (ex: json, parquet). + Optionally, can provide a `Format` instance if more than compression + is necessary to define (compression will then be ignored). + :param origin: an Origin type (defaults to reading local files). + :param compression: what compression is used on the given files, if any. :param as_replay: Produce the messages with the original time delay between them. Otherwise, produce the messages as fast as possible. NOTE: Time delay will only be accurate per partition, NOT overall. 
@@ -101,14 +101,14 @@ def __init__( :param shutdown_timeout: Time in seconds the application waits for the source to gracefully shutdown """ - self._filepath = Path(filepath) - self._origin = file_origin - self._formatter = _get_formatter(file_format, file_compression) + self._directory = Path(directory) + self._origin = origin + self._formatter = _get_formatter(format, compression) self._as_replay = as_replay self._previous_timestamp = None self._previous_partition = None super().__init__( - name=name or self._filepath.name, shutdown_timeout=shutdown_timeout + name=name or self._directory.name, shutdown_timeout=shutdown_timeout ) def _replay_delay(self, current_timestamp: int): @@ -152,15 +152,15 @@ def default_topic(self) -> Topic: """ topic = super().default_topic() topic.config = TopicConfig( - num_partitions=self._origin.get_folder_count(self._filepath) or 1, + num_partitions=self._origin.get_folder_count(self._directory) or 1, replication_factor=1, ) return topic def run(self): while self._running: - logger.info(f"Reading files from topic {self._filepath.name}") - for file in self._origin.file_collector(self._filepath): + logger.info(f"Reading files from topic {self._directory.name}") + for file in self._origin.file_collector(self._directory): logger.debug(f"Reading file {file}") self._check_file_partition_number(file) filestream = self._origin.get_raw_file_stream(file) diff --git a/quixstreams/sources/community/file/origins/azure.py b/quixstreams/sources/community/file/origins/azure.py index 4597e9346..dbfb0c638 100644 --- a/quixstreams/sources/community/file/origins/azure.py +++ b/quixstreams/sources/community/file/origins/azure.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Generator -from .base import ExternalOrigin +from .base import Origin try: from azure.storage.blob import BlobServiceClient @@ -14,23 +14,10 @@ 'run "pip install quixstreams[azure]" to use AzureOrigin' ) from exc -__all__ = ("AzureOrigin",) +__all__ = ("AzureFilesOrigin",) -class AzureOrigin(ExternalOrigin): - def get_folder_count(self, path: Path) -> int: - """ - This is a simplified version of the recommended way to retrieve folder - names based on the azure SDK docs examples. - """ - path = f"{path}/" - folders = set() - for blob in self._client.list_blobs(name_starts_with=path): - relative_dir = os.path.dirname(os.path.relpath(blob.name, path)) - if relative_dir and ("/" not in relative_dir): - folders.add(relative_dir) - return len(folders) - +class AzureFilesOrigin(Origin): def __init__( self, connection_string: str, @@ -47,13 +34,26 @@ def _get_client(self, auth: str) -> ContainerClient: blob_client = BlobServiceClient.from_connection_string(auth) return blob_client.get_container_client(self.root_location) - def file_collector(self, folder: Path) -> Generator[Path, None, None]: - data = self._client.list_blob_names(name_starts_with=str(folder)) + def file_collector(self, filepath: Path) -> Generator[Path, None, None]: + data = self._client.list_blob_names(name_starts_with=str(filepath)) for page in data.by_page(): for item in page: yield Path(item) - def get_raw_file_stream(self, blob_name: Path) -> BytesIO: - blob_client = self._client.get_blob_client(str(blob_name)) + def get_folder_count(self, directory: Path) -> int: + """ + This is a simplified version of the recommended way to retrieve folder + names based on the azure SDK docs examples. 
+ """ + path = f"{directory}/" + folders = set() + for blob in self._client.list_blobs(name_starts_with=path): + relative_dir = os.path.dirname(os.path.relpath(blob.name, path)) + if relative_dir and ("/" not in relative_dir): + folders.add(relative_dir) + return len(folders) + + def get_raw_file_stream(self, filepath: Path) -> BytesIO: + blob_client = self._client.get_blob_client(str(filepath)) data = blob_client.download_blob().readall() return BytesIO(data) diff --git a/quixstreams/sources/community/file/origins/base.py b/quixstreams/sources/community/file/origins/base.py index b3ef10aab..927ccef0d 100644 --- a/quixstreams/sources/community/file/origins/base.py +++ b/quixstreams/sources/community/file/origins/base.py @@ -1,12 +1,8 @@ from abc import ABC, abstractmethod -from dataclasses import dataclass from pathlib import Path -from typing import Any, BinaryIO, Iterable, Union +from typing import BinaryIO, Iterable -__all__ = ( - "Origin", - "ExternalOrigin", -) +__all__ = ("Origin",) class Origin(ABC): @@ -26,9 +22,9 @@ def file_collector(self, filepath: Path) -> Iterable[Path]: ... """ @abstractmethod - def get_folder_count(self, folder: Path) -> int: ... + def get_folder_count(self, directory: Path) -> int: ... - """Counts the number of folders at filepath to assume partition counts.""" + """Counts the number of folders at directory to assume partition counts.""" @abstractmethod def get_raw_file_stream(self, filepath: Path) -> BinaryIO: ... @@ -38,11 +34,3 @@ def get_raw_file_stream(self, filepath: Path) -> BinaryIO: ... Result should be ready for deserialization (and/or decompression). """ - - -@dataclass -class ExternalOrigin(Origin, ABC): - """An interface for interacting with an external file-based client""" - - _client: Any - root_location: Union[str, Path] diff --git a/quixstreams/sources/community/file/origins/local.py b/quixstreams/sources/community/file/origins/local.py index db6f4208d..5bfb1980c 100644 --- a/quixstreams/sources/community/file/origins/local.py +++ b/quixstreams/sources/community/file/origins/local.py @@ -8,17 +8,6 @@ class LocalOrigin(Origin): - def __init__( - self, - ): - self._client = None - self._credentials = {} - self.root_location = "/" - - @property - def client(self): - return - def file_collector(self, filepath: Path) -> Generator[Path, None, None]: if filepath.is_dir(): for i in sorted(filepath.iterdir(), key=lambda x: x.name): @@ -26,8 +15,8 @@ def file_collector(self, filepath: Path) -> Generator[Path, None, None]: else: yield filepath - def get_folder_count(self, folder: Path) -> int: - return len([f for f in folder.iterdir()]) + def get_folder_count(self, directory: Path) -> int: + return len([f for f in directory.iterdir()]) def get_raw_file_stream(self, filepath: Path) -> BytesIO: return BytesIO(filepath.read_bytes()) diff --git a/quixstreams/sources/community/file/origins/s3.py b/quixstreams/sources/community/file/origins/s3.py index ae4c9ecb6..b5aa2ef6b 100644 --- a/quixstreams/sources/community/file/origins/s3.py +++ b/quixstreams/sources/community/file/origins/s3.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Generator, Optional, Union -from .base import ExternalOrigin +from .base import Origin try: from boto3 import client as boto_client @@ -20,7 +20,7 @@ __all__ = ("S3Origin",) -class S3Origin(ExternalOrigin): +class S3Origin(Origin): def __init__( self, bucket: str, @@ -58,23 +58,11 @@ def __init__( def _get_client(self) -> S3Client: return boto_client("s3", **self._credentials) - def get_raw_file_stream(self, 
filepath: Path) -> BytesIO: - data = self._client.get_object(Bucket=self.root_location, Key=str(filepath))[ - "Body" - ].read() - return BytesIO(data) - - def get_folder_count(self, folder: Path) -> int: - resp = self._get_client().list_objects( - Bucket=self.root_location, Prefix=f"{folder}/", Delimiter="/" - ) - return len(resp["CommonPrefixes"]) - - def file_collector(self, folder: Union[str, Path]) -> Generator[Path, None, None]: + def file_collector(self, filepath: Union[str, Path]) -> Generator[Path, None, None]: self._client = self._get_client() resp = self._client.list_objects( Bucket=self.root_location, - Prefix=str(folder), + Prefix=str(filepath), Delimiter="/", ) for _folder in resp.get("CommonPrefixes", []): @@ -82,3 +70,15 @@ def file_collector(self, folder: Union[str, Path]) -> Generator[Path, None, None for file in resp.get("Contents", []): yield Path(file["Key"]) + + def get_folder_count(self, directory: Path) -> int: + resp = self._get_client().list_objects( + Bucket=self.root_location, Prefix=f"{directory}/", Delimiter="/" + ) + return len(resp["CommonPrefixes"]) + + def get_raw_file_stream(self, filepath: Path) -> BytesIO: + data = self._client.get_object(Bucket=self.root_location, Key=str(filepath))[ + "Body" + ].read() + return BytesIO(data)
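
For reviewers who want to try the renamed keyword arguments end to end, here is a minimal sketch that wires the post-rename `S3Origin` and `FileSource` signatures from this patch into an `Application`, mirroring the updated `s3-file-source.md` example. The broker address, bucket, region, and credential values are placeholders, not real settings.

```python
# Minimal sketch of the renamed arguments introduced by this patch.
# Bucket, region, and credential values below are placeholders.
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.origins import S3Origin

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")

origin = S3Origin(
    bucket="my-bucket",                     # placeholder
    aws_access_key_id="my-access-key-id",   # placeholder
    aws_secret_access_key="my-secret-key",  # placeholder
    aws_region="us-east-1",                 # placeholder
)
source = FileSource(
    directory="path/to/your/topic_folder/",  # was `filepath=` before this patch
    origin=origin,                           # was `file_origin=`
    format="json",                           # was `file_format=`
    compression="gzip",                      # was `file_compression=`
    as_replay=True,
)

sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
    app.run()
```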
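Since this patch slims `base.py` down to the three-method `Origin` ABC (`file_collector`, `get_folder_count`, `get_raw_file_stream`), here is a sketch of what a user-defined origin could look like against that interface. The class name, the suffix-filtering behavior, and the `origins.base` import path are illustrative assumptions based on the module layout in this diff, not part of the PR itself.

```python
# Hypothetical custom origin built on the slimmed-down Origin ABC from this
# patch. Everything here is an illustrative sketch, not shipped code.
from io import BytesIO
from pathlib import Path
from typing import Generator

from quixstreams.sources.community.file.origins.base import Origin


class SuffixFilteredLocalOrigin(Origin):
    """Reads local files like LocalOrigin, but skips files without `suffix`."""

    def __init__(self, suffix: str = ".gz"):
        self._suffix = suffix

    def file_collector(self, filepath: Path) -> Generator[Path, None, None]:
        # Walk the directory tree in name order, yielding only matching files.
        if filepath.is_dir():
            for entry in sorted(filepath.iterdir(), key=lambda p: p.name):
                yield from self.file_collector(entry)
        elif filepath.name.endswith(self._suffix):
            yield filepath

    def get_folder_count(self, directory: Path) -> int:
        # Count immediate subfolders so the default topic's partition count
        # reflects the folder layout, as FileSource.default_topic expects.
        return len([entry for entry in directory.iterdir() if entry.is_dir()])

    def get_raw_file_stream(self, filepath: Path) -> BytesIO:
        # Return raw bytes, ready for decompression/deserialization upstream.
        return BytesIO(filepath.read_bytes())
```

Such an origin would then be handed to the source the same way as the built-in ones, e.g. `FileSource(directory="path/to/topic_a/", origin=SuffixFilteredLocalOrigin(".gz"))`.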