From 58b2e5e99056c81453056c393eacaec9ca49e9e2 Mon Sep 17 00:00:00 2001
From: Tim Sawicki
Date: Wed, 20 Nov 2024 16:19:22 -0500
Subject: [PATCH 1/5] transferred changes from diff branch

---
 docs/build/build.py                       |   1 +
 docs/connectors/sources/kinesis-source.md | 131 ++++++++++++
 pyproject.toml                            |   2 +-
 .../sources/community/kinesis/__init__.py |   2 +
 .../sources/community/kinesis/consumer.py | 190 ++++++++++++++++++
 .../sources/community/kinesis/kinesis.py  | 162 +++++++++++++++
 6 files changed, 487 insertions(+), 1 deletion(-)
 create mode 100644 docs/connectors/sources/kinesis-source.md
 create mode 100644 quixstreams/sources/community/kinesis/__init__.py
 create mode 100644 quixstreams/sources/community/kinesis/consumer.py
 create mode 100644 quixstreams/sources/community/kinesis/kinesis.py

diff --git a/docs/build/build.py b/docs/build/build.py
index 08eddaaa8..2ea7d7299 100644
--- a/docs/build/build.py
+++ b/docs/build/build.py
@@ -143,6 +143,7 @@
         "quixstreams.sources.community.file.compressions.gzip",
         "quixstreams.sources.community.file.formats.json",
         "quixstreams.sources.community.file.formats.parquet",
+        "quixstreams.sources.community.kinesis.kinesis",
         "quixstreams.sources.community.pubsub.pubsub",
     ]
 },
diff --git a/docs/connectors/sources/kinesis-source.md b/docs/connectors/sources/kinesis-source.md
new file mode 100644
index 000000000..f584ab5d1
--- /dev/null
+++ b/docs/connectors/sources/kinesis-source.md
@@ -0,0 +1,131 @@
+# Amazon Kinesis Source
+
+!!! info
+
+    This is a **Community** connector. Test it before using in production.
+
+    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
+
+This source reads data from an Amazon Kinesis stream, dumping it to a
+Kafka topic using desired `StreamingDataFrame`-based transformations.
+
+
+## How To Install
+
+To use the Kinesis source, you need to install the required dependencies:
+
+```bash
+pip install quixstreams[kinesis]
+```
+
+## How It Works
+
+`KinesisSource` reads from a Kinesis stream and produces its messages to a Kafka topic.
+
+Records are read in a streaming fashion and committed intermittently, offering
+[at-least-once guarantees](#processingdelivery-guarantees).
+
+Each shard in the Kinesis stream is consumed in a round-robin fashion to ensure
+reads are equally distributed.
+
+You can learn more details about the [expected Kafka message format](#message-data-formatschema) below.
+
+## How To Use
+
+To use Kinesis Source, hand `KinesisSource` to `app.dataframe()`.
+
+For more details around various settings, see [configuration](#configuration).
+
+```python
+from quixstreams import Application
+from quixstreams.sources.community.kinesis import KinesisSource
+
+
+kinesis = KinesisSource(
+    stream_name="<YOUR STREAM>",
+    aws_access_key_id="<YOUR KEY ID>",
+    aws_secret_access_key="<YOUR SECRET KEY>",
+    aws_region="<YOUR REGION>",
+    auto_offset_reset="earliest",  # start from the beginning of the stream (vs end)
+)
+
+app = Application(
+    broker_address="<YOUR BROKER INFO>",
+    consumer_group="<YOUR GROUP>",
+)
+
+sdf = app.dataframe(source=kinesis).print(metadata=True)
+# YOUR LOGIC HERE!
+
+if __name__ == "__main__":
+    app.run()
+```
+
+## Configuration
+
+Here are some important configurations to be aware of (see [Kinesis Source API](../../api-reference/sources.md#kinesissource) for all parameters).
+
+### Required:
+
+- `stream_name`: the name of the desired stream to consume.
+- `aws_region`: the AWS region (e.g., `us-east-1`).
+    **Note**: can alternatively set the `AWS_REGION` environment variable.
+- `aws_access_key_id`: the AWS access key ID.
+    **Note**: can alternatively set the `AWS_ACCESS_KEY_ID` environment variable.
+- `aws_secret_access_key`: the AWS secret access key.
+    **Note**: can alternatively set the `AWS_SECRET_ACCESS_KEY` environment variable.
+
+
+### Optional:
+
+- `aws_endpoint_url`: Only fill when testing against a locally-hosted Kinesis instance.
+    **Note**: can leave other `aws` settings blank when doing so.
+    **Note**: can alternatively set the `AWS_ENDPOINT_URL_KINESIS` environment variable.
+- `commit_interval`: How often to commit stream reads.
+    **Default**: `5.0s`
+
+## Message Data Format/Schema
+
+This is the default format of messages handled by `Application`:
+
+- Message `key` will be the Kinesis record `PartitionKey` as a `string`.
+
+- Message `value` will be the Kinesis record `Data` in `bytes` (transform accordingly
+  with your `SDF` as desired).
+
+- Message `timestamp` will be the Kinesis record `ApproximateArrivalTimestamp`, converted to milliseconds (ms).
+
+
+## Processing/Delivery Guarantees
+
+The Kinesis Source offers "at-least-once" guarantees: offsets are managed using
+an internal Quix Streams changelog topic.
+
+As such, in rare circumstances where topic flushing ends up failing, messages may be
+processed (produced) more than once.
+
+## Topic
+
+The default topic name the Application dumps to is `source-kinesis_<stream name>`.
+
+
+## Testing Locally
+
+Rather than connect to AWS, you can alternatively test your application using
+a local Kinesis host via docker:
+
+1. Set `aws_endpoint_url` for `KinesisSource` _OR_ the `AWS_ENDPOINT_URL_KINESIS`
+    environment variable to:
+
+    `localhost:8085`
+
+2. execute in terminal:
+
+    `docker run --rm -d --name kinesis \
+    -p 4566:4566 \
+    -e SERVICES=kinesis \
+    -e EDGE_PORT=4566 \
+    -e DEBUG=1 \
+    localstack/localstack:latest
+`
diff --git a/pyproject.toml b/pyproject.toml
index 1cb40618c..043706c34 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,7 +46,7 @@ iceberg_aws = ["pyiceberg[pyarrow,glue]>=0.7,<0.8"]
 bigquery = ["google-cloud-bigquery>=3.26.0,<3.27"]
 pubsub = ["google-cloud-pubsub>=2.23.1,<3"]
 postgresql = ["psycopg2-binary>=2.9.9,<3"]
-kinesis = ["boto3>=1.35.60,<2.0"]
+kinesis = ["boto3>=1.35.65,<2.0", "boto3-stubs[kinesis]>=1.35.65,<2.0"]
 
 [tool.setuptools.packages.find]
 include = ["quixstreams*"]
diff --git a/quixstreams/sources/community/kinesis/__init__.py b/quixstreams/sources/community/kinesis/__init__.py
new file mode 100644
index 000000000..15f8bd32c
--- /dev/null
+++ b/quixstreams/sources/community/kinesis/__init__.py
@@ -0,0 +1,2 @@
+# ruff: noqa: F403
+from .kinesis import *
diff --git a/quixstreams/sources/community/kinesis/consumer.py b/quixstreams/sources/community/kinesis/consumer.py
new file mode 100644
index 000000000..3dce7f330
--- /dev/null
+++ b/quixstreams/sources/community/kinesis/consumer.py
@@ -0,0 +1,190 @@
+import logging
+import time
+from typing import Callable, Literal, Optional, Protocol
+
+from typing_extensions import Self
+
+try:
+    import boto3
+    from botocore.exceptions import ClientError
+    from mypy_boto3_kinesis import KinesisClient
+    from mypy_boto3_kinesis.type_defs import GetShardIteratorOutputTypeDef, ShardTypeDef
+    from mypy_boto3_kinesis.type_defs import RecordTypeDef as KinesisRecord
+except ImportError as exc:
+    raise ImportError(
+        f"Package {exc.name} is missing: "
+        'run "pip install quixstreams[kinesis]" to use KinesisSource'
+    ) from exc
+
+
+logger = logging.getLogger(__name__)
+
+_OFFSET_RESET_DICT = {"earliest": "TRIM_HORIZON", "latest": "LATEST"}
+AutoOffsetResetType = Literal["earliest",
"latest"] + + +class KinesisCheckpointer(Protocol): + @property + def last_committed_at(self) -> float: + yield ... + + def get(self, key: str) -> Optional[str]: ... + + def set(self, key: str, value: str): ... + + def commit(self, force: bool = False): ... + + +class Authentication: + def __init__( + self, + aws_region: Optional[str] = None, + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None, + aws_endpoint_url: Optional[str] = None, + ): + """ + :param aws_region: The AWS region. + NOTE: can alternatively set the AWS_REGION environment variable + :param aws_access_key_id: the AWS access key ID. + NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable + :param aws_secret_access_key: the AWS secret access key. + NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable + :param aws_endpoint_url: the endpoint URL to use; only required for connecting + to a locally hosted Kinesis. + NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable + """ + self.auth = { + "endpoint_url": aws_endpoint_url, + "region_name": aws_region, + "aws_access_key_id": aws_access_key_id, + "aws_secret_access_key": aws_secret_access_key, + } + + +class KinesisConsumer: + """ + Consume all shards for a given Kinesis stream in a batched, round-robin fashion. + Also handles checkpointing of said stream (requires a `KinesisCheckpointer`). + """ + + def __init__( + self, + stream_name: str, + auth: Authentication, + message_processor: Callable[[KinesisRecord], None], + checkpointer: KinesisCheckpointer, + auto_offset_reset: AutoOffsetResetType = "latest", + max_records_per_shard: int = 1000, + backoff_secs: float = 5.0, + ): + self._stream = stream_name + self._auth = auth + self._message_processor = message_processor + self._checkpointer = checkpointer + self._shard_iterators: dict[str, str] = {} + self._shard_backoff: dict[str, float] = {} + self._max_records_per_shard = max_records_per_shard + self._backoff_secs = backoff_secs + self._auto_offset_reset = _OFFSET_RESET_DICT[auto_offset_reset] + self._client: Optional[KinesisClient] = None + + def __enter__(self) -> Self: + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def _init_client(self): + self._client = boto3.client("kinesis", **self._auth.auth) + + def _process_record(self, shard_id: str, record: KinesisRecord): + self._message_processor(record) + self._checkpointer.set(shard_id, record["SequenceNumber"]) + + def _list_shards(self) -> list[ShardTypeDef]: + """List all shards in the stream.""" + shards: list[ShardTypeDef] = [] + response = self._client.list_shards(StreamName=self._stream) + shards.extend(response["Shards"]) + while "NextToken" in response: # handle pagination + response = self._client.list_shards(NextToken=response["NextToken"]) + shards.extend(response["Shards"]) + return shards + + def _get_shard_iterator(self, shard_id: str): + if sequence_number := self._checkpointer.get(shard_id): + additional_kwargs = { + "ShardIteratorType": "AFTER_SEQUENCE_NUMBER", + "StartingSequenceNumber": sequence_number, + } + else: + additional_kwargs = { + "ShardIteratorType": self._auto_offset_reset, + } + response: GetShardIteratorOutputTypeDef = self._client.get_shard_iterator( + StreamName=self._stream, ShardId=shard_id, **additional_kwargs + ) + return response["ShardIterator"] + + def _init_shards(self): + if not (shards := [shard["ShardId"] for shard in self._list_shards()]): + raise ValueError(f"No shards for stream 
{self._stream}") + self._shard_iterators = { + shard: self._get_shard_iterator(shard) for shard in shards + } + + def _poll_and_process_shard(self, shard_id): + """Read records from a shard.""" + if ( + backoff_time := self._shard_backoff.get(shard_id) + ) and time.monotonic() < backoff_time: + return + try: + response = self._client.get_records( + ShardIterator=self._shard_iterators[shard_id], + Limit=self._max_records_per_shard, + ) + + for record in response.get("Records", []): + self._process_record(shard_id, record) + + # Update the shard iterator for the next batch + self._shard_iterators[shard_id] = response["NextShardIterator"] + self._shard_backoff[shard_id] = 0 + + except ClientError as e: + error_code = e.response["Error"]["Code"] + logger.error(f"Error reading from shard {shard_id}: {error_code}") + if error_code == "ProvisionedThroughputExceededException": + self._shard_backoff[shard_id] = time.monotonic() + self._backoff_secs + elif error_code == "ExpiredIteratorException": + logger.error(f"Shard iterator expired for shard {shard_id}.") + raise + else: + logger.error(f"Unrecoverable error: {e}") + raise + + def start(self): + self._init_client() + self._init_shards() + + def poll_and_process_shards(self): + for shard in self._shard_iterators: + self._poll_and_process_shard(shard) + + def commit(self, force: bool = False): + self._checkpointer.commit(force=force) + + def run(self): + """For running _without_ using Quix Streams Source framework.""" + try: + self.start() + while True: + self.poll_and_process_shards() + self.commit() + except Exception as e: + logger.debug(f"KinesisConsumer encountered an error: {e}") + finally: + logger.debug("Stopping KinesisConsumer...") diff --git a/quixstreams/sources/community/kinesis/kinesis.py b/quixstreams/sources/community/kinesis/kinesis.py new file mode 100644 index 000000000..1d05f8705 --- /dev/null +++ b/quixstreams/sources/community/kinesis/kinesis.py @@ -0,0 +1,162 @@ +import time +from typing import Optional + +from quixstreams.models.topics import Topic +from quixstreams.sources.base import StatefulSource + +from .consumer import ( + Authentication, + AutoOffsetResetType, + KinesisCheckpointer, + KinesisConsumer, + KinesisRecord, +) + +__all__ = ("KinesisSource",) + + +class KinesisSource(StatefulSource): + """ + NOTE: Requires `pip install quixstreams[kinesis]` to work. + + This source reads data from an Amazon Kinesis stream, dumping it to a + kafka topic using desired `StreamingDataFrame`-based transformations. + + Provides "at-least-once" guarantees. + + The incoming message value will be in bytes, so transform in your SDF accordingly. + + Example Usage: + + ```python + from quixstreams import Application + from quixstreams.sources.community.kinesis import KinesisSource + + + kinesis = KinesisSource( + stream_name="", + aws_access_key_id="", + aws_secret_access_key="", + aws_region="", + auto_offset_reset="earliest", # start from the beginning of the stream (vs end) + ) + + app = Application( + broker_address="", + consumer_group="", + ) + + sdf = app.dataframe(source=kinesis).print(metadata=True) + # YOUR LOGIC HERE! 
+
+    if __name__ == "__main__":
+        app.run()
+    ```
+    """
+
+    def __init__(
+        self,
+        stream_name: str,
+        aws_region: Optional[str] = None,
+        aws_access_key_id: Optional[str] = None,
+        aws_secret_access_key: Optional[str] = None,
+        aws_endpoint_url: Optional[str] = None,
+        shutdown_timeout: float = 10,
+        auto_offset_reset: AutoOffsetResetType = "latest",
+        max_records_per_shard: int = 1000,
+        commit_interval: float = 5.0,
+        retry_backoff_secs: float = 5.0,
+    ):
+        """
+        :param stream_name: name of the desired Kinesis stream to consume.
+        :param aws_region: The AWS region.
+            NOTE: can alternatively set the AWS_REGION environment variable
+        :param aws_access_key_id: the AWS access key ID.
+            NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
+        :param aws_secret_access_key: the AWS secret access key.
+            NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
+        :param aws_endpoint_url: the endpoint URL to use; only required for connecting
+            to a locally hosted Kinesis.
+            NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable
+        :param shutdown_timeout: time in seconds to wait for the source to gracefully shut down.
+        :param auto_offset_reset: When no previous offset has been recorded, whether to
+            start from the beginning ("earliest") or end ("latest") of the stream.
+        :param max_records_per_shard: During round-robin consumption, how many records
+            to consume per shard (partition) per consume call (NOT per commit).
+        :param commit_interval: the time between commits, in seconds.
+        :param retry_backoff_secs: how long to back off from doing HTTP calls for a
+            shard when Kinesis consumer encounters handled/expected errors.
+        """
+        self._stream_name = stream_name
+        self._auth = Authentication(
+            aws_endpoint_url=aws_endpoint_url,
+            aws_region=aws_region,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+        )
+        self._auto_offset_reset = auto_offset_reset
+        self._max_records_per_shard = max_records_per_shard
+        self._retry_backoff_secs = retry_backoff_secs
+        self._checkpointer = SourceCheckpointer(self, commit_interval)
+        super().__init__(
+            name=f"kinesis_{self._stream_name}", shutdown_timeout=shutdown_timeout
+        )
+
+    def default_topic(self) -> Topic:
+        return Topic(
+            name=self.name,
+            key_deserializer="str",
+            value_deserializer="bytes",
+            key_serializer="str",
+            value_serializer="bytes",
+        )
+
+    def _handle_kinesis_message(self, message: KinesisRecord):
+        serialized_msg = self._producer_topic.serialize(
+            key=message["PartitionKey"],
+            value=message["Data"],
+            timestamp_ms=int(message["ApproximateArrivalTimestamp"].timestamp() * 1000),
+        )
+        self.produce(
+            key=serialized_msg.key,
+            value=serialized_msg.value,
+            timestamp=serialized_msg.timestamp,
+        )
+
+    def run(self):
+        with KinesisConsumer(
+            stream_name=self._stream_name,
+            auth=self._auth,
+            message_processor=self._handle_kinesis_message,
+            auto_offset_reset=self._auto_offset_reset,
+            checkpointer=self._checkpointer,
+            max_records_per_shard=self._max_records_per_shard,
+            backoff_secs=self._retry_backoff_secs,
+        ) as consumer:
+            while self._running:
+                consumer.poll_and_process_shards()
+                consumer.commit()
+
+
+class SourceCheckpointer(KinesisCheckpointer):
+    def __init__(self, stateful_source: StatefulSource, commit_interval: float = 5.0):
+        self._source = stateful_source
+        self._last_committed_at = time.monotonic()
+        self._commit_interval = commit_interval
+
+    @property
+    def last_committed_at(self) -> float:
+        return self._last_committed_at
+
+    def get(self, key: str) -> Optional[str]:
+        return self._source.state.get(key)
+
+    def
set(self, key: str, value: str): + self._source.state.set(key, value) + + def commit(self, force: bool = False): + if ( + (now := time.monotonic()) - self._last_committed_at > self._commit_interval + ) or force: + self._source.flush() + self._last_committed_at = now From e52fb80a774f64a4e63a63f2b86fd98d94644de1 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Wed, 20 Nov 2024 16:45:07 -0500 Subject: [PATCH 2/5] update docs around testing locally --- docs/connectors/sources/kinesis-source.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/connectors/sources/kinesis-source.md b/docs/connectors/sources/kinesis-source.md index f584ab5d1..91ae9a1cc 100644 --- a/docs/connectors/sources/kinesis-source.md +++ b/docs/connectors/sources/kinesis-source.md @@ -120,7 +120,10 @@ a local Kinesis host via docker: `localhost:8085` -2. execute in terminal: +2. Set all other `aws_` parameters for `KinesisSource` to _any_ string. +They will not be used, but they must still be populated! + +3. execute in terminal: `docker run --rm -d --name kinesis \ -p 4566:4566 \ From 649f418ef596428689697fb3e573eea0c8ef2117 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 21 Nov 2024 16:57:36 +0100 Subject: [PATCH 3/5] Simplify the Kinesis Source implementation (#650) --- .../sources/community/kinesis/consumer.py | 159 +++++++++--------- .../sources/community/kinesis/kinesis.py | 48 ++---- 2 files changed, 94 insertions(+), 113 deletions(-) diff --git a/quixstreams/sources/community/kinesis/consumer.py b/quixstreams/sources/community/kinesis/consumer.py index 3dce7f330..ec1cb3458 100644 --- a/quixstreams/sources/community/kinesis/consumer.py +++ b/quixstreams/sources/community/kinesis/consumer.py @@ -1,9 +1,11 @@ import logging import time -from typing import Callable, Literal, Optional, Protocol +from typing import Callable, Generator, Literal, Optional, TypedDict from typing_extensions import Self +from quixstreams.sources import StatefulSource + try: import boto3 from botocore.exceptions import ClientError @@ -23,43 +25,39 @@ AutoOffsetResetType = Literal["earliest", "latest"] -class KinesisCheckpointer(Protocol): +class KinesisStreamShardsNotFound(Exception): + """Raised when the Kinesis Stream has no shards""" + + +class KinesisCheckpointer: + def __init__(self, stateful_source: StatefulSource, commit_interval: float = 5.0): + self._source = stateful_source + self._last_committed_at = time.monotonic() + self._commit_interval = commit_interval + @property def last_committed_at(self) -> float: - yield ... + return self._last_committed_at - def get(self, key: str) -> Optional[str]: ... + def get(self, shard_id: str) -> Optional[str]: + return self._source.state.get(shard_id) - def set(self, key: str, value: str): ... + def set(self, shard_id: str, sequence_number: str): + self._source.state.set(shard_id, sequence_number) - def commit(self, force: bool = False): ... + def commit(self, force: bool = False): + if ( + (now := time.monotonic()) - self._last_committed_at > self._commit_interval + ) or force: + self._source.flush() + self._last_committed_at = now -class Authentication: - def __init__( - self, - aws_region: Optional[str] = None, - aws_access_key_id: Optional[str] = None, - aws_secret_access_key: Optional[str] = None, - aws_endpoint_url: Optional[str] = None, - ): - """ - :param aws_region: The AWS region. - NOTE: can alternatively set the AWS_REGION environment variable - :param aws_access_key_id: the AWS access key ID. 
-        NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
-        :param aws_secret_access_key: the AWS secret access key.
-        NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
-        :param aws_endpoint_url: the endpoint URL to use; only required for connecting
-        to a locally hosted Kinesis.
-        NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable
-        """
-        self.auth = {
-            "endpoint_url": aws_endpoint_url,
-            "region_name": aws_region,
-            "aws_access_key_id": aws_access_key_id,
-            "aws_secret_access_key": aws_secret_access_key,
-        }
+class AWSCredentials(TypedDict):
+    endpoint_url: Optional[str]
+    region_name: Optional[str]
+    aws_access_key_id: Optional[str]
+    aws_secret_access_key: Optional[str]
 
 
 class KinesisConsumer:
@@ -71,7 +69,7 @@ class KinesisConsumer:
     def __init__(
         self,
         stream_name: str,
-        auth: Authentication,
+        credentials: AWSCredentials,
         message_processor: Callable[[KinesisRecord], None],
         checkpointer: KinesisCheckpointer,
         auto_offset_reset: AutoOffsetResetType = "latest",
@@ -79,7 +77,7 @@ def __init__(
         backoff_secs: float = 5.0,
     ):
         self._stream = stream_name
-        self._auth = auth
+        self._credentials = credentials
         self._message_processor = message_processor
         self._checkpointer = checkpointer
         self._shard_iterators: dict[str, str] = {}
@@ -89,19 +87,36 @@ def __init__(
         self._auto_offset_reset = _OFFSET_RESET_DICT[auto_offset_reset]
         self._client: Optional[KinesisClient] = None
 
+    def process_shards(self):
+        """
+        Process records from the Stream shards one by one and checkpoint their
+        sequence numbers.
+        """
+        # Iterate over shards one by one
+        for shard_id in self._shard_iterators:
+            # Poll records from each shard
+            for record in self._poll_records(shard_id=shard_id):
+                # Process the record
+                self._message_processor(record)
+                # Save the sequence number of the processed record
+                self._checkpointer.set(shard_id, record["SequenceNumber"])
+
+    def commit(self, force: bool = False):
+        """
+        Commit the checkpoint and save the progress of the source.
+        """
+        self._checkpointer.commit(force=force)
+
     def __enter__(self) -> Self:
-        self.start()
+        self._init_client()
+        self._init_shards()
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         pass
 
     def _init_client(self):
-        self._client = boto3.client("kinesis", **self._auth.auth)
-
-    def _process_record(self, shard_id: str, record: KinesisRecord):
-        self._message_processor(record)
-        self._checkpointer.set(shard_id, record["SequenceNumber"])
+        self._client = boto3.client("kinesis", **self._credentials)
 
     def _list_shards(self) -> list[ShardTypeDef]:
         """List all shards in the stream."""
@@ -113,51 +128,54 @@ def _list_shards(self) -> list[ShardTypeDef]:
         shards.extend(response["Shards"])
         return shards
 
-    def _get_shard_iterator(self, shard_id: str):
+    def _get_shard_iterator(self, shard_id: str) -> str:
         if sequence_number := self._checkpointer.get(shard_id):
-            additional_kwargs = {
+            kwargs = {
                 "ShardIteratorType": "AFTER_SEQUENCE_NUMBER",
                 "StartingSequenceNumber": sequence_number,
             }
         else:
-            additional_kwargs = {
+            kwargs = {
                 "ShardIteratorType": self._auto_offset_reset,
             }
         response: GetShardIteratorOutputTypeDef = self._client.get_shard_iterator(
-            StreamName=self._stream, ShardId=shard_id, **additional_kwargs
+            StreamName=self._stream, ShardId=shard_id, **kwargs
        )
         return response["ShardIterator"]
 
     def _init_shards(self):
         if not (shards := [shard["ShardId"] for shard in self._list_shards()]):
-            raise ValueError(f"No shards for stream {self._stream}")
+            raise KinesisStreamShardsNotFound(
+                f'Shards not found for stream "{self._stream}"'
+            )
         self._shard_iterators = {
             shard: self._get_shard_iterator(shard) for shard in shards
         }
 
-    def _poll_and_process_shard(self, shard_id):
-        """Read records from a shard."""
+    def _poll_records(self, shard_id: str) -> Generator[KinesisRecord, None, None]:
+        """
+        Poll records from the Kinesis Stream shard.
+
+        If the shard is backed off, no records will be returned.
+
+        :param shard_id: shard id.
+        """
         if (
-            backoff_time := self._shard_backoff.get(shard_id)
-        ) and time.monotonic() < backoff_time:
+            backoff_time := self._shard_backoff.get(shard_id, 0.0)
+        ) and backoff_time > time.monotonic():
+            # The shard is backed off, exit early
             return
+
         try:
             response = self._client.get_records(
                 ShardIterator=self._shard_iterators[shard_id],
                 Limit=self._max_records_per_shard,
             )
-
-            for record in response.get("Records", []):
-                self._process_record(shard_id, record)
-
-            # Update the shard iterator for the next batch
-            self._shard_iterators[shard_id] = response["NextShardIterator"]
-            self._shard_backoff[shard_id] = 0
-
         except ClientError as e:
             error_code = e.response["Error"]["Code"]
             logger.error(f"Error reading from shard {shard_id}: {error_code}")
             if error_code == "ProvisionedThroughputExceededException":
+                # The shard is backed off by Kinesis, update the backoff deadline
                 self._shard_backoff[shard_id] = time.monotonic() + self._backoff_secs
             elif error_code == "ExpiredIteratorException":
                 logger.error(f"Shard iterator expired for shard {shard_id}.")
@@ -165,26 +183,11 @@ def _poll_and_process_shard(self, shard_id):
             else:
                 logger.error(f"Unrecoverable error: {e}")
                 raise
+        else:
+            # Yield records for the shard
+            for record in response.get("Records", []):
+                yield record
 
-    def start(self):
-        self._init_client()
-        self._init_shards()
-
-    def poll_and_process_shards(self):
-        for shard in self._shard_iterators:
-            self._poll_and_process_shard(shard)
-
-    def commit(self, force: bool = False):
-        self._checkpointer.commit(force=force)
-
-    def run(self):
-        """For running _without_ using Quix Streams Source framework."""
-        try:
-            self.start()
-            while True:
-                self.poll_and_process_shards()
-                self.commit()
-        except Exception as e:
-            logger.debug(f"KinesisConsumer encountered an error: {e}")
-        finally:
-            logger.debug("Stopping KinesisConsumer...")
+            # Update the shard iterator for the next batch
+            self._shard_iterators[shard_id] = response["NextShardIterator"]
+            self._shard_backoff[shard_id] = 0
diff --git a/quixstreams/sources/community/kinesis/kinesis.py b/quixstreams/sources/community/kinesis/kinesis.py
index 1d05f8705..9ef3c8c85 100644
--- a/quixstreams/sources/community/kinesis/kinesis.py
+++ b/quixstreams/sources/community/kinesis/kinesis.py
@@ -1,12 +1,11 @@
-import time
 from typing import Optional
 
 from quixstreams.models.topics import Topic
 from quixstreams.sources.base import StatefulSource
 
 from .consumer import (
-    Authentication,
     AutoOffsetResetType,
+    AWSCredentials,
     KinesisCheckpointer,
     KinesisConsumer,
     KinesisRecord,
 )
@@ -88,16 +87,19 @@
         shard when Kinesis consumer encounters handled/expected errors.
""" self._stream_name = stream_name - self._auth = Authentication( - aws_endpoint_url=aws_endpoint_url, - aws_region=aws_region, - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - ) + self._credentials: AWSCredentials = { + "endpoint_url": aws_endpoint_url, + "region_name": aws_region, + "aws_access_key_id": aws_access_key_id, + "aws_secret_access_key": aws_secret_access_key, + } + self._auto_offset_reset = auto_offset_reset self._max_records_per_shard = max_records_per_shard self._retry_backoff_secs = retry_backoff_secs - self._checkpointer = SourceCheckpointer(self, commit_interval) + self._checkpointer = KinesisCheckpointer( + stateful_source=self, commit_interval=commit_interval + ) super().__init__( name=f"kinesis_{self._stream_name}", shutdown_timeout=shutdown_timeout ) @@ -126,7 +128,7 @@ def _handle_kinesis_message(self, message: KinesisRecord): def run(self): with KinesisConsumer( stream_name=self._stream_name, - auth=self._auth, + credentials=self._credentials, message_processor=self._handle_kinesis_message, auto_offset_reset=self._auto_offset_reset, checkpointer=self._checkpointer, @@ -134,29 +136,5 @@ def run(self): backoff_secs=self._retry_backoff_secs, ) as consumer: while self._running: - consumer.poll_and_process_shards() + consumer.process_shards() consumer.commit() - - -class SourceCheckpointer(KinesisCheckpointer): - def __init__(self, stateful_source: StatefulSource, commit_interval: float = 5.0): - self._source = stateful_source - self._last_committed_at = time.monotonic() - self._commit_interval = commit_interval - - @property - def last_committed_at(self) -> float: - return self._last_committed_at - - def get(self, key: str) -> Optional[str]: - return self._source.state.get(key) - - def set(self, key: str, value: str): - self._source.state.set(key, value) - - def commit(self, force: bool = False): - if ( - (now := time.monotonic()) - self._last_committed_at > self._commit_interval - ) or force: - self._source.flush() - self._last_committed_at = now From 21569ec1f48650937be6c199821859abc34f91c2 Mon Sep 17 00:00:00 2001 From: Tim Sawicki <136370015+tim-quix@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:04:34 -0500 Subject: [PATCH 4/5] Update docs/connectors/sources/kinesis-source.md Co-authored-by: Remy Gwaramadze --- docs/connectors/sources/kinesis-source.md | 31 +++++++++++------------ 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/docs/connectors/sources/kinesis-source.md b/docs/connectors/sources/kinesis-source.md index 91ae9a1cc..627e0246b 100644 --- a/docs/connectors/sources/kinesis-source.md +++ b/docs/connectors/sources/kinesis-source.md @@ -112,23 +112,22 @@ The default topic name the Application dumps to is `source-kinesis_ ## Testing Locally -Rather than connect to AWS, you can alternatively test your application using -a local Kinesis host via docker: +Rather than connect to AWS, you can alternatively test your application using a local Kinesis host via Docker: -1. Set `aws_endpoint_url` for `KinesisSource` _OR_ the `AWS_ENDPOINT_URL_KINESIS` - environment variable to: - - `localhost:8085` +1. Execute in terminal: -2. Set all other `aws_` parameters for `KinesisSource` to _any_ string. -They will not be used, but they must still be populated! + ```bash + docker run --rm -d --name kinesis \ + -p 4566:4566 \ + -e SERVICES=kinesis \ + -e EDGE_PORT=4566 \ + -e DEBUG=1 \ + localstack/localstack:latest + ``` -3. execute in terminal: +2. 
Set `aws_endpoint_url` for `KinesisSource` _OR_ the `AWS_ENDPOINT_URL_KINESIS` + environment variable to `http://localhost:4566` + +3. Set all other `aws_` parameters for `KinesisSource` to _any_ string. +They will not be used, but they must still be populated! - `docker run --rm -d --name kinesis \ - -p 4566:4566 \ - -e SERVICES=kinesis \ - -e EDGE_PORT=4566 \ - -e DEBUG=1 \ - localstack/localstack:latest -` From 802b08504be22d55231260df7e30e4fe9a06aeeb Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Fri, 22 Nov 2024 12:38:36 -0500 Subject: [PATCH 5/5] final tweaks before merging --- conda/meta.yaml | 3 ++- pyproject.toml | 3 ++- quixstreams/sources/community/kinesis/kinesis.py | 9 +++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/conda/meta.yaml b/conda/meta.yaml index 920347849..71426793e 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -27,7 +27,8 @@ requirements: - google-cloud-bigquery >=3.26.0,<3.27 - google-cloud-pubsub >=2.23.1,<3 - psycopg2-binary >=2.9.9,<3 - - boto3 >=1.35.60,<2.0 + - boto3 >=1.35.65,<2.0 + - boto3-stubs >=1.35.65,<2.0 test: imports: diff --git a/pyproject.toml b/pyproject.toml index 043706c34..57247787f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,8 @@ all = [ "google-cloud-bigquery>=3.26.0,<3.27", "google-cloud-pubsub>=2.23.1,<3", "psycopg2-binary>=2.9.9,<3", - "boto3>=1.35.60,<2.0", + "boto3>=1.35.65,<2.0", + "boto3-stubs>=1.35.65,<2.0", ] avro = ["fastavro>=1.8,<2.0"] diff --git a/quixstreams/sources/community/kinesis/kinesis.py b/quixstreams/sources/community/kinesis/kinesis.py index 9ef3c8c85..92518c0fd 100644 --- a/quixstreams/sources/community/kinesis/kinesis.py +++ b/quixstreams/sources/community/kinesis/kinesis.py @@ -1,3 +1,4 @@ +from os import getenv from typing import Optional from quixstreams.models.topics import Topic @@ -56,10 +57,10 @@ class KinesisSource(StatefulSource): def __init__( self, stream_name: str, - aws_region: Optional[str] = None, - aws_access_key_id: Optional[str] = None, - aws_secret_access_key: Optional[str] = None, - aws_endpoint_url: Optional[str] = None, + aws_region: Optional[str] = getenv("AWS_REGION"), + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + aws_endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_KINESIS"), shutdown_timeout: float = 10, auto_offset_reset: AutoOffsetResetType = "latest", max_records_per_shard: int = 1000,
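
A note on seeding the local stream for the "Testing Locally" workflow above: `KinesisSource` only reads records, so the LocalStack stream needs data put into it first. Below is a minimal sketch, assuming the LocalStack container from the docs is running on `http://localhost:4566`; the stream name, partition key, and payload are illustrative:

```python
import boto3

# Point boto3 at the local (LocalStack) Kinesis endpoint instead of AWS.
# LocalStack does not validate credentials, so placeholder values suffice.
client = boto3.client(
    "kinesis",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="local",
    aws_secret_access_key="local",
)

# Create a single-shard stream and block until it is ACTIVE.
client.create_stream(StreamName="my-test-stream", ShardCount=1)
client.get_waiter("stream_exists").wait(StreamName="my-test-stream")

# Put one record; KinesisSource will surface Data as the message value
# and PartitionKey as the message key.
client.put_record(
    StreamName="my-test-stream",
    Data=b"hello from kinesis",
    PartitionKey="test-key",
)
```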