Source: Amazon Kinesis #646

Merged
merged 5 commits on Nov 26, 2024
3 changes: 2 additions & 1 deletion conda/meta.yaml
@@ -27,7 +27,8 @@ requirements:
- google-cloud-bigquery >=3.26.0,<3.27
- google-cloud-pubsub >=2.23.1,<3
- psycopg2-binary >=2.9.9,<3
- boto3 >=1.35.60,<2.0
- boto3 >=1.35.65,<2.0
- boto3-stubs >=1.35.65,<2.0

test:
imports:
1 change: 1 addition & 0 deletions docs/build/build.py
@@ -143,6 +143,7 @@
"quixstreams.sources.community.file.compressions.gzip",
"quixstreams.sources.community.file.formats.json",
"quixstreams.sources.community.file.formats.parquet",
"quixstreams.sources.community.kinesis.kinesis",
"quixstreams.sources.community.pubsub.pubsub",
]
},
133 changes: 133 additions & 0 deletions docs/connectors/sources/kinesis-source.md
@@ -0,0 +1,133 @@
# Amazon Kinesis Source

!!! info

This is a **Community** connector. Test it before using in production.

To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This source reads data from an Amazon Kinesis stream and produces it to a Kafka topic,
applying any desired `StreamingDataFrame`-based transformations along the way.


## How To Install

To use the Kinesis source, you need to install the required dependencies:

```bash
pip install quixstreams[kinesis]
```

## How It Works

`KinesisSource` reads from a Kinesis stream and produces its messages to a Kafka topic.

Records are read in a streaming fashion and committed intermittently, offering
[at-least-once guarantees](#processingdelivery-guarantees).

Each shard in the Kinesis stream is consumed in a round-robin fashion to ensure
reads are equally distributed.

See the [expected Kafka message format](#message-data-formatschema) below for more details.

## How To Use


To use the Kinesis Source, pass a `KinesisSource` instance to `app.dataframe()`.

For more details about the various settings, see [Configuration](#configuration).

```python
from quixstreams import Application
from quixstreams.sources.community.kinesis import KinesisSource


kinesis = KinesisSource(
stream_name="<YOUR STREAM>",
aws_access_key_id="<YOUR KEY ID>",
aws_secret_access_key="<YOUR SECRET KEY>",
aws_region="<YOUR REGION>",
auto_offset_reset="earliest", # start from the beginning of the stream (vs end)
)

app = Application(
broker_address="<YOUR BROKER INFO>",
consumer_group="<YOUR GROUP>",
)

sdf = app.dataframe(source=kinesis).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
app.run()
```

## Configuration

Here are some important configurations to be aware of (see [Kinesis Source API](../../api-reference/sources.md#kinesissource) for all parameters).

### Required:

- `stream_name`: the name of the desired stream to consume.
- `aws_region`: AWS region (e.g. `us-east-1`).
**Note**: can alternatively set the `AWS_REGION` environment variable.
- `aws_access_key_id`: AWS User key ID.
**Note**: can alternatively set the `AWS_ACCESS_KEY_ID` environment variable.
- `aws_secret_access_key`: AWS secret key.
**Note**: can alternatively set the `AWS_SECRET_ACCESS_KEY` environment variable.


### Optional:

- `aws_endpoint_url`: Only fill when testing against a locally-hosted Kinesis instance.
**Note**: can leave other `aws` settings blank when doing so.
**Note**: can alternatively set the `AWS_ENDPOINT_URL_KINESIS` environment variable.
- `commit_interval`: How often to commit stream reads.
**Default**: `5.0s`
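
For example, here is a minimal sketch that supplies the credentials through the environment variables mentioned above and tunes the commit interval (the stream name and values are hypothetical):

```python
import os

from quixstreams.sources.community.kinesis import KinesisSource

# Hypothetical values; real deployments would normally set these
# in the environment rather than in code.
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ACCESS_KEY_ID"] = "<YOUR KEY ID>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<YOUR SECRET KEY>"

kinesis = KinesisSource(
    stream_name="my-kinesis-stream",  # hypothetical stream name
    commit_interval=10.0,             # commit shard checkpoints every 10 seconds
)
```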

## Message Data Format/Schema

This is the default format of messages handled by `Application`:

- Message `key` will be the Kinesis record `PartitionKey` as a `string`.

- Message `value` will be the Kinesis record `Data` in `bytes` (transform accordingly
with your `SDF` as desired).

- Message `timestamp` will be the Kinesis record `ArrivalTimestamp` (ms).
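
Since the `value` arrives as raw `bytes`, you will typically deserialize it in your `StreamingDataFrame`. A minimal sketch, assuming the Kinesis records carry UTF-8 encoded JSON payloads:

```python
import json

from quixstreams import Application
from quixstreams.sources.community.kinesis import KinesisSource

app = Application(
    broker_address="<YOUR BROKER INFO>",
    consumer_group="<YOUR GROUP>",
)
kinesis = KinesisSource(
    stream_name="<YOUR STREAM>",
    aws_access_key_id="<YOUR KEY ID>",
    aws_secret_access_key="<YOUR SECRET KEY>",
    aws_region="<YOUR REGION>",
)

sdf = app.dataframe(source=kinesis)
# Decode the Kinesis record Data from bytes into a Python dict
sdf = sdf.apply(lambda value: json.loads(value))
sdf.print(metadata=True)

if __name__ == "__main__":
    app.run()
```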


## Processing/Delivery Guarantees

The Kinesis Source offers "at-least-once" guarantees: offsets are managed using
an internal Quix Streams changelog topic.

As such, in rare circumstances where topic flushing ends up failing, messages may be
processed (produced) more than once.

## Topic

The default topic name the Application produces to is `source-kinesis_<stream name>`.
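
If you prefer a different name, you can create a topic explicitly and hand it to `app.dataframe()` alongside the source. A sketch, assuming your Quix Streams version accepts a `topic` argument together with `source` (the topic name here is hypothetical):

```python
from quixstreams import Application
from quixstreams.sources.community.kinesis import KinesisSource

app = Application(broker_address="<YOUR BROKER INFO>")
kinesis = KinesisSource(
    stream_name="<YOUR STREAM>",
    aws_access_key_id="<YOUR KEY ID>",
    aws_secret_access_key="<YOUR SECRET KEY>",
    aws_region="<YOUR REGION>",
)

# Hypothetical custom topic name replacing the default source-kinesis_<stream name>
topic = app.topic(name="my-kinesis-events")
sdf = app.dataframe(topic=topic, source=kinesis)
```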


## Testing Locally

Rather than connecting to AWS, you can test your application against a locally-hosted Kinesis emulator via Docker:

1. Execute in terminal:

```bash
docker run --rm -d --name kinesis \
-p 4566:4566 \
-e SERVICES=kinesis \
-e EDGE_PORT=4566 \
-e DEBUG=1 \
localstack/localstack:latest
```

2. Set `aws_endpoint_url` for `KinesisSource` _OR_ the `AWS_ENDPOINT_URL_KINESIS`
environment variable to `http://localhost:4566`

3. Set all other `aws_` parameters for `KinesisSource` to _any_ string.
They will not be used, but they must still be populated!
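
Once LocalStack is running, you can create a stream and seed it with a test record using plain `boto3` (the stream name, payload, and credential values below are hypothetical; LocalStack accepts arbitrary credentials):

```python
import boto3

# Point the client at the LocalStack endpoint instead of AWS.
client = boto3.client(
    "kinesis",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

client.create_stream(StreamName="my-test-stream", ShardCount=1)
# Wait until the new stream becomes ACTIVE before writing to it.
client.get_waiter("stream_exists").wait(StreamName="my-test-stream")

client.put_record(
    StreamName="my-test-stream",
    Data=b'{"hello": "world"}',
    PartitionKey="example-key",
)
```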

5 changes: 3 additions & 2 deletions pyproject.toml
@@ -35,7 +35,8 @@ all = [
"google-cloud-bigquery>=3.26.0,<3.27",
"google-cloud-pubsub>=2.23.1,<3",
"psycopg2-binary>=2.9.9,<3",
"boto3>=1.35.60,<2.0",
"boto3>=1.35.65,<2.0",
"boto3-stubs>=1.35.65,<2.0",
]

avro = ["fastavro>=1.8,<2.0"]
@@ -46,7 +47,7 @@ iceberg_aws = ["pyiceberg[pyarrow,glue]>=0.7,<0.8"]
bigquery = ["google-cloud-bigquery>=3.26.0,<3.27"]
pubsub = ["google-cloud-pubsub>=2.23.1,<3"]
postgresql = ["psycopg2-binary>=2.9.9,<3"]
kinesis = ["boto3>=1.35.60,<2.0"]
kinesis = ["boto3>=1.35.65,<2.0", "boto3-stubs[kinesis]>=1.35.65,<2.0"]

[tool.setuptools.packages.find]
include = ["quixstreams*"]
2 changes: 2 additions & 0 deletions quixstreams/sources/community/kinesis/__init__.py
@@ -0,0 +1,2 @@
# ruff: noqa: F403
from .kinesis import *
193 changes: 193 additions & 0 deletions quixstreams/sources/community/kinesis/consumer.py
@@ -0,0 +1,193 @@
import logging
import time
from typing import Callable, Generator, Literal, Optional, TypedDict

from typing_extensions import Self

from quixstreams.sources import StatefulSource

try:
import boto3
from botocore.exceptions import ClientError
from mypy_boto3_kinesis import KinesisClient
from mypy_boto3_kinesis.type_defs import GetShardIteratorOutputTypeDef, ShardTypeDef
from mypy_boto3_kinesis.type_defs import RecordTypeDef as KinesisRecord
except ImportError as exc:
raise ImportError(
f"Package {exc.name} is missing: "
'run "pip install quixstreams[kinesis]" to use KinesisSource'
) from exc


logger = logging.getLogger(__name__)

_OFFSET_RESET_DICT = {"earliest": "TRIM_HORIZON", "latest": "LATEST"}
AutoOffsetResetType = Literal["earliest", "latest"]


class KinesisStreamShardsNotFound(Exception):
"""Raised when the Kinesis Stream has no shards"""


class KinesisCheckpointer:
def __init__(self, stateful_source: StatefulSource, commit_interval: float = 5.0):
self._source = stateful_source
self._last_committed_at = time.monotonic()
self._commit_interval = commit_interval

@property
def last_committed_at(self) -> float:
return self._last_committed_at

def get(self, shard_id: str) -> Optional[str]:
return self._source.state.get(shard_id)

def set(self, shard_id: str, sequence_number: str):
self._source.state.set(shard_id, sequence_number)

def commit(self, force: bool = False):
if (
(now := time.monotonic()) - self._last_committed_at > self._commit_interval
) or force:
self._source.flush()
self._last_committed_at = now


class AWSCredentials(TypedDict):
endpoint_url: Optional[str]
region_name: Optional[str]
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]


class KinesisConsumer:
"""
Consume all shards for a given Kinesis stream in a batched, round-robin fashion.
Also handles checkpointing of said stream (requires a `KinesisCheckpointer`).
"""

def __init__(
self,
stream_name: str,
credentials: AWSCredentials,
message_processor: Callable[[KinesisRecord], None],
checkpointer: KinesisCheckpointer,
auto_offset_reset: AutoOffsetResetType = "latest",
max_records_per_shard: int = 1000,
backoff_secs: float = 5.0,
):
self._stream = stream_name
self._credentials = credentials
self._message_processor = message_processor
self._checkpointer = checkpointer
self._shard_iterators: dict[str, str] = {}
self._shard_backoff: dict[str, float] = {}
self._max_records_per_shard = max_records_per_shard
self._backoff_secs = backoff_secs
self._auto_offset_reset = _OFFSET_RESET_DICT[auto_offset_reset]
self._client: Optional[KinesisClient] = None

def process_shards(self):
"""
Process records from the Stream shards one by one and checkpoint their
sequence numbers.
"""
# Iterate over shards one by one
for shard_id in self._shard_iterators:
# Poll records from each shard
for record in self._poll_records(shard_id=shard_id):
# Process the record
self._message_processor(record)
# Save the sequence number of the processed record
self._checkpointer.set(shard_id, record["SequenceNumber"])

def commit(self, force: bool = False):
"""
Commit the checkpoint and save the progress of the source.
"""
self._checkpointer.commit(force=force)

def __enter__(self) -> Self:
self._init_client()
self._init_shards()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def _init_client(self):
self._client = boto3.client("kinesis", **self._credentials)

def _list_shards(self) -> list[ShardTypeDef]:
"""List all shards in the stream."""
shards: list[ShardTypeDef] = []
response = self._client.list_shards(StreamName=self._stream)
shards.extend(response["Shards"])
while "NextToken" in response: # handle pagination
response = self._client.list_shards(NextToken=response["NextToken"])
shards.extend(response["Shards"])
return shards

def _get_shard_iterator(self, shard_id: str) -> str:
if sequence_number := self._checkpointer.get(shard_id):
kwargs = {
"ShardIteratorType": "AFTER_SEQUENCE_NUMBER",
"StartingSequenceNumber": sequence_number,
}
else:
kwargs = {
"ShardIteratorType": self._auto_offset_reset,
}
response: GetShardIteratorOutputTypeDef = self._client.get_shard_iterator(
StreamName=self._stream, ShardId=shard_id, **kwargs
)
return response["ShardIterator"]

def _init_shards(self):
if not (shards := [shard["ShardId"] for shard in self._list_shards()]):
raise KinesisStreamShardsNotFound(
f'Shards not found for stream "{self._stream}"'
)
self._shard_iterators = {
shard: self._get_shard_iterator(shard) for shard in shards
}

def _poll_records(self, shard_id: str) -> Generator[KinesisRecord, None, None]:
"""
Poll records from the Kinesis Stream shard.

If the shard is backed off, no records will be returned.

:param shard_id: shard id.
"""
if (
backoff_time := self._shard_backoff.get(shard_id, 0.0)
) and backoff_time > time.monotonic():
# The shard is backed off, exit early
return

try:
response = self._client.get_records(
ShardIterator=self._shard_iterators[shard_id],
Limit=self._max_records_per_shard,
)
except ClientError as e:
error_code = e.response["Error"]["Code"]
logger.error(f"Error reading from shard {shard_id}: {error_code}")
if error_code == "ProvisionedThroughputExceededException":
# The shard is backed off by Kinesis, update the backoff deadline
self._shard_backoff[shard_id] = time.monotonic() + self._backoff_secs
elif error_code == "ExpiredIteratorException":
logger.error(f"Shard iterator expired for shard {shard_id}.")
raise
else:
logger.error(f"Unrecoverable error: {e}")
raise
else:
# Yield records for the shard
for record in response.get("Records", []):
yield record

# Update the shard iterator for the next batch
self._shard_iterators[shard_id] = response["NextShardIterator"]
self._shard_backoff[shard_id] = 0