Commit 96a22bc
Support S3 sink via FileSink interface
1 parent 7dace1a

Showing 10 changed files with 454 additions and 138 deletions.
@@ -0,0 +1,106 @@
# AmazonS3 Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This sink writes batches of data to Amazon S3 in various formats.
By default, the data will include the Kafka message key, value, and timestamp.

## How To Install

To use the S3 sink, you need to install the required dependencies:

```bash
pip install quixstreams[s3]
```

## How It Works

`FileSink` with `S3Destination` is a batching sink that writes data directly to Amazon S3.

It batches processed records in memory per topic partition and writes them to S3 objects in a specified bucket and prefix structure. Objects are organized by topic and partition, with each batch being written to a separate object named by its starting offset.

!!! note

    The S3 bucket must already exist and be accessible. The sink does not create the bucket automatically. If the bucket does not exist or access is denied, an error will be raised when initializing the sink.
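
If you want to fail fast before the application starts, you can check the bucket yourself. Below is a minimal pre-flight sketch using boto3 directly (this is not part of the sink API; the bucket name and region are placeholders):

```python
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3", region_name="eu-west-2")
try:
    # head_bucket raises ClientError if the bucket is missing or access is denied
    s3.head_bucket(Bucket="my-bucket")
except ClientError as e:
    raise RuntimeError("Bucket 'my-bucket' is missing or not accessible") from e
```
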
## How To Use

Create an instance of `FileSink` with `S3Destination` and pass it to the `StreamingDataFrame.sink()` method.

```python
import os

from quixstreams import Application
from quixstreams.sinks.community.file import FileSink
from quixstreams.sinks.community.file.destinations import S3Destination
from quixstreams.sinks.community.file.formats import JSONFormat


# Configure the sink to write JSON files to S3
file_sink = FileSink(
    # Optional: defaults to current working directory
    directory="data",
    # Optional: defaults to "json"
    # Available formats: "json", "parquet" or an instance of Format
    format=JSONFormat(compress=True),
    destination=S3Destination(
        bucket="my-bucket",
        # Optional: AWS credentials
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name="eu-west-2",
        # Optional: additional keyword arguments are passed to the boto3 client
        endpoint_url="http://localhost:4566",  # for LocalStack testing
    ),
)

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
topic = app.topic("sink-topic")

sdf = app.dataframe(topic=topic)
sdf.sink(file_sink)

if __name__ == "__main__":
    app.run()
```

!!! note

    Instead of passing AWS credentials explicitly, you can set them using environment variables:

    ```bash
    export AWS_ACCESS_KEY_ID="your_access_key"
    export AWS_SECRET_ACCESS_KEY="your_secret_key"
    export AWS_DEFAULT_REGION="eu-west-2"
    ```

    Then you can create the destination with just the bucket name:

    ```python
    s3_destination = S3Destination(bucket="my-bucket")
    ```

## S3 Object Organization

Objects in S3 follow this structure:

```
my-bucket/
└── data/
    └── sink_topic/
        ├── 0/
        │   ├── 0000000000000000000.jsonl
        │   ├── 0000000000000000123.jsonl
        │   └── 0000000000000001456.jsonl
        └── 1/
            ├── 0000000000000000000.jsonl
            ├── 0000000000000000789.jsonl
            └── 0000000000000001012.jsonl
```

Each object is named using the batch's starting offset (padded to 19 digits) and the appropriate file extension for the chosen format.
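
The zero-padding uses Python's fixed-width integer formatting, so a lexicographic listing of objects matches offset order. For example:

```python
# A batch starting at offset 123, written with the JSON format,
# produces this object name:
name = f"{123:019d}" + ".jsonl"  # -> '0000000000000000123.jsonl'
```
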
## Supported Formats

- **JSON**: Supports appending to existing files
- **Parquet**: Does not support appending (new file created for each batch); see the sketch below
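
To write Parquet instead, pass a `ParquetFormat` instance. A minimal sketch, assuming `ParquetFormat` accepts a `compression` keyword (check the Format API reference for the exact signature):

```python
from quixstreams.sinks.community.file import FileSink
from quixstreams.sinks.community.file.destinations import S3Destination
from quixstreams.sinks.community.file.formats import ParquetFormat

# Parquet does not support appending, so every batch becomes its own object
file_sink = FileSink(
    directory="data",
    format=ParquetFormat(compression="snappy"),
    destination=S3Destination(bucket="my-bucket"),
)
```
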
## Delivery Guarantees

`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
60 changes: 29 additions & 31 deletions
docs/connectors/sinks/file-sink.md → docs/connectors/sinks/local-file-sink.md

@@ -1,78 +1,76 @@
-# File Sink
+# Local File Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

-This sink writes batches of data to files on disk in various formats.
-By default, the data will include the kafka message key, value, and timestamp.
-
-Currently, it supports the following formats:
-
-- JSON
-- Parquet
+This sink writes batches of data to local files in various formats.
+By default, the data will include the kafka message key, value, and timestamp.

## How It Works

-`FileSink` is a batching sink.
+`FileSink` with `LocalDestination` is a batching sink that writes data to your local filesystem.

-It batches processed records in memory per topic partition and writes them to files in a specified directory structure. Files are organized by topic and partition, with each batch being written to a separate file named by its starting offset.
-
-The sink can either create new files for each batch or append to existing files (when using formats that support appending).
+It batches processed records in memory per topic partition and writes them to files in a specified directory structure. Files are organized by topic and partition. When append mode is disabled (default), each batch is written to a separate file named by its starting offset. When append mode is enabled, all records for a partition are written to a single file.

## How To Use

Create an instance of `FileSink` and pass it to the `StreamingDataFrame.sink()` method.

For the full description of expected parameters, see the [File Sink API](../../api-reference/sinks.md#filesink) page.

```python
from quixstreams import Application
from quixstreams.sinks.community.file import FileSink
+from quixstreams.sinks.community.file.destinations import LocalDestination
+from quixstreams.sinks.community.file.formats import JSONFormat

# Configure the sink to write JSON files
file_sink = FileSink(
-    output_dir="./output",
-    format="json",
-    append=False  # Set to True to append to existing files when possible
+    # Optional: defaults to current working directory
+    directory="data",
+    # Optional: defaults to "json"
+    # Available formats: "json", "parquet" or an instance of Format
+    format=JSONFormat(compress=True),
+    # Optional: defaults to LocalDestination(append=False)
+    destination=LocalDestination(append=True),
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
-topic = app.topic('sink_topic')
-
-# Do some processing here
-sdf = app.dataframe(topic=topic).print(metadata=True)
+topic = app.topic('sink-topic')

-# Sink results to the FileSink
+sdf = app.dataframe(topic=topic)
sdf.sink(file_sink)

if __name__ == "__main__":
-    # Start the application
    app.run()
```

## File Organization

Files are organized in the following directory structure:

```
-output_dir/
-├── sink_topic/
-│   ├── 0/
-│   │   ├── 0000000000000000000.json
-│   │   ├── 0000000000000000123.json
-│   │   └── 0000000000000001456.json
-│   └── 1/
-│       ├── 0000000000000000000.json
-│       ├── 0000000000000000789.json
-│       └── 0000000000000001012.json
+data/
+└── sink_topic/
+    ├── 0/
+    │   ├── 0000000000000000000.jsonl
+    │   ├── 0000000000000000123.jsonl
+    │   └── 0000000000000001456.jsonl
+    └── 1/
+        ├── 0000000000000000000.jsonl
+        ├── 0000000000000000789.jsonl
+        └── 0000000000000001012.jsonl
```

Each file is named using the batch's starting offset (padded to 19 digits) and the appropriate file extension for the chosen format.

## Supported Formats

- **JSON**: Supports appending to existing files
- **Parquet**: Does not support appending (new file created for each batch)

## Delivery Guarantees

`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
@@ -1,9 +1,14 @@
-from .formats import InvalidFormatError, JSONFormat, ParquetFormat
+from .destinations import Destination, LocalDestination, S3Destination
+from .formats import Format, InvalidFormatError, JSONFormat, ParquetFormat
from .sink import FileSink

__all__ = [
-    "FileSink",
+    "Destination",
+    "LocalDestination",
+    "S3Destination",
+    "Format",
    "InvalidFormatError",
    "JSONFormat",
    "ParquetFormat",
+    "FileSink",
]
@@ -0,0 +1,5 @@
```python
from .base import Destination
from .local import LocalDestination
from .s3 import S3Destination

__all__ = ("Destination", "LocalDestination", "S3Destination")
```
@@ -0,0 +1,96 @@
```python
import logging
import re
from abc import ABC, abstractmethod
from pathlib import Path

from quixstreams.sinks.base import SinkBatch
from quixstreams.sinks.community.file.formats import Format

__all__ = ("Destination",)

logger = logging.getLogger(__name__)

_UNSAFE_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9 ._]")


class Destination(ABC):
    """Abstract base class for defining where and how data should be stored.

    Destinations handle the storage of serialized data, whether that's to local
    disk, cloud storage, or other locations. They manage the physical writing of
    data while maintaining a consistent directory/path structure based on topics
    and partitions.
    """

    _base_directory: str = ""
    _extension: str = ""

    def set_directory(self, directory: str) -> None:
        """Configure the base directory for storing files.

        :param directory: The base directory path where files will be stored.
        :raises ValueError: If the directory path contains invalid characters.
            Only alphanumeric characters (a-zA-Z0-9), spaces, dots, and
            underscores are allowed.
        """
        if _UNSAFE_CHARACTERS_REGEX.search(directory):
            raise ValueError(
                f"Invalid characters in directory path: {directory}. "
                f"Only alphanumeric characters (a-zA-Z0-9), spaces ( ), "
                "dots (.), and underscores (_) are allowed."
            )
        self._base_directory = directory
        logger.info("Directory set to '%s'", directory)

    def set_extension(self, format: Format) -> None:
        """Set the file extension based on the format.

        :param format: The Format instance that defines the file extension.
        """
        self._extension = format.file_extension
        logger.info("File extension set to '%s'", self._extension)

    @abstractmethod
    def write(self, data: bytes, batch: SinkBatch) -> None:
        """Write the serialized data to storage.

        :param data: The serialized data to write.
        :param batch: The batch information containing topic, partition and offset
            details.
        """
        ...

    def _path(self, batch: SinkBatch) -> Path:
        """Generate the full path where the batch data should be stored.

        :param batch: The batch information containing topic, partition and offset
            details.
        :return: A Path object representing the full file path.
        """
        return self._directory(batch) / self._filename(batch)

    def _directory(self, batch: SinkBatch) -> Path:
        """Generate the full directory path for a batch.

        Creates a directory structure using the base directory, sanitized topic
        name, and partition number.

        :param batch: The batch information containing topic and partition details.
        :return: A Path object representing the directory where files should be
            stored.
        """
        topic = _UNSAFE_CHARACTERS_REGEX.sub("_", batch.topic)
        return Path(self._base_directory) / topic / str(batch.partition)

    def _filename(self, batch: SinkBatch) -> str:
        """Generate the filename for a batch.

        Creates a filename using the batch's starting offset as a zero-padded
        number to ensure correct ordering. The offset is padded to cover the max
        length of a signed 64-bit integer (19 digits), e.g., '0000000000000123456'.

        :param batch: The batch information containing start_offset details.
        :return: A string representing the filename with zero-padded offset and
            extension.
        """
        return f"{batch.start_offset:019d}{self._extension}"
```
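
A new storage backend only needs to implement `write()`; the base class supplies the directory and filename logic. A minimal sketch of a custom destination (the `InMemoryDestination` name and its dict-backed storage are hypothetical, for illustration only):

```python
from quixstreams.sinks.base import SinkBatch
from quixstreams.sinks.community.file.destinations import Destination


class InMemoryDestination(Destination):
    """Hypothetical destination that collects serialized batches in a dict."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    def write(self, data: bytes, batch: SinkBatch) -> None:
        # _path() combines the base directory, the sanitized topic name,
        # the partition number, and the zero-padded start offset.
        self.objects[str(self._path(batch))] = data
```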