diff --git a/docs/build/build.py b/docs/build/build.py index 102c626e4..0c62146b2 100644 --- a/docs/build/build.py +++ b/docs/build/build.py @@ -145,6 +145,8 @@ "quixstreams.sources.core.kafka.quix", "quixstreams.sources.community.file.file", "quixstreams.sources.community.file.compressions.gzip", + "quixstreams.sources.community.file.origins.local", + "quixstreams.sources.community.file.origins.s3", "quixstreams.sources.community.file.formats.json", "quixstreams.sources.community.file.formats.parquet", "quixstreams.sources.community.kinesis.kinesis", diff --git a/docs/connectors/sources/file-source.md b/docs/connectors/sources/file-source.md deleted file mode 100644 index f538f9da1..000000000 --- a/docs/connectors/sources/file-source.md +++ /dev/null @@ -1,89 +0,0 @@ -# File Source - -!!! info - - This is a **Community** connector. Test it before using in production. - - To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page. - -This source enables reading from a localized file source, such as a JSONlines or Parquet -file. It also supports file (de)compression. - -The resulting messages can be produced in "replay" mode, where the time between record -producing is matched as close as possible to the original. (per topic partition only). - -The File Source connector is generally intended to be used alongside the related -[File Sink](../sinks/file-sink.md) (in terms of expected file and data formatting). - -## How To Use - -To use a File Source, you need to create an instance of `FileSource` -and pass it to the `app.dataframe()` method. - -One important thing to note is that you should in general point to a single topic folder -(rather than a root folder with many topics) otherwise topic partitions may not line up correctly. - -For the full description of expected parameters, see the [File Source API](../../api-reference/sources.md#filesource) page. - -```python -from quixstreams import Application -from quixstreams.sources.community.file import FileSource - -app = Application(broker_address="localhost:9092") -source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", - as_replay=True -) -sdf = app.dataframe(source=source).print(metadata=True) - -if __name__ == "__main__": - app.run() -``` - -## File hierarchy/structure - -The File Source expects a folder structure like so: - -``` - my_sinked_topics/ - ├── topic_a/ # topic name (use this path to File Source!) - │ ├── 0/ # topic partition number - │ │ ├── 0000.ext # formatted offset files (ex: JSON) - │ │ └── 0011.ext - │ └── 1/ - │ ├── 0003.ext - │ └── 0016.ext - └── topic_b/ - └── etc... -``` - -This is the default structure generated by the File Sink. - -## File data format/schema - -The expected data schema is largely dependent on the file format chosen. - -For easiest use with the [File Sink](../sinks/file-sink.md), you can follow these patterns: - -- for row-based formats (like JSON), the expected data should have records -with the following fields, where value is the entirety of the message value, -ideally as a JSON-deserializable item: - - `_key` - - `_value` - - `_timestamp` - -- for columnar formats (like Parquet), they do not expect an explicit `value` -field; instead all columns should be included individually while including `_key` and `_timestamp`: - - `_key` - - `_timestamp` - - `field_a` - - `field_b`... - -etc... 
-
-## Topic
-
-The default topic will have a partition count that reflects the partition count found
-within the provided topic's folder structure.
\ No newline at end of file
diff --git a/docs/connectors/sources/local-file-source.md b/docs/connectors/sources/local-file-source.md
new file mode 100644
index 000000000..8626c3c42
--- /dev/null
+++ b/docs/connectors/sources/local-file-source.md
@@ -0,0 +1,171 @@
+# Local File Source
+
+!!! info
+
+    This is a **Community** connector. Test it before using in production.
+
+    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
+
+This source reads records from files in a local directory and produces them as messages
+to a Kafka topic, ready for any desired `StreamingDataFrame`-based transformations.
+
+The resulting messages can be produced in "replay" mode, where the time between producing
+records matches the original timing as closely as possible (per topic partition only).
+
+
+## How To Install
+
+Simply install Quix Streams; no extras are required:
+
+```bash
+pip install quixstreams
+```
+
+## How It Works
+
+`FileSource` steps through each folder within the provided path and dumps each record
+contained in each file as a message to a Kafka topic. Folders are navigated in
+lexicographical order.
+
+Records are read in a streaming fashion and committed after every file, offering
+[at-least-once guarantees](#processingdelivery-guarantees).
+
+It can handle one given file type (ex. JSONlines or Parquet) at a time, and also
+supports file decompression.
+
+You can learn more details about the [expected Kafka message format](#message-data-formatschema) below.
+
+## How To Use
+
+Local File Source is the default configuration of the `FileSource` connector.
+
+Simply hand the configured `FileSource` (without an `origin`) to your `SDF`
+(`app.dataframe(source=)`).
+
+For more details around various settings, see [configuration](#configuration).
+
+```python
+from quixstreams import Application
+from quixstreams.sources.community.file import FileSource
+
+app = Application(broker_address="localhost:9092")
+source = FileSource(
+    directory="/path/to/my/topic_folder",
+    format="json",
+    compression="gzip",
+    replay_speed=1.0,
+)
+sdf = app.dataframe(source=source).print(metadata=True)
+# YOUR LOGIC HERE!
+
+if __name__ == "__main__":
+    app.run()
+```
+
+## Configuration
+
+Here are some important configurations to be aware of (see the [File Source API](../../api-reference/sources.md#filesource) for all parameters).
+
+### Required:
+
+- `directory`: a directory to recursively read through.
+    **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).
+
+### Optional:
+
+- `format`: what format the message files are in (ex: `"json"`, `"parquet"`).
+    **Advanced**: can optionally provide a `Format` instance instead (`compression` is then ignored); see the sketch after this list.
+    **Default**: `"json"`
+- `compression`: what compression is used on the given files, if any (ex: `"gzip"`).
+    **Default**: `None`
+- `replay_speed`: Produce the messages with this speed multiplier, which roughly
+    reflects the original time delay between producing each message.
+    Use any `float` `>= 0.0`, where `0.0` is no delay and `1.0` is the original speed.
+    **Note**: Time delay will only be accurate _per partition_, NOT overall.
+    **Default**: 1.0
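+
+As an example of the `Format` instance option noted above, the sketch below passes a
+configured format object directly. It assumes the `JsonFormat` class referenced in the
+[Custom Schemas](#custom-schemas-advanced) section and a hypothetical `loader` argument
+for parsing non-standard files; check the classes in
+`quixstreams.sources.community.file.formats` for the exact constructor parameters.
+
+```python
+import json
+
+from quixstreams import Application
+from quixstreams.sources.community.file import FileSource
+# Import path per the Custom Schemas section; class and parameter names may differ.
+from quixstreams.sources.community.file.formats import JsonFormat
+
+
+def my_loader(raw: bytes) -> list:
+    # Hypothetical custom parser: one JSON object per line.
+    return [json.loads(line) for line in raw.splitlines() if line.strip()]
+
+
+app = Application(broker_address="localhost:9092")
+source = FileSource(
+    directory="/path/to/my/topic_folder",
+    # With a Format instance, compression is configured on the format itself.
+    format=JsonFormat(compression="gzip", loader=my_loader),  # `loader` is an assumption
+)
+sdf = app.dataframe(source=source).print(metadata=True)
+
+if __name__ == "__main__":
+    app.run()
+```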
+
+
+## File hierarchy/structure
+
+The File Source expects a folder structure like so:
+
+```
+    my_sinked_topics/
+    ├── topic_a/          # topic name (use this path to File Source!)
+    │   ├── 0/            # topic partition number
+    │   │   ├── 0000.ext  # formatted offset files (ex: JSON)
+    │   │   └── 0011.ext
+    │   └── 1/
+    │       ├── 0003.ext
+    │       └── 0016.ext
+    └── topic_b/
+        └── etc...
+```
+
+This is the default structure generated by the File Sink.
+
+## Message Data Format/Schema
+
+The expected file schema largely depends on the chosen file format.
+
+For easiest use (especially alongside [`FileSink`](../sinks/file-sink.md)),
+you can follow these patterns:
+
+### Row-based Formats (ex: JSON)
+
+Files should have records with the following fields, with `_value` being a
+JSON-deserializable item:
+
+  - `_key`
+  - `_value`
+  - `_timestamp`
+
+This will result in the following Kafka message format for `Application`:
+
+- Message `key` will be the record `_key` as `bytes`.
+- Message `value` will be the record `_value` as a `json`/`dict`.
+- Message `timestamp` will be the record `_timestamp` (ms).
+
+### Columnar Formats (ex: Parquet)
+
+These do not expect an explicit `value` field; instead, all columns should be included
+individually, along with `_key` and `_timestamp`:
+
+  - `_key`
+  - `_timestamp`
+  - `field_a`
+  - `field_b`
+  - etc...
+
+This will result in the following Kafka message format for `Application`:
+
+- Message `key` will be the record `_key` as `bytes`.
+- Message `value` will be every record field except `_key` and `_timestamp`, packed as a `json`/`dict`.
+- Message `timestamp` will be the record `_timestamp` (ms).
+
+### Custom Schemas (Advanced)
+
+If the original files are not formatted as expected, custom loaders can be configured
+on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=)`.
+
+Formats can be imported from `quixstreams.sources.community.file.formats`.
+
+## Processing/Delivery Guarantees
+
+This Source offers "at-least-once" guarantees with message delivery: messages are
+guaranteed to be committed when a file is finished processing.
+
+However, it does not save any state/position: an unhandled exception will cause the
+`Application` to fail, and rerunning the `Application` will begin processing from the
+beginning (reproducing all previously processed messages).
+
+## Topic
+
+The default topic will have a partition count that reflects the partition count found
+within the provided topic's folder structure.
+
+The default topic name the Application dumps to is based on the last folder name of
+the `FileSource` `directory` as: `source__`.
diff --git a/docs/connectors/sources/s3-file-source.md b/docs/connectors/sources/s3-file-source.md
new file mode 100644
index 000000000..681063a80
--- /dev/null
+++ b/docs/connectors/sources/s3-file-source.md
@@ -0,0 +1,216 @@
+# AWS S3 File Source
+
+!!! info
+
+    This is a **Community** connector. Test it before using in production.
+
+    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
+
+This source reads records from files located in an AWS S3 bucket path and produces them
+as messages to a Kafka topic, ready for any desired `StreamingDataFrame`-based transformations.
+
+The resulting messages can be produced in "replay" mode, where the time between producing
+records matches the original timing as closely as possible (per topic partition only).
+ + +## How To Install + +Install Quix Streams with the following optional dependencies: + +```bash +pip install quixstreams[s3] +``` + +## How It Works + +`FileSource` steps through each folder within the provided path and dumps each record +contained in each file as a message to a Kafka topic. Folders are navigated in +lexicographical order. + +Records are read in a streaming fashion and committed after every file, offering +[at-least-once guarantees](#processingdelivery-guarantees). + +It can handle one given file type (ex. JSONlines or Parquet) at a time, and also +supports file decompression. + +You can learn more details about the [expected kafka message format](#message-data-formatschema) below. + +## How To Use + +S3 File Source is just a special configuration of the `FileSource` connector. + +Simply provide it an `S3Origin` (`FileSource(origin=)`). + +Then, hand the configured `FileSource` to your `SDF` (`app.dataframe(source=)`). + +For more details around various settings, see [configuration](#configuration). + +```python +from quixstreams import Application +from quixstreams.sources.community.file import FileSource +from quixstreams.sources.community.file.origins import S3Origin + +app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") + +origin = S3Origin( + bucket="", + aws_access_key_id="", + aws_secret_access_key="", + region_name="", +) +source = FileSource( + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", +) +sdf = app.dataframe(source=source).print(metadata=True) +# YOUR LOGIC HERE! + +if __name__ == "__main__": + app.run() +``` + +## Configuration + +Here are some important configurations to be aware of (see [File Source API](../../api-reference/sources.md#filesource) for all parameters). + +### Required: + +`S3Origin`: + +- `bucket`: The S3 bucket name only (ex: `"your-bucket"`). +- `region_name`: AWS region (ex: us-east-1). + **Note**: can alternatively set the `AWS_REGION` environment variable. +- `aws_access_key_id`: AWS User key ID. + **Note**: can alternatively set the `AWS_ACCESS_KEY_ID` environment variable. +- `aws_secret_access_key`: AWS secret key. + **Note**: can alternatively set the `AWS_SECRET_ACCESS_KEY` environment variable. + + +`FileSource`: + +- `directory`: a directory to recursively read through (exclude bucket name or starting "/"). + **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`). +- `origin`: An `S3Origin` instance. + + +### Optional: + +`FileSource`: + +- `format`: what format the message files are in (ex: `"json"`, `"parquet"`). + **Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored). + **Default**: `"json"` +- `compression`: what compression is used on the given files, if any (ex: `"gzip"`) + **Default**: `None` +- `replay_speed`: Produce the messages with this speed multiplier, which roughly + reflects the time "delay" between the original message producing. + Use any `float` `>= 0.0`, where `0.0` is no delay, and `1.0` is the original speed. + **Note**: Time delay will only be accurate _per partition_, NOT overall. + **Default**: 1.0 + +## File hierarchy/structure + +The File Source expects a folder structure like so: + +``` + my_sinked_topics/ + ├── topic_a/ # topic name (use this path to File Source!) 
+ │ ├── 0/ # topic partition number + │ │ ├── 0000.ext # formatted offset files (ex: JSON) + │ │ └── 0011.ext + │ └── 1/ + │ ├── 0003.ext + │ └── 0016.ext + └── topic_b/ + └── etc... +``` + +## Message Data Format/Schema + +The expected file schema largely depends on the chosen +file format. + +For easiest use (especially alongside [`FileSink`](../sinks/file-sink.md)), +you can follow these patterns: + +### Row-based Formats (ex: JSON) + +Files should have records with the following fields, with `_value` being a +JSON-deserializable item: + + - `_key` + - `_value` + - `_timestamp` + + +This will result in the following Kafka message format for `Application`: + +- Message `key` will be the record `_key` as `bytes`. +- Message `value` will be the record `_value` as a `json`/`dict` +- Message `timestamp` will be the record `_timestamp` (ms). + +### Columnar Formats (ex: Parquet) +These do not expect an explicit `value` field; instead all columns should be included +individually while including `_key` and `_timestamp`: + + - `_key` + - `_timestamp` + - `field_a` + - `field_b` + etc... + + +This will result in the following Kafka message format for `Application`: + +- Message `key` will be the record `_key` as `bytes`. +- Message `value` will be every record field except `_key` and `_timestamp` packed as a `json`/`dict` +- Message `timestamp` will be the record `_timestamp` (ms). + + +### Custom Schemas (Advanced) + +If the original files are not formatted as expected, custom loaders can be configured +on some `Format` classes (ex: `JsonFormat`) which can be handed to `FileSource(format=)`. + +Formats can be imported from `quixstreams.sources.community.file.formats`. + +## Processing/Delivery Guarantees + +This Source offers "at-least-once" guarantees with message delivery: messages are +guaranteed to be committed when a file is finished processing. + +However, it does not save any state/position: an unhandled exception will cause the +`Application` to fail, and rerunning the `Application` will begin processing from the +beginning (reproducing all previously processed messages). + +## Topic + +The default topic will have a partition count that reflects the partition count found +within the provided topic's folder structure. + +The default topic name the Application dumps to is based on the last folder name of +the `FileSource` `directory` as: `source__`. + +## Testing Locally + +Rather than connect to AWS, you can alternatively test your application using a local +emulated S3 host via Docker: + +1. Execute in terminal: + + ```bash + docker run --rm -d --name s3 \ + -p 4566:4566 \ + -e SERVICES=s3 \ + -e EDGE_PORT=4566 \ + -e DEBUG=1 \ + localstack/localstack:latest + ``` + +2. Set `endpoint_url` for `S3Origin` _OR_ the `AWS_ENDPOINT_URL_S3` + environment variable to `http://localhost:4566` + +3. Set all other `aws_` parameters for `S3Origin` to _any_ string. +They will not be used, but they must still be populated! 
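+
+Putting those steps together, a minimal end-to-end sketch might look like this. The bucket
+name, seeded record, and credentials below are placeholders, and it assumes the LocalStack
+container from step 1 is already running:
+
+```python
+import json
+
+import boto3
+
+from quixstreams import Application
+from quixstreams.sources.community.file import FileSource
+from quixstreams.sources.community.file.origins import S3Origin
+
+ENDPOINT = "http://localhost:4566"  # the LocalStack edge port from step 1
+
+# Seed the emulated bucket with one file following the expected folder structure
+# (<topic>/<partition>/<offset>.<ext>) and record schema (_key/_value/_timestamp).
+s3 = boto3.client(
+    "s3",
+    endpoint_url=ENDPOINT,
+    aws_access_key_id="any",
+    aws_secret_access_key="any",
+    region_name="us-east-1",
+)
+s3.create_bucket(Bucket="my-test-bucket")
+record = {"_key": "sensor-1", "_value": {"temperature": 20}, "_timestamp": 1700000000000}
+s3.put_object(
+    Bucket="my-test-bucket",
+    Key="my_topic/0/0000.json",
+    Body=json.dumps(record).encode("utf-8"),
+)
+
+origin = S3Origin(
+    bucket="my-test-bucket",
+    endpoint_url=ENDPOINT,
+    aws_access_key_id="any",       # not used by the emulator, but must be populated
+    aws_secret_access_key="any",
+    region_name="us-east-1",
+)
+source = FileSource(directory="my_topic/", origin=origin, format="json")
+
+app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
+sdf = app.dataframe(source=source).print(metadata=True)
+
+if __name__ == "__main__":
+    app.run()
+```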
diff --git a/quixstreams/sources/community/file/__init__.py b/quixstreams/sources/community/file/__init__.py index 9d6be93f9..73db23783 100644 --- a/quixstreams/sources/community/file/__init__.py +++ b/quixstreams/sources/community/file/__init__.py @@ -1,3 +1,4 @@ # ruff: noqa: F403 from .file import * from .formats import * +from .origins import * diff --git a/quixstreams/sources/community/file/compressions/base.py b/quixstreams/sources/community/file/compressions/base.py index c4b90b7e6..c2c772259 100644 --- a/quixstreams/sources/community/file/compressions/base.py +++ b/quixstreams/sources/community/file/compressions/base.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod -from pathlib import Path -from typing import Literal +from typing import BinaryIO, Literal __all__ = ( "Decompressor", @@ -13,4 +12,4 @@ class Decompressor(ABC): @abstractmethod - def decompress(self, filepath: Path) -> bytes: ... + def decompress(self, filestream: BinaryIO) -> bytes: ... diff --git a/quixstreams/sources/community/file/compressions/gzip.py b/quixstreams/sources/community/file/compressions/gzip.py index 5bfc9aeea..2aaf67442 100644 --- a/quixstreams/sources/community/file/compressions/gzip.py +++ b/quixstreams/sources/community/file/compressions/gzip.py @@ -1,4 +1,5 @@ -from pathlib import Path +from gzip import decompress +from typing import BinaryIO from .base import Decompressor @@ -7,10 +8,7 @@ class GZipDecompressor(Decompressor): def __init__(self): - from gzip import decompress - self._decompressor = decompress - def decompress(self, filepath: Path) -> bytes: - with open(filepath, "rb") as f: - return self._decompressor(f.read()) + def decompress(self, filestream: BinaryIO) -> bytes: + return self._decompressor(filestream.read()) diff --git a/quixstreams/sources/community/file/file.py b/quixstreams/sources/community/file/file.py index 91313dd07..0d3f4dc84 100644 --- a/quixstreams/sources/community/file/file.py +++ b/quixstreams/sources/community/file/file.py @@ -1,13 +1,15 @@ import logging from pathlib import Path from time import sleep -from typing import Generator, Optional, Union +from typing import Optional, Union from quixstreams.models import Topic, TopicConfig from quixstreams.sources import Source from .compressions import CompressionName from .formats import FORMATS, Format, FormatName +from .origins import LocalOrigin +from .origins.base import Origin __all__ = ("FileSource",) @@ -16,8 +18,12 @@ class FileSource(Source): """ - Ingest a set of local files into kafka by iterating through the provided folder and - processing all nested files within it. + Ingest a set of files from a desired origin into Kafka by iterating through the + provided folder and processing all nested files within it. + + Origins include a local filestore, AWS S3, or Microsoft Azure. + + FileSource defaults to a local filestore (LocalOrigin) + JSON format. 
Expects folder and file structures as generated by the related FileSink connector: @@ -46,14 +52,24 @@ class FileSource(Source): ```python from quixstreams import Application from quixstreams.sources.community.file import FileSource + from quixstreams.sources.community.file.origins import S3Origin app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") + + origin = S3Origin( + bucket="", + aws_access_key_id="", + aws_secret_access_key="", + aws_region="", + ) source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", + directory="path/to/your/topic_folder/", + origin=origin, + format="json", + compression="gzip", ) sdf = app.dataframe(source=source).print(metadata=True) + # YOUR LOGIC HERE! if __name__ == "__main__": app.run() @@ -62,34 +78,41 @@ class FileSource(Source): def __init__( self, - filepath: Union[str, Path], - file_format: Union[Format, FormatName], - file_compression: Optional[CompressionName] = None, - as_replay: bool = True, + directory: Union[str, Path], + format: Union[Format, FormatName] = "json", + origin: Origin = LocalOrigin(), + compression: Optional[CompressionName] = None, + replay_speed: float = 1.0, name: Optional[str] = None, shutdown_timeout: float = 10, ): """ - :param filepath: a filepath to recursively read through; it is recommended to + :param directory: a directory to recursively read through; it is recommended to provide the path to a given topic folder (ex: `/path/to/topic_a`). - :param file_format: what format the message files are in (ex: json, parquet). - Optionally, can provide a `Format` instance if more than file_compression - is necessary to define (file_compression will then be ignored). - :param file_compression: what compression is used on the given files, if any. - :param as_replay: Produce the messages with the original time delay between them. - Otherwise, produce the messages as fast as possible. + :param format: what format the message files are in (ex: json, parquet). + Optionally, can provide a `Format` instance if more than compression + is necessary to define (compression will then be ignored). + :param origin: an Origin type (defaults to reading local files). + :param compression: what compression is used on the given files, if any. + :param replay_speed: Produce the messages with this speed multiplier, which + roughly reflects the time "delay" between the original message producing. + Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall. :param name: The name of the Source application (Default: last folder name). :param shutdown_timeout: Time in seconds the application waits for the source to gracefully shutdown """ - self._filepath = Path(filepath) - self._formatter = _get_formatter(file_format, file_compression) - self._as_replay = as_replay + if not replay_speed >= 0: + raise ValueError("`replay_speed` must be a positive value") + + self._directory = Path(directory) + self._origin = origin + self._formatter = _get_formatter(format, compression) + self._replay_speed = replay_speed self._previous_timestamp = None self._previous_partition = None super().__init__( - name=name or self._filepath.name, shutdown_timeout=shutdown_timeout + name=name or self._directory.name, shutdown_timeout=shutdown_timeout ) def _replay_delay(self, current_timestamp: int): @@ -98,27 +121,13 @@ def _replay_delay(self, current_timestamp: int): based on their timestamps. 
""" if self._previous_timestamp is not None: - time_diff = (current_timestamp - self._previous_timestamp) / 1000 - if time_diff > 0: - logger.debug(f"Sleeping for {time_diff} seconds...") - sleep(time_diff) + time_diff_seconds = (current_timestamp - self._previous_timestamp) / 1000 + replay_diff_seconds = time_diff_seconds * self._replay_speed + if replay_diff_seconds > 0.01: # only sleep when diff is "big enough" + logger.debug(f"Sleeping for {replay_diff_seconds} seconds...") + sleep(replay_diff_seconds) self._previous_timestamp = current_timestamp - def _get_partition_count(self) -> int: - return len([f for f in self._filepath.iterdir()]) - - def default_topic(self) -> Topic: - """ - Uses the file structure to generate the desired partition count for the - internal topic. - :return: the original default topic, with updated partition count - """ - topic = super().default_topic() - topic.config = TopicConfig( - num_partitions=self._get_partition_count(), replication_factor=1 - ) - return topic - def _check_file_partition_number(self, file: Path): """ Checks whether the next file is the start of a new partition so the timestamp @@ -140,14 +149,29 @@ def _produce(self, record: dict): key=kafka_msg.key, value=kafka_msg.value, timestamp=kafka_msg.timestamp ) + def default_topic(self) -> Topic: + """ + Uses the file structure to generate the desired partition count for the + internal topic. + :return: the original default topic, with updated partition count + """ + topic = super().default_topic() + topic.config = TopicConfig( + num_partitions=self._origin.get_folder_count(self._directory) or 1, + replication_factor=1, + ) + return topic + def run(self): while self._running: - for file in _file_finder(self._filepath): - logger.info(f"Reading files from topic {self._filepath.name}") + logger.info(f"Reading files from topic {self._directory.name}") + for file in self._origin.file_collector(self._directory): + logger.debug(f"Reading file {file}") self._check_file_partition_number(file) - for record in self._formatter.file_read(file): - if self._as_replay: - self._replay_delay(record["_timestamp"]) + filestream = self._origin.get_raw_file_stream(file) + for record in self._formatter.read(filestream): + if timestamp := record.get("_timestamp"): + self._replay_delay(timestamp) self._produce(record) self.flush() return @@ -167,11 +191,3 @@ def _get_formatter( f"Allowed values: {allowed_formats}, " f"or an instance of a subclass of `Format`." ) - - -def _file_finder(filepath: Path) -> Generator[Path, None, None]: - if filepath.is_dir(): - for i in sorted(filepath.iterdir(), key=lambda x: x.name): - yield from _file_finder(i) - else: - yield filepath diff --git a/quixstreams/sources/community/file/formats/base.py b/quixstreams/sources/community/file/formats/base.py index 9a164e5ef..d18332d39 100644 --- a/quixstreams/sources/community/file/formats/base.py +++ b/quixstreams/sources/community/file/formats/base.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod from io import BytesIO -from pathlib import Path from typing import BinaryIO, Generator, Iterable, Literal, Optional from ..compressions import COMPRESSION_MAPPER, CompressionName, Decompressor @@ -26,7 +25,6 @@ def __init__(self, compression: Optional[CompressionName] = None): """ super().__init__() this for a usable init. 
""" - self._file: Optional[BinaryIO] = None self._decompressor: Optional[Decompressor] = None if compression: self._set_decompressor(compression) @@ -48,22 +46,13 @@ def deserialize(self, filestream: BinaryIO) -> Iterable[dict]: """ ... + def _decompress(self, filestream: BinaryIO) -> BinaryIO: + if not self._decompressor: + return filestream + return BytesIO(self._decompressor.decompress(filestream)) + def _set_decompressor(self, extension_or_name: CompressionName): self._decompressor = COMPRESSION_MAPPER[extension_or_name]() - def _open_filestream(self, filepath: Path): - # TODO: maybe check that file extension is valid? - if self._decompressor: - self._file = BytesIO(self._decompressor.decompress(filepath)) - else: - self._file = open(filepath, "rb") - - def _close_filestream(self): - if self._file: - self._file.close() - self._file = None - - def file_read(self, filepath: Path) -> Generator[dict, None, None]: - self._open_filestream(filepath) - yield from self.deserialize(self._file) - self._close_filestream() + def read(self, filestream: BinaryIO) -> Generator[dict, None, None]: + yield from self.deserialize(self._decompress(filestream)) diff --git a/quixstreams/sources/community/file/origins/__init__.py b/quixstreams/sources/community/file/origins/__init__.py new file mode 100644 index 000000000..01248aef5 --- /dev/null +++ b/quixstreams/sources/community/file/origins/__init__.py @@ -0,0 +1,3 @@ +# ruff: noqa: F403 +from .local import * +from .s3 import * diff --git a/quixstreams/sources/community/file/origins/base.py b/quixstreams/sources/community/file/origins/base.py new file mode 100644 index 000000000..927ccef0d --- /dev/null +++ b/quixstreams/sources/community/file/origins/base.py @@ -0,0 +1,36 @@ +from abc import ABC, abstractmethod +from pathlib import Path +from typing import BinaryIO, Iterable + +__all__ = ("Origin",) + + +class Origin(ABC): + """ + An interface for interacting with a file-based client. + + Provides methods for navigating folders and retrieving/opening raw files. + """ + + @abstractmethod + def file_collector(self, filepath: Path) -> Iterable[Path]: ... + + """ + Find all blobs starting from a root folder. + + Each item in the iterable should be a filepath resolvable by `get_raw_file_stream`. + """ + + @abstractmethod + def get_folder_count(self, directory: Path) -> int: ... + + """Counts the number of folders at directory to assume partition counts.""" + + @abstractmethod + def get_raw_file_stream(self, filepath: Path) -> BinaryIO: ... + + """ + Obtain a file and return it as an (open) filestream. + + Result should be ready for deserialization (and/or decompression). 
+    """
diff --git a/quixstreams/sources/community/file/origins/local.py b/quixstreams/sources/community/file/origins/local.py
new file mode 100644
index 000000000..5bfb1980c
--- /dev/null
+++ b/quixstreams/sources/community/file/origins/local.py
@@ -0,0 +1,22 @@
+from io import BytesIO
+from pathlib import Path
+from typing import Generator
+
+from .base import Origin
+
+__all__ = ("LocalOrigin",)
+
+
+class LocalOrigin(Origin):
+    def file_collector(self, filepath: Path) -> Generator[Path, None, None]:
+        if filepath.is_dir():
+            for i in sorted(filepath.iterdir(), key=lambda x: x.name):
+                yield from self.file_collector(i)
+        else:
+            yield filepath
+
+    def get_folder_count(self, directory: Path) -> int:
+        return len([f for f in directory.iterdir()])
+
+    def get_raw_file_stream(self, filepath: Path) -> BytesIO:
+        return BytesIO(filepath.read_bytes())
diff --git a/quixstreams/sources/community/file/origins/s3.py b/quixstreams/sources/community/file/origins/s3.py
new file mode 100644
index 000000000..ca30b5a75
--- /dev/null
+++ b/quixstreams/sources/community/file/origins/s3.py
@@ -0,0 +1,84 @@
+import logging
+from io import BytesIO
+from os import getenv
+from pathlib import Path
+from typing import Generator, Optional, Union
+
+from .base import Origin
+
+try:
+    from boto3 import client as boto_client
+    from mypy_boto3_s3 import S3Client
+except ImportError as exc:
+    raise ImportError(
+        f"Package {exc.name} is missing: "
+        'run "pip install quixstreams[s3]" to use S3Origin'
+    ) from exc
+
+logger = logging.getLogger(__name__)
+
+__all__ = ("S3Origin",)
+
+
+class S3Origin(Origin):
+    def __init__(
+        self,
+        bucket: str,
+        region_name: Optional[str] = getenv("AWS_REGION"),
+        aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
+        aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
+        endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3"),
+    ):
+        """
+        Configure the S3 client used to read files from an AWS S3 bucket.
+
+        :param bucket: The S3 bucket name only (ex: 'your-bucket').
+        :param region_name: The AWS region.
+            NOTE: can alternatively set the AWS_REGION environment variable
+        :param aws_access_key_id: the AWS access key ID.
+            NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
+        :param aws_secret_access_key: the AWS secret access key.
+            NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
+        :param endpoint_url: the endpoint URL to use; only required for connecting
+            to a locally hosted S3.
+            NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable
+        """
+        self.root_location = bucket
+        self._credentials = {
+            "region_name": region_name,
+            "aws_access_key_id": aws_access_key_id,
+            "aws_secret_access_key": aws_secret_access_key,
+            "endpoint_url": endpoint_url,
+        }
+        # S3 client runs into pickling errors with multiprocessing. We can't set it
+        # until multiprocessing starts it.
+ # We can work around it by setting it during file collection + self._client: Optional[S3Client] = None + + def _get_client(self) -> S3Client: + return boto_client("s3", **self._credentials) + + def file_collector(self, filepath: Union[str, Path]) -> Generator[Path, None, None]: + self._client = self._get_client() + resp = self._client.list_objects( + Bucket=self.root_location, + Prefix=str(filepath), + Delimiter="/", + ) + for _folder in resp.get("CommonPrefixes", []): + yield from self.file_collector(_folder["Prefix"]) + + for file in resp.get("Contents", []): + yield Path(file["Key"]) + + def get_folder_count(self, directory: Path) -> int: + resp = self._get_client().list_objects( + Bucket=self.root_location, Prefix=f"{directory}/", Delimiter="/" + ) + return len(resp["CommonPrefixes"]) + + def get_raw_file_stream(self, filepath: Path) -> BytesIO: + data = self._client.get_object(Bucket=self.root_location, Key=str(filepath))[ + "Body" + ].read() + return BytesIO(data)
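As a reference for the `Origin` interface added in `origins/base.py`, the sketch below shows
a minimal custom origin that serves files from an in-memory dict. It is an illustration only,
assuming nothing beyond the three abstract methods shown above; the class name and key layout
are hypothetical.

```python
from io import BytesIO
from pathlib import Path
from typing import BinaryIO, Dict, Iterable

from quixstreams.sources.community.file.origins.base import Origin


class InMemoryOrigin(Origin):
    """Hypothetical origin serving bytes keyed by '<topic>/<partition>/<offset>.<ext>' paths."""

    def __init__(self, files: Dict[str, bytes]):
        self._files = files

    def file_collector(self, filepath: Path) -> Iterable[Path]:
        # Yield every "file" under the requested prefix, in lexicographical (offset) order.
        prefix = str(filepath)
        return sorted(Path(key) for key in self._files if key.startswith(prefix))

    def get_folder_count(self, directory: Path) -> int:
        # Count the partition folders directly under `directory`; FileSource uses this
        # to size the default topic's partition count.
        partitions = {
            Path(key).relative_to(directory).parts[0]
            for key in self._files
            if Path(key).is_relative_to(directory)
        }
        return len(partitions) or 1

    def get_raw_file_stream(self, filepath: Path) -> BinaryIO:
        # Return an open stream, ready for decompression and/or deserialization.
        return BytesIO(self._files[str(filepath)])
```

Such an origin could then be handed to `FileSource(directory="my_topic", origin=InMemoryOrigin({...}))`
in the same way as `LocalOrigin` or `S3Origin`.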