clean up after second round of code review
tim-quix committed Dec 2, 2024
1 parent 72cba51 commit f581e28
Showing 8 changed files with 99 additions and 122 deletions.
24 changes: 12 additions & 12 deletions docs/connectors/sources/azure-file-source.md
@@ -52,15 +52,15 @@ from quixstreams.sources.community.file.origins import AzureOrigin

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")

-file_origin = AzureOrigin(
+origin = AzureOrigin(
container="<YOUR CONTAINER NAME>",
connection_string="<YOUR CONNECTION STRING>",
)
source = FileSource(
filepath="path/to/your/topic_folder/",
file_origin=file_origin,
file_format="json",
file_compression="gzip",
directory="path/to/your/topic_folder/",
origin=origin,
format="json",
compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!
@@ -83,19 +83,19 @@ Here are some important configurations to be aware of (see [File Source API](../

`FileSource`:

-- `filepath`: a filepath to recursively read through (exclude bucket name).
+- `directory`: a directory to recursively read through (exclude container name).
**Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).
-- `file_origin`: An `AzureOrigin` instance.
+- `origin`: An `AzureOrigin` instance.


### Optional:

`FileSource`:

-- `file_format`: what format the message files are in (ex: `"json"`, `"parquet"`).
-**Advanced**: can optionally provide a `Format` instance (`file_compression` will then be ignored).
+- `format`: what format the message files are in (ex: `"json"`, `"parquet"`).
+**Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored).
**Default**: `"json"`
-- `file_compression`: what compression is used on the given files, if any (ex: `"gzip"`)
+- `compression`: what compression is used on the given files, if any (ex: `"gzip"`)
**Default**: `None`
- `as_replay`: Produce the messages with the original time delay between them, else as fast as possible.
**Note**: Time delay will only be accurate _per partition_, NOT overall.
@@ -163,7 +163,7 @@ This will result in the following Kafka message format for `Application`:
### Custom Schemas (Advanced)

If the original files are not formatted as expected, custom loaders can be configured
-on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(file_format=<Format>)`.
+on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=<Format>)`.

Formats can be imported from `quixstreams.sources.community.file.formats`.
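
For illustration, a custom loader might be wired up like the sketch below; the `JsonFormat` import, its `loads`/`compression` keywords, and the bytes-in/dict-out loader signature are assumptions here rather than confirmed API, so verify them against the Format classes before use.

```python
import json

from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.formats import JsonFormat  # assumed import; verify exact class name

# Hypothetical files whose records are wrapped in an envelope like {"payload": {...}}
def unwrap(raw: bytes) -> dict:
    # Exact loader signature is an assumption; adjust to whatever the Format expects.
    return json.loads(raw)["payload"]

# `loads=` is an assumed keyword for supplying the custom loader.
custom_format = JsonFormat(compression="gzip", loads=unwrap)

source = FileSource(
    directory="path/to/your/topic_folder/",
    origin=origin,          # e.g. the AzureOrigin configured earlier
    format=custom_format,   # FileSource's `compression` is ignored when a Format instance is given
)
```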

@@ -182,7 +182,7 @@ The default topic will have a partition count that reflects the partition count
within the provided topic's folder structure.

The default topic name the Application dumps to is based on the last folder name of
-the `FileSource` `filepath` as: `source__<last folder name>`.
+the `FileSource` `directory` as: `source__<last folder name>`.


## Testing Locally
22 changes: 11 additions & 11 deletions docs/connectors/sources/local-file-source.md
@@ -6,7 +6,7 @@

To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

-This source reads records from files at a local filepath and produces
+This source reads records from files at a local directory and produces
them as messages to a kafka topic using any desired `StreamingDataFrame`-based transformations.

The resulting messages can be produced in "replay" mode, where the time between record
@@ -39,7 +39,7 @@ You can learn more details about the [expected kafka message format](#message-da

Local File Source is the default configuration of the `FileSource` connector.

-Simply hand the configured `FileSource` (without a `file_origin`) to your `SDF`
+Simply hand the configured `FileSource` (without an `origin`) to your `SDF`
(`app.dataframe(source=<SOURCE>)`).

For more details around various settings, see [configuration](#configuration).
@@ -50,9 +50,9 @@ from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092")
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
directory="/path/to/my/topic_folder",
format="json",
compression="gzip",
as_replay=True,
)
sdf = app.dataframe(source=source).print(metadata=True)
@@ -68,15 +68,15 @@ Here are some important configurations to be aware of (see [File Source API](../

### Required:

-- `filepath`: a filepath to recursively read through (exclude bucket name).
+- `directory`: a directory to recursively read through (exclude bucket name).
**Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).

### Optional:

-- `file_format`: what format the message files are in (ex: `"json"`, `"parquet"`).
-**Advanced**: can optionally provide a `Format` instance (`file_compression` will then be ignored).
+- `format`: what format the message files are in (ex: `"json"`, `"parquet"`).
+**Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored).
**Default**: `"json"`
-- `file_compression`: what compression is used on the given files, if any (ex: `"gzip"`)
+- `compression`: what compression is used on the given files, if any (ex: `"gzip"`)
**Default**: `None`
- `as_replay`: Produce the messages with the original time delay between them, else as fast as possible.
**Note**: Time delay will only be accurate _per partition_, NOT overall.
@@ -147,7 +147,7 @@ This will result in the following Kafka message format for `Application`:
### Custom Schemas (Advanced)

If the original files are not formatted as expected, custom loaders can be configured
-on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(file_format=<Format>)`.
+on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=<Format>)`.

Formats can be imported from `quixstreams.sources.community.file.formats`.

@@ -166,4 +166,4 @@ The default topic will have a partition count that reflects the partition count
within the provided topic's folder structure.

The default topic name the Application dumps to is based on the last folder name of
-the `FileSource` `filepath` as: `source__<last folder name>`.
+the `FileSource` `directory` as: `source__<last folder name>`.
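
As a concrete (hypothetical) illustration of that naming rule, the sketch below would produce to a default topic named `source__topic_a`:

```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092")

# Last folder name is "topic_a", so the default topic becomes "source__topic_a";
# its partition count mirrors the folder count found under "path/to/topic_a/".
source = FileSource(directory="path/to/topic_a/")
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
    app.run()
```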
24 changes: 12 additions & 12 deletions docs/connectors/sources/s3-file-source.md
@@ -52,17 +52,17 @@ from quixstreams.sources.community.file.origins import S3Origin

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")

-file_origin = S3Origin(
+origin = S3Origin(
bucket="<YOUR BUCKET NAME>",
aws_access_key_id="<YOUR KEY ID>",
aws_secret_access_key="<YOUR SECRET KEY>",
aws_region="<YOUR REGION>",
)
source = FileSource(
filepath="path/to/your/topic_folder/",
file_origin=file_origin,
file_format="json",
file_compression="gzip",
directory="path/to/your/topic_folder/",
origin=origin,
format="json",
compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!
@@ -90,19 +90,19 @@ Here are some important configurations to be aware of (see [File Source API](../

`FileSource`:

-- `filepath`: a filepath to recursively read through (exclude bucket name).
+- `directory`: a directory to recursively read through (exclude bucket name).
**Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).
-- `file_origin`: An `S3Origin` instance.
+- `origin`: An `S3Origin` instance.


### Optional:

`FileSource`:

-- `file_format`: what format the message files are in (ex: `"json"`, `"parquet"`).
-**Advanced**: can optionally provide a `Format` instance (`file_compression` will then be ignored).
+- `format`: what format the message files are in (ex: `"json"`, `"parquet"`).
+**Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored).
**Default**: `"json"`
-- `file_compression`: what compression is used on the given files, if any (ex: `"gzip"`)
+- `compression`: what compression is used on the given files, if any (ex: `"gzip"`)
**Default**: `None`
- `as_replay`: Produce the messages with the original time delay between them, else as fast as possible.
**Note**: Time delay will only be accurate _per partition_, NOT overall.
@@ -170,7 +170,7 @@ This will result in the following Kafka message format for `Application`:
### Custom Schemas (Advanced)

If the original files are not formatted as expected, custom loaders can be configured
-on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(file_format=<Format>)`.
+on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=<Format>)`.

Formats can be imported from `quixstreams.sources.community.file.formats`.

@@ -189,7 +189,7 @@ The default topic will have a partition count that reflects the partition count
within the provided topic's folder structure.

The default topic name the Application dumps to is based on the last folder name of
-the `FileSource` `filepath` as: `source__<last folder name>`.
+the `FileSource` `directory` as: `source__<last folder name>`.

## Testing Locally

44 changes: 22 additions & 22 deletions quixstreams/sources/community/file/file.py
@@ -56,17 +56,17 @@ class FileSource(Source):
app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
-file_origin = S3Origin(
+origin = S3Origin(
bucket="<YOUR BUCKET>",
aws_access_key_id="<YOUR KEY ID>",
aws_secret_access_key="<YOUR SECRET KEY>",
aws_region="<YOUR REGION>",
)
source = FileSource(
filepath="path/to/your/topic_folder/",
file_origin=file_origin,
file_format="json",
file_compression="gzip",
directory="path/to/your/topic_folder/",
origin=origin,
format="json",
compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!
@@ -78,37 +78,37 @@ class FileSource(Source):

def __init__(
self,
-filepath: Union[str, Path],
-file_format: Union[Format, FormatName] = "json",
-file_origin: Origin = LocalOrigin(),
-file_compression: Optional[CompressionName] = None,
+directory: Union[str, Path],
+format: Union[Format, FormatName] = "json",
+origin: Origin = LocalOrigin(),
+compression: Optional[CompressionName] = None,
as_replay: bool = True,
name: Optional[str] = None,
shutdown_timeout: float = 10,
):
"""
-:param filepath: a filepath to recursively read through; it is recommended to
+:param directory: a directory to recursively read through; it is recommended to
provide the path to a given topic folder (ex: `/path/to/topic_a`).
-:param file_format: what format the message files are in (ex: json, parquet).
-Optionally, can provide a `Format` instance if more than file_compression
-is necessary to define (file_compression will then be ignored).
-:param file_origin: an Origin type (defaults to reading local files).
-:param file_compression: what compression is used on the given files, if any.
+:param format: what format the message files are in (ex: json, parquet).
+Optionally, can provide a `Format` instance if more than compression
+is necessary to define (compression will then be ignored).
+:param origin: an Origin type (defaults to reading local files).
+:param compression: what compression is used on the given files, if any.
:param as_replay: Produce the messages with the original time delay between them.
Otherwise, produce the messages as fast as possible.
NOTE: Time delay will only be accurate per partition, NOT overall.
:param name: The name of the Source application (Default: last folder name).
:param shutdown_timeout: Time in seconds the application waits for the source
to gracefully shutdown
"""
-self._filepath = Path(filepath)
-self._origin = file_origin
-self._formatter = _get_formatter(file_format, file_compression)
+self._directory = Path(directory)
+self._origin = origin
+self._formatter = _get_formatter(format, compression)
self._as_replay = as_replay
self._previous_timestamp = None
self._previous_partition = None
super().__init__(
-name=name or self._filepath.name, shutdown_timeout=shutdown_timeout
+name=name or self._directory.name, shutdown_timeout=shutdown_timeout
)

def _replay_delay(self, current_timestamp: int):
@@ -152,15 +152,15 @@ def default_topic(self) -> Topic:
"""
topic = super().default_topic()
topic.config = TopicConfig(
-num_partitions=self._origin.get_folder_count(self._filepath) or 1,
+num_partitions=self._origin.get_folder_count(self._directory) or 1,
replication_factor=1,
)
return topic

def run(self):
while self._running:
logger.info(f"Reading files from topic {self._filepath.name}")
for file in self._origin.file_collector(self._filepath):
logger.info(f"Reading files from topic {self._directory.name}")
for file in self._origin.file_collector(self._directory):
logger.debug(f"Reading file {file}")
self._check_file_partition_number(file)
filestream = self._origin.get_raw_file_stream(file)
40 changes: 20 additions & 20 deletions quixstreams/sources/community/file/origins/azure.py
@@ -3,7 +3,7 @@
from pathlib import Path
from typing import Generator

-from .base import ExternalOrigin
+from .base import Origin

try:
from azure.storage.blob import BlobServiceClient
@@ -14,23 +14,10 @@
'run "pip install quixstreams[azure]" to use AzureOrigin'
) from exc

__all__ = ("AzureOrigin",)
__all__ = ("AzureFilesOrigin",)


-class AzureOrigin(ExternalOrigin):
-def get_folder_count(self, path: Path) -> int:
-"""
-This is a simplified version of the recommended way to retrieve folder
-names based on the azure SDK docs examples.
-"""
-path = f"{path}/"
-folders = set()
-for blob in self._client.list_blobs(name_starts_with=path):
-relative_dir = os.path.dirname(os.path.relpath(blob.name, path))
-if relative_dir and ("/" not in relative_dir):
-folders.add(relative_dir)
-return len(folders)

+class AzureFilesOrigin(Origin):
def __init__(
self,
connection_string: str,
@@ -47,13 +47,26 @@ def _get_client(self, auth: str) -> ContainerClient:
blob_client = BlobServiceClient.from_connection_string(auth)
return blob_client.get_container_client(self.root_location)

-def file_collector(self, folder: Path) -> Generator[Path, None, None]:
-data = self._client.list_blob_names(name_starts_with=str(folder))
+def file_collector(self, filepath: Path) -> Generator[Path, None, None]:
+data = self._client.list_blob_names(name_starts_with=str(filepath))
for page in data.by_page():
for item in page:
yield Path(item)

-def get_raw_file_stream(self, blob_name: Path) -> BytesIO:
-blob_client = self._client.get_blob_client(str(blob_name))
+def get_folder_count(self, directory: Path) -> int:
+"""
+This is a simplified version of the recommended way to retrieve folder
+names based on the azure SDK docs examples.
+"""
+path = f"{directory}/"
+folders = set()
+for blob in self._client.list_blobs(name_starts_with=path):
+relative_dir = os.path.dirname(os.path.relpath(blob.name, path))
+if relative_dir and ("/" not in relative_dir):
+folders.add(relative_dir)
+return len(folders)
+
+def get_raw_file_stream(self, filepath: Path) -> BytesIO:
+blob_client = self._client.get_blob_client(str(filepath))
data = blob_client.download_blob().readall()
return BytesIO(data)
20 changes: 4 additions & 16 deletions quixstreams/sources/community/file/origins/base.py
@@ -1,12 +1,8 @@
from abc import ABC, abstractmethod
-from dataclasses import dataclass
from pathlib import Path
-from typing import Any, BinaryIO, Iterable, Union
+from typing import BinaryIO, Iterable

-__all__ = (
-"Origin",
-"ExternalOrigin",
-)
+__all__ = ("Origin",)


class Origin(ABC):
@@ -26,9 +26,9 @@ def file_collector(self, filepath: Path) -> Iterable[Path]: ...
"""

@abstractmethod
-def get_folder_count(self, folder: Path) -> int: ...
+def get_folder_count(self, directory: Path) -> int: ...

"""Counts the number of folders at filepath to assume partition counts."""
"""Counts the number of folders at directory to assume partition counts."""

@abstractmethod
def get_raw_file_stream(self, filepath: Path) -> BinaryIO: ...
@@ -38,11 +38,3 @@ def get_raw_file_stream(self, filepath: Path) -> BinaryIO: ...
Result should be ready for deserialization (and/or decompression).
"""


-@dataclass
-class ExternalOrigin(Origin, ABC):
-"""An interface for interacting with an external file-based client"""
-
-_client: Any
-root_location: Union[str, Path]
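
To make the remaining `Origin` interface concrete, here is a minimal sketch of a custom origin backed by an in-memory dict; the import path and exact method contracts are assumptions inferred from the abstract methods above, not a confirmed public API. Such an origin could then be handed to `FileSource(origin=...)` like the built-in ones.

```python
from io import BytesIO
from pathlib import Path
from typing import BinaryIO, Iterable

# Assumed import path, mirroring where AzureOrigin/S3Origin are imported from in the docs.
from quixstreams.sources.community.file.origins import Origin


class InMemoryOrigin(Origin):
    """A toy Origin backed by {path string: raw bytes}, e.g. for local testing."""

    def __init__(self, files: dict):
        # Assumes every key is a path string living under the directory handed to FileSource.
        self._files = files

    def file_collector(self, filepath: Path) -> Iterable[Path]:
        # Yield stored paths under the requested prefix, sorted for deterministic ordering.
        return sorted(Path(p) for p in self._files if p.startswith(str(filepath)))

    def get_folder_count(self, directory: Path) -> int:
        # Count unique immediate children of `directory` as a stand-in for partition folders.
        return len({Path(p).relative_to(directory).parts[0] for p in self._files})

    def get_raw_file_stream(self, filepath: Path) -> BinaryIO:
        # Return a stream ready for deserialization/decompression, per the docstring above.
        return BytesIO(self._files[str(filepath)])
```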