diff --git a/docs/build/build.py b/docs/build/build.py
index 4cb8ad12e..102c626e4 100644
--- a/docs/build/build.py
+++ b/docs/build/build.py
@@ -123,6 +123,9 @@
     "quixstreams.sinks.community.iceberg",
     "quixstreams.sinks.community.bigquery",
     "quixstreams.sinks.community.file.sink",
+    "quixstreams.sinks.community.file.destinations.base",
+    "quixstreams.sinks.community.file.destinations.local",
+    "quixstreams.sinks.community.file.destinations.s3",
     "quixstreams.sinks.community.file.formats.base",
     "quixstreams.sinks.community.file.formats.json",
     "quixstreams.sinks.community.file.formats.parquet",
diff --git a/docs/connectors/sinks/amazon-s3-sink.md b/docs/connectors/sinks/amazon-s3-sink.md
new file mode 100644
index 000000000..9b6b65232
--- /dev/null
+++ b/docs/connectors/sinks/amazon-s3-sink.md
@@ -0,0 +1,108 @@
+# Amazon S3 Sink
+
+!!! info
+
+    This is a **Community** connector. Test it before using in production.
+
+    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
+
+This sink writes batches of data to Amazon S3 in various formats.
+By default, the data will include the Kafka message key, value, and timestamp.
+
+## How To Install
+
+To use the S3 sink, you need to install the required dependencies:
+
+```bash
+pip install quixstreams[s3]
+```
+
+## How It Works
+
+`FileSink` with `S3Destination` is a batching sink that writes data directly to Amazon S3.
+
+It batches processed records in memory per topic partition and writes them to S3 objects in a specified bucket and prefix structure. Objects are organized by topic and partition, with each batch being written to a separate object named by its starting offset.
+
+Batches are written to S3 during the commit phase of processing. This means the size of each batch (and therefore each S3 object) is determined by your application's commit settings: either the `commit_interval` or the `commit_every` parameter.
+
+!!! note
+
+    The S3 bucket must already exist and be accessible. The sink does not create the bucket automatically. If the bucket does not exist or access is denied, an error will be raised when initializing the sink.
+
+## How To Use
+
+Create an instance of `FileSink` with `S3Destination` and pass it to the `StreamingDataFrame.sink()` method.
+
+```python
+import os
+
+from quixstreams import Application
+from quixstreams.sinks.community.file import FileSink
+from quixstreams.sinks.community.file.destinations import S3Destination
+from quixstreams.sinks.community.file.formats import JSONFormat
+
+
+# Configure the sink to write JSON files to S3
+file_sink = FileSink(
+    # Optional: defaults to current working directory
+    directory="data",
+    # Optional: defaults to "json"
+    # Available formats: "json", "parquet" or an instance of Format
+    format=JSONFormat(compress=True),
+    destination=S3Destination(
+        bucket="my-bucket",
+        # Optional: AWS credentials
+        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
+        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
+        region_name="eu-west-2",
+        # Optional: Additional keyword arguments are passed to the boto3 client
+        endpoint_url="http://localhost:4566",  # for LocalStack testing
+    )
+)
+
+app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
+topic = app.topic('sink-topic')
+
+sdf = app.dataframe(topic=topic)
+sdf.sink(file_sink)
+
+if __name__ == "__main__":
+    app.run()
+```
+
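+Because batches are flushed during commit, you can control how much data goes into each S3 object by tuning the application's commit settings. A minimal sketch (the values below are illustrative; see the Quix Streams `Application` documentation for the exact semantics of `commit_interval` and `commit_every`):
+
+```python
+from quixstreams import Application
+
+app = Application(
+    broker_address="localhost:9092",
+    # Commit (and flush batched records to S3) roughly every 60 seconds...
+    commit_interval=60.0,
+    # ...or after 10,000 processed messages, whichever comes first
+    commit_every=10_000,
+)
+```
+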
+!!! note
+    Instead of passing AWS credentials explicitly, you can set them using environment variables:
+    ```bash
+    export AWS_ACCESS_KEY_ID="your_access_key"
+    export AWS_SECRET_ACCESS_KEY="your_secret_key"
+    export AWS_DEFAULT_REGION="eu-west-2"
+    ```
+    Then you can create the destination with just the bucket name:
+    ```python
+    s3_destination = S3Destination(bucket="my-bucket")
+    ```
+
+## S3 Object Organization
+
+Objects in S3 follow this structure:
+```
+my-bucket/
+└── data/
+    └── sink_topic/
+        ├── 0/
+        │   ├── 0000000000000000000.jsonl
+        │   ├── 0000000000000000123.jsonl
+        │   └── 0000000000000001456.jsonl
+        └── 1/
+            ├── 0000000000000000000.jsonl
+            ├── 0000000000000000789.jsonl
+            └── 0000000000000001012.jsonl
+```
+
+Each object is named using the batch's starting offset (padded to 19 digits) and the appropriate file extension for the chosen format.
+
+## Supported Formats
+
+- **JSON**: Supports appending to existing files
+- **Parquet**: Does not support appending (new file created for each batch)
+
+## Delivery Guarantees
+
+`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
\ No newline at end of file
diff --git a/docs/connectors/sinks/file-sink.md b/docs/connectors/sinks/local-file-sink.md
similarity index 54%
rename from docs/connectors/sinks/file-sink.md
rename to docs/connectors/sinks/local-file-sink.md
index fc900fffd..73caf5f7c 100644
--- a/docs/connectors/sinks/file-sink.md
+++ b/docs/connectors/sinks/local-file-sink.md
@@ -1,4 +1,4 @@
-# File Sink
+# Local File Sink
 
 !!! info
 
@@ -6,19 +6,14 @@
 
     To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
 
-This sink writes batches of data to files on disk in various formats.
-By default, the data will include the kafka message key, value, and timestamp.
-
-Currently, it supports the following formats:
-
-- JSON
-- Parquet
+This sink writes batches of data to local files in various formats.
+By default, the data will include the Kafka message key, value, and timestamp.
 
 ## How It Works
 
-`FileSink` is a batching sink.
+`FileSink` with `LocalDestination` is a batching sink that writes data to your local filesystem.
 
-It batches processed records in memory per topic partition and writes them to files in a specified directory structure. Files are organized by topic and partition, with each batch being written to a separate file named by its starting offset.
+It batches processed records in memory per topic partition and writes them to files in a specified directory structure. Files are organized by topic and partition. When append mode is disabled (default), each batch is written to a separate file named by its starting offset. When append mode is enabled, all records for a partition are written to a single file.
 
 The sink can either create new files for each batch or append to existing files (when using formats that support appending).
 
@@ -26,53 +21,56 @@
 
 Create an instance of `FileSink` and pass it to the `StreamingDataFrame.sink()` method.
 
-For the full description of expected parameters, see the [File Sink API](../../api-reference/sinks.md#filesink) page.
- ```python from quixstreams import Application from quixstreams.sinks.community.file import FileSink +from quixstreams.sinks.community.file.destinations import LocalDestination +from quixstreams.sinks.community.file.formats import JSONFormat # Configure the sink to write JSON files file_sink = FileSink( - output_dir="./output", - format="json", - append=False # Set to True to append to existing files when possible + # Optional: defaults to current working directory + directory="data", + # Optional: defaults to "json" + # Available formats: "json", "parquet" or an instance of Format + format=JSONFormat(compress=True), + # Optional: defaults to LocalDestination(append=False) + destination=LocalDestination(append=True), ) app = Application(broker_address='localhost:9092', auto_offset_reset="earliest") -topic = app.topic('sink_topic') - -# Do some processing here -sdf = app.dataframe(topic=topic).print(metadata=True) +topic = app.topic('sink-topic') -# Sink results to the FileSink +sdf = app.dataframe(topic=topic) sdf.sink(file_sink) if __name__ == "__main__": - # Start the application app.run() ``` ## File Organization + Files are organized in the following directory structure: ``` -output_dir/ -├── sink_topic/ -│ ├── 0/ -│ │ ├── 0000000000000000000.json -│ │ ├── 0000000000000000123.json -│ │ └── 0000000000000001456.json -│ └── 1/ -│ ├── 0000000000000000000.json -│ ├── 0000000000000000789.json -│ └── 0000000000000001012.json +data/ +└── sink_topic/ + ├── 0/ + │ ├── 0000000000000000000.jsonl + │ ├── 0000000000000000123.jsonl + │ └── 0000000000000001456.jsonl + └── 1/ + ├── 0000000000000000000.jsonl + ├── 0000000000000000789.jsonl + └── 0000000000000001012.jsonl ``` Each file is named using the batch's starting offset (padded to 19 digits) and the appropriate file extension for the chosen format. ## Supported Formats + - **JSON**: Supports appending to existing files - **Parquet**: Does not support appending (new file created for each batch) ## Delivery Guarantees + `FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing. diff --git a/pyproject.toml b/pyproject.toml index 1f64ba568..7090634d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,11 @@ iceberg_aws = ["pyiceberg[pyarrow,glue]>=0.7,<0.8"] bigquery = ["google-cloud-bigquery>=3.26.0,<3.27"] pubsub = ["google-cloud-pubsub>=2.23.1,<3"] postgresql = ["psycopg2-binary>=2.9.9,<3"] -kinesis = ["boto3>=1.35.65,<2.0", "boto3-stubs[kinesis]>=1.35.65,<2.0"] +aws = ["boto3>=1.35.65,<2.0", "boto3-stubs>=1.35.65,<2.0"] +# AWS dependencies are separated by service to support +# different requirements in the future. 
+kinesis = ["quixstreams[aws]"] +s3 = ["quixstreams[aws]"] redis=["redis[hiredis]>=5.2.0,<6"] [tool.setuptools.packages.find] @@ -100,7 +104,10 @@ log_cli_format = "[%(levelname)s] %(name)s: %(message)s" markers = ["timeit"] [[tool.mypy.overrides]] -module = "confluent_kafka.*" +module = [ + "confluent_kafka.*", + "pyarrow.*", +] ignore_missing_imports = true [[tool.mypy.overrides]] diff --git a/quixstreams/sinks/community/file/__init__.py b/quixstreams/sinks/community/file/__init__.py index 26f92c14a..e6f903890 100644 --- a/quixstreams/sinks/community/file/__init__.py +++ b/quixstreams/sinks/community/file/__init__.py @@ -1,9 +1,14 @@ -from .formats import JSONFormat, ParquetFormat -from .sink import FileSink, InvalidFormatError +from .destinations import Destination, LocalDestination, S3Destination +from .formats import Format, InvalidFormatError, JSONFormat, ParquetFormat +from .sink import FileSink __all__ = [ - "FileSink", + "Destination", + "LocalDestination", + "S3Destination", + "Format", "InvalidFormatError", "JSONFormat", "ParquetFormat", + "FileSink", ] diff --git a/quixstreams/sinks/community/file/destinations/__init__.py b/quixstreams/sinks/community/file/destinations/__init__.py new file mode 100644 index 000000000..764d4ce06 --- /dev/null +++ b/quixstreams/sinks/community/file/destinations/__init__.py @@ -0,0 +1,5 @@ +from .base import Destination +from .local import LocalDestination +from .s3 import S3Destination + +__all__ = ("Destination", "LocalDestination", "S3Destination") diff --git a/quixstreams/sinks/community/file/destinations/base.py b/quixstreams/sinks/community/file/destinations/base.py new file mode 100644 index 000000000..fe269e4b3 --- /dev/null +++ b/quixstreams/sinks/community/file/destinations/base.py @@ -0,0 +1,96 @@ +import logging +import re +from abc import ABC, abstractmethod +from pathlib import Path + +from quixstreams.sinks.base import SinkBatch +from quixstreams.sinks.community.file.formats import Format + +__all__ = ("Destination",) + +logger = logging.getLogger(__name__) + +_UNSAFE_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9 ._]") + + +class Destination(ABC): + """Abstract base class for defining where and how data should be stored. + + Destinations handle the storage of serialized data, whether that's to local + disk, cloud storage, or other locations. They manage the physical writing of + data while maintaining a consistent directory/path structure based on topics + and partitions. + """ + + _base_directory: str = "" + _extension: str = "" + + def set_directory(self, directory: str) -> None: + """Configure the base directory for storing files. + + :param directory: The base directory path where files will be stored. + :raises ValueError: If the directory path contains invalid characters. + Only alphanumeric characters (a-zA-Z0-9), spaces, dots, and + underscores are allowed. + """ + if _UNSAFE_CHARACTERS_REGEX.search(directory): + raise ValueError( + f"Invalid characters in directory path: {directory}. " + f"Only alphanumeric characters (a-zA-Z0-9), spaces ( ), " + "dots (.), and underscores (_) are allowed." + ) + self._base_directory = directory + logger.info("Directory set to '%s'", directory) + + def set_extension(self, format: Format) -> None: + """Set the file extension based on the format. + + :param format: The Format instance that defines the file extension. 
+ """ + self._extension = format.file_extension + logger.info("File extension set to '%s'", self._extension) + + @abstractmethod + def write(self, data: bytes, batch: SinkBatch) -> None: + """Write the serialized data to storage. + + :param data: The serialized data to write. + :param batch: The batch information containing topic, partition and offset + details. + """ + ... + + def _path(self, batch: SinkBatch) -> Path: + """Generate the full path where the batch data should be stored. + + :param batch: The batch information containing topic, partition and offset + details. + :return: A Path object representing the full file path. + """ + return self._directory(batch) / self._filename(batch) + + def _directory(self, batch: SinkBatch) -> Path: + """Generate the full directory path for a batch. + + Creates a directory structure using the base directory, sanitized topic + name, and partition number. + + :param batch: The batch information containing topic and partition details. + :return: A Path object representing the directory where files should be + stored. + """ + topic = _UNSAFE_CHARACTERS_REGEX.sub("_", batch.topic) + return Path(self._base_directory) / topic / str(batch.partition) + + def _filename(self, batch: SinkBatch) -> str: + """Generate the filename for a batch. + + Creates a filename using the batch's starting offset as a zero-padded + number to ensure correct ordering. The offset is padded to cover the max + length of a signed 64-bit integer (19 digits), e.g., '0000000000000123456'. + + :param batch: The batch information containing start_offset details. + :return: A string representing the filename with zero-padded offset and + extension. + """ + return f"{batch.start_offset:019d}{self._extension}" diff --git a/quixstreams/sinks/community/file/destinations/local.py b/quixstreams/sinks/community/file/destinations/local.py new file mode 100644 index 000000000..215b2c725 --- /dev/null +++ b/quixstreams/sinks/community/file/destinations/local.py @@ -0,0 +1,75 @@ +import logging +from pathlib import Path +from typing import Optional + +from quixstreams.sinks.base import SinkBatch +from quixstreams.sinks.community.file.formats import Format + +from .base import Destination + +__all__ = ("LocalDestination",) + +logger = logging.getLogger(__name__) + + +class LocalDestination(Destination): + """A destination that writes data to the local filesystem. + + Handles writing data to local files with support for both creating new files + and appending to existing ones. + """ + + def __init__(self, append: bool = False) -> None: + """Initialize the local destination. + + :param append: If True, append to existing files instead of creating new + ones. Defaults to False. + """ + self._append = append + self._mode = "ab" if append else "wb" + logger.debug("LocalDestination initialized with append=%s", append) + + def set_extension(self, format: Format) -> None: + """Set the file extension and validate append mode compatibility. + + :param format: The Format instance that defines the file extension. + :raises ValueError: If append mode is enabled but the format doesn't + support appending. + """ + if self._append and not format.supports_append: + raise ValueError(f"`{format}` format does not support appending.") + super().set_extension(format) + + def write(self, data: bytes, batch: SinkBatch) -> None: + """Write data to a local file. + + :param data: The serialized data to write. + :param batch: The batch information containing topic and partition details. 
+ """ + path = self._path(batch) + logger.debug("Writing %d bytes to file: %s", len(data), path) + with open(path, self._mode) as f: + f.write(data) + + def _path(self, batch: SinkBatch) -> Path: + """Get the path for writing, creating directories if needed. + + :param batch: The batch information containing topic and partition details. + :return: Path where the data should be written. + """ + directory = self._directory(batch) + directory.mkdir(parents=True, exist_ok=True) + return self._existing_file(directory) or directory / self._filename(batch) + + def _existing_file(self, directory: Path) -> Optional[Path]: + """Find the most recent file in the directory for append mode. + + :param directory: Directory to search for existing files. + :return: Path to the most recent file if in append mode and files exist, + None otherwise. + """ + if self._append and (files := sorted(directory.iterdir())): + file = files[-1] + logger.debug("Found existing file for append: %s", file) + return file + return None diff --git a/quixstreams/sinks/community/file/destinations/s3.py b/quixstreams/sinks/community/file/destinations/s3.py new file mode 100644 index 000000000..5492ace05 --- /dev/null +++ b/quixstreams/sinks/community/file/destinations/s3.py @@ -0,0 +1,88 @@ +import logging +from os import getenv +from typing import Optional + +import boto3 + +from quixstreams.sinks import SinkBatch +from quixstreams.sinks.community.file.destinations.base import Destination + +logger = logging.getLogger(__name__) + + +class S3BucketNotFoundError(Exception): + """Raised when the specified S3 bucket does not exist.""" + + +class S3BucketAccessDeniedError(Exception): + """Raised when the specified S3 bucket access is denied.""" + + +class S3Destination(Destination): + """A destination that writes data to Amazon S3. + + Handles writing data to S3 buckets using the AWS SDK. Credentials can be + provided directly or via environment variables. + """ + + def __init__( + self, + bucket: str, + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + region_name: Optional[str] = getenv("AWS_REGION", getenv("AWS_DEFAULT_REGION")), + **kwargs, + ) -> None: + """Initialize the S3 destination. + + :param bucket: Name of the S3 bucket to write to. + :param aws_access_key_id: AWS access key ID. Defaults to AWS_ACCESS_KEY_ID + environment variable. + :param aws_secret_access_key: AWS secret access key. Defaults to + AWS_SECRET_ACCESS_KEY environment variable. + :param region_name: AWS region name. Defaults to AWS_REGION or + AWS_DEFAULT_REGION environment variable. + :param kwargs: Additional keyword arguments passed to boto3.client. + :raises S3BucketNotFoundError: If the specified bucket doesn't exist. + :raises S3BucketAccessDeniedError: If access to the bucket is denied. + """ + self._bucket = bucket + self._s3 = boto3.client( + "s3", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name, + **kwargs, + ) + self._validate_bucket() + logger.debug("S3Destination initialized with bucket=%s", bucket) + + def _validate_bucket(self) -> None: + """Validate that the bucket exists and is accessible. + + :raises S3BucketNotFoundError: If the specified bucket doesn't exist. + :raises S3BucketAccessDeniedError: If access to the bucket is denied. 
+ """ + bucket = self._bucket + logger.debug("Validating access to bucket: %s", bucket) + try: + self._s3.head_bucket(Bucket=bucket) + except self._s3.exceptions.ClientError as e: + error_code = e.response["Error"]["Code"] + if error_code == "403": + raise S3BucketAccessDeniedError(f"S3 bucket access denied: {bucket}") + elif error_code == "404": + raise S3BucketNotFoundError(f"S3 bucket not found: {bucket}") + raise + + def write(self, data: bytes, batch: SinkBatch) -> None: + """Write data to S3. + + :param data: The serialized data to write. + :param batch: The batch information containing topic and partition details. + """ + key = str(self._path(batch)) + logger.debug( + "Writing %d bytes to S3 bucket=%s, path=%s", len(data), self._bucket, key + ) + self._s3.put_object(Bucket=self._bucket, Key=key, Body=data) diff --git a/quixstreams/sinks/community/file/formats/__init__.py b/quixstreams/sinks/community/file/formats/__init__.py index 3e1e415fb..ed489f1dd 100644 --- a/quixstreams/sinks/community/file/formats/__init__.py +++ b/quixstreams/sinks/community/file/formats/__init__.py @@ -1,5 +1,42 @@ +from typing import Literal, Union + from .base import Format from .json import JSONFormat from .parquet import ParquetFormat -__all__ = ["Format", "JSONFormat", "ParquetFormat"] +__all__ = ("Format", "FormatName", "JSONFormat", "ParquetFormat", "resolve_format") + +FormatName = Literal["json", "parquet"] + +_FORMATS: dict[FormatName, Format] = { + "json": JSONFormat(), + "parquet": ParquetFormat(), +} + + +class InvalidFormatError(Exception): + """ + Raised when the format is specified incorrectly. + """ + + +def resolve_format(format: Union[FormatName, Format]) -> Format: + """ + Resolves the format into a `Format` instance. + + :param format: The format to resolve, either a format name ("json", + "parquet") or a `Format` instance. + :return: An instance of `Format` corresponding to the specified format. + :raises InvalidFormatError: If the format name is invalid. + """ + if isinstance(format, Format): + return format + elif format_obj := _FORMATS.get(format): + return format_obj + + allowed_formats = ", ".join(FormatName.__args__) # type: ignore[attr-defined] + raise InvalidFormatError( + f'Invalid format name "{format}". ' + f"Allowed values: {allowed_formats}, " + f"or an instance of a subclass of `Format`." 
+ ) diff --git a/quixstreams/sinks/community/file/formats/json.py b/quixstreams/sinks/community/file/formats/json.py index f3a52be74..13b6cfcc2 100644 --- a/quixstreams/sinks/community/file/formats/json.py +++ b/quixstreams/sinks/community/file/formats/json.py @@ -47,7 +47,7 @@ def __init__( if self._compress: self._file_extension += ".gz" - self._writer_arguments = {"compact": True} + self._writer_arguments: dict[str, Any] = {"compact": True} # If `dumps` is provided, `compact` will be ignored if dumps is not None: diff --git a/quixstreams/sinks/community/file/sink.py b/quixstreams/sinks/community/file/sink.py index 12d6a0521..8f9a35529 100644 --- a/quixstreams/sinks/community/file/sink.py +++ b/quixstreams/sinks/community/file/sink.py @@ -1,163 +1,67 @@ -import logging -import re -from pathlib import Path -from typing import Literal, Union +from typing import Optional, Union -from quixstreams.sinks import BatchingSink, SinkBatch +from quixstreams.sinks import BatchingSink, SinkBackpressureError, SinkBatch -from .formats import Format, JSONFormat, ParquetFormat +from .destinations import Destination, LocalDestination +from .formats import Format, FormatName, resolve_format -__all__ = ["FileSink", "InvalidFormatError"] - -logger = logging.getLogger(__name__) - -FormatName = Literal["json", "parquet"] - -_FORMATS: dict[FormatName, Format] = { - "json": JSONFormat(), - "parquet": ParquetFormat(), -} - -_UNSAFE_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9 ._]") - - -class InvalidFormatError(Exception): - """ - Raised when the format is specified incorrectly. - """ +__all__ = ("FileSink",) class FileSink(BatchingSink): - """ - Writes batches of data to files on disk using specified formats. + """A sink that writes data batches to files using configurable formats and + destinations. - Messages are grouped by their topic and partition. Data from messages with - the same topic and partition are saved in the same directory. Each batch of - messages is serialized and saved to a file within that directory. Files are - named using the batch's starting offset to ensure uniqueness and order. + The sink groups messages by their topic and partition, ensuring data from the + same source is stored together. Each batch is serialized using the specified + format (e.g., JSON, Parquet) before being written to the configured + destination. - If `append` is set to `True`, the sink will attempt to append data to an - existing file rather than creating a new one. This is only supported for - formats that allow appending. + The destination determines the storage location and write behavior. By default, + it uses LocalDestination for writing to the local filesystem, but can be + configured to use other storage backends (e.g., cloud storage). """ def __init__( - self, output_dir: str, format: Union[FormatName, Format], append: bool = False + self, + directory: str = "", + format: Union[FormatName, Format] = "json", + destination: Optional[Destination] = None, ) -> None: - """ - Initializes the FileSink. - - :param output_dir: The directory where files will be written. - :param format: The data serialization format to use. This can be either a - format name ("json", "parquet") or an instance of a `Format` - subclass. - :param append: If `True`, data will be appended to existing files when possible. - Note that not all formats support appending. Defaults to `False`. - :raises ValueError: If `append` is `True` but the specified format does not - support appending. 
+ """Initialize the FileSink with the specified configuration. + + :param directory: Base directory path for storing files. Defaults to + current directory. + :param format: Data serialization format, either as a string + ("json", "parquet") or a Format instance. + :param destination: Storage destination handler. Defaults to + LocalDestination if not specified. """ super().__init__() - self._format = self._resolve_format(format) - self._output_dir = output_dir # TODO: validate - if append and not self._format.supports_append: - raise ValueError(f"`{format}` format does not support appending.") - self._append = append - self._file_mode = "ab" if append else "wb" - logger.info(f"Files will be written to '{self._output_dir}'.") + self._format = resolve_format(format) + self._destination = destination or LocalDestination() + self._destination.set_directory(directory) + self._destination.set_extension(self._format) def write(self, batch: SinkBatch) -> None: - """ - Writes a batch of data to files on disk, grouping data by topic and partition. + """Write a batch of data using the configured format and destination. - If `append` is `True` and an existing file is found, data will be appended to - the last file. Otherwise, a new file is created based on the batch's starting - offset. + The method performs the following steps: + 1. Serializes the batch data using the configured format + 2. Writes the serialized data to the destination + 3. Handles any write failures by raising a backpressure error :param batch: The batch of data to write. + :raises SinkBackpressureError: If the write operation fails, indicating + that the sink needs backpressure with a 5-second retry delay. """ - file_path = self._get_file_path(batch) - - # Serialize messages using the specified format data = self._format.serialize(batch) - # Write data to the file - with open(file_path, self._file_mode) as f: - f.write(data) - - logger.info(f"Wrote {batch.size} records to file '{file_path}'.") - - def _get_file_path(self, batch: SinkBatch) -> Path: - """ - Generate and return the file path for storing data related to the given batch. - - The file path is constructed based on the output directory, with sanitized - topic and partition names to ensure valid file paths. If appending is enabled - and existing files are found in the directory, the latest existing file is - returned for appending data. Otherwise, a new file path is generated using - the batch's starting offset, padded to 19 digits, and appended with the - appropriate file extension. - - :param batch: The batch object containing attributes: - - topic (str): The name of the topic associated with the batch. - - partition (int): The partition number of the batch. - - start_offset (int): The starting offset of the batch. - :return: The file path where the batch data should be stored. - - Notes: - - Unsafe characters in `topic` and `partition` are replaced with underscores - `_` to ensure valid directory names. - - The directory structure is organized as: `output_dir/topic/partition/`. - - File names are based on the starting offset of the batch, padded to 19 - digits (e.g., `0000000000000123456`), to accommodate the maximum length - of a signed 64-bit integer. - - If appending is enabled (`self._append` is `True`) and existing files are - present, data will be appended to the latest existing file. 
- """ - directory = Path(self._output_dir) - directory /= _UNSAFE_CHARACTERS_REGEX.sub("_", batch.topic) - directory /= _UNSAFE_CHARACTERS_REGEX.sub("_", str(batch.partition)) - directory.mkdir(parents=True, exist_ok=True) - - if self._append and (existing_files := self._get_existing_files(directory)): - return existing_files[-1] - else: - # Generate filename based on the batch's starting offset - # Padded to cover max length of a signed 64-bit integer (19 digits) - # e.g., 0000000000000123456 - padded_offset = str(batch.start_offset).zfill(19) - return directory / (padded_offset + self._format.file_extension) - - def _get_existing_files(self, directory: Path) -> list[Path]: - """ - Retrieves a sorted list of existing files in the given directory that match - the current format's file extension. - - :param directory: The directory to search for existing files. - :return: A list of Path objects to existing files, sorted by name. - """ - return sorted( - path - for path in directory.iterdir() - if path.suffix == self._format.file_extension - ) - - def _resolve_format(self, format: Union[FormatName, Format]) -> Format: - """ - Resolves the format into a `Format` instance. - - :param format: The format to resolve, either a format name ("json", - "parquet") or a `Format` instance. - :return: An instance of `Format` corresponding to the specified format. - :raises InvalidFormatError: If the format name is invalid. - """ - if isinstance(format, Format): - return format - elif format_obj := _FORMATS.get(format): - return format_obj - - allowed_formats = ", ".join(FormatName.__args__) - raise InvalidFormatError( - f'Invalid format name "{format}". ' - f"Allowed values: {allowed_formats}, " - f"or an instance of a subclass of `Format`." - ) + try: + self._destination.write(data, batch) + except Exception as e: + raise SinkBackpressureError( + retry_after=5.0, + topic=batch.topic, + partition=batch.partition, + ) from e
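The new `Destination` abstract base class is the extension point introduced here: `LocalDestination` and `S3Destination` are its two concrete implementations, and `FileSink` only calls `set_directory()`, `set_extension()`, and `write()` on it. As a rough sketch of what another backend would need to provide (the `InMemoryDestination` below is hypothetical and not part of this change), a subclass only has to implement `write()` and can reuse the inherited `_path()` helper for the topic/partition/offset layout:

```python
from quixstreams.sinks.base import SinkBatch
from quixstreams.sinks.community.file.destinations import Destination


class InMemoryDestination(Destination):
    """Hypothetical destination that keeps serialized batches in memory,
    e.g. for tests. Illustrative only."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    def write(self, data: bytes, batch: SinkBatch) -> None:
        # _path() yields "<directory>/<topic>/<partition>/<19-digit offset><extension>"
        self.objects[str(self._path(batch))] = data
```

Such a destination plugs into the sink the same way as the shipped ones, for example `FileSink(directory="data", format="json", destination=InMemoryDestination())`.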