From 0f99168628c15397afa0af98aea38a35b56ca062 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 25 Nov 2024 12:22:33 +0100 Subject: [PATCH] Pass AWS credentials parameters to KinesisSink (#649) * Pass AWS credentials parameters to KinesisSink * Use getenv for default AWS credentials --- docs/connectors/sinks/amazon-kinesis-sink.md | 46 ++++++++++++++++++-- quixstreams/sinks/community/kinesis.py | 18 +++++++- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/docs/connectors/sinks/amazon-kinesis-sink.md b/docs/connectors/sinks/amazon-kinesis-sink.md index 31b9017cf..262b4288b 100644 --- a/docs/connectors/sinks/amazon-kinesis-sink.md +++ b/docs/connectors/sinks/amazon-kinesis-sink.md @@ -47,13 +47,15 @@ topic = app.topic("topic-name") # Configure the sink kinesis_sink = KinesisSink( stream_name="", + # Optional: AWS credentials + aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"], + aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"], + region_name="eu-west-2", # Optional: customize serialization value_serializer=str, key_serializer=str, # Optional: Additional keyword arguments are passed to the boto3 client - aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"], - aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"], - region_name="us-west-2", + endpoint_url="http://localhost:4566", # for LocalStack testing ) sdf = app.dataframe(topic=topic) @@ -63,9 +65,24 @@ if __name__ == "__main__": app.run() ``` -### Configuration Options +!!! note + Instead of passing AWS credentials explicitly, you can set them using environment variables: + ```bash + export AWS_ACCESS_KEY_ID="your_access_key" + export AWS_SECRET_ACCESS_KEY="your_secret_key" + export AWS_DEFAULT_REGION="eu-west-2" + ``` + Then you can create the sink with just the stream name: + ```python + kinesis_sink = KinesisSink(stream_name="") + ``` + +## Configuration Options - `stream_name`: The name of the Kinesis stream +- `aws_access_key_id`: AWS access key ID for authentication +- `aws_secret_access_key`: AWS secret access key for authentication +- `region_name`: AWS region name (e.g., "us-west-2") - `value_serializer`: Function to serialize message values to string (default: `json.dumps`) - `key_serializer`: Function to serialize message keys to string (default: `bytes.decode`) - Additional keyword arguments are passed to the `boto3.client` @@ -83,3 +100,24 @@ The sink provides **at-least-once** delivery guarantees, which means: - This ensures no messages are lost, but some might be delivered more than once This behavior makes the sink reliable but the downstream systems must be prepared to handle duplicate messages. If your application requires exactly-once semantics, you'll need to implement deduplication logic in your consumer. + +## Testing Locally + +Rather than connect to AWS, you can alternatively test your application using a local Kinesis host via Docker: + +1. Execute in terminal: + + ```bash + docker run --rm -d --name kinesis \ + -p 4566:4566 \ + -e SERVICES=kinesis \ + -e EDGE_PORT=4566 \ + -e DEBUG=1 \ + localstack/localstack:latest + ``` + +2. Set `endpoint_url` for `KinesisSink` _OR_ the `AWS_ENDPOINT_URL_KINESIS` + environment variable to `http://localhost:4566` + +3. Set all other `aws_` parameters for `KinesisSink` to _any_ string. +They will not be used, but they must still be populated! diff --git a/quixstreams/sinks/community/kinesis.py b/quixstreams/sinks/community/kinesis.py index cfcf48f36..fd3441fbb 100644 --- a/quixstreams/sinks/community/kinesis.py +++ b/quixstreams/sinks/community/kinesis.py @@ -1,7 +1,8 @@ import json from collections import defaultdict from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait -from typing import Any, Callable +from os import getenv +from typing import Any, Callable, Optional try: import boto3 @@ -27,6 +28,9 @@ class KinesisSink(BaseSink): def __init__( self, stream_name: str, + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + region_name: Optional[str] = getenv("AWS_REGION", getenv("AWS_DEFAULT_REGION")), value_serializer: Callable[[Any], str] = json.dumps, key_serializer: Callable[[Any], str] = bytes.decode, **kwargs, @@ -35,6 +39,9 @@ def __init__( Initialize the KinesisSink. :param stream_name: Kinesis stream name. + :param aws_access_key_id: AWS access key ID. + :param aws_secret_access_key: AWS secret access key. + :param region_name: AWS region name (e.g., 'us-east-1'). :param value_serializer: Function to serialize the value to string (defaults to json.dumps). :param key_serializer: Function to serialize the key to string @@ -42,7 +49,6 @@ def __init__( :param kwargs: Additional keyword arguments passed to boto3.client. """ self._stream_name = stream_name - self._kinesis = boto3.client("kinesis", **kwargs) self._value_serializer = value_serializer self._key_serializer = key_serializer @@ -53,6 +59,14 @@ def __init__( # that records are sent in order at the expense of throughput. self._executor = ThreadPoolExecutor(max_workers=1) + self._kinesis = boto3.client( + "kinesis", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name, + **kwargs, + ) + # Check if the Kinesis stream exists try: self._kinesis.describe_stream(StreamName=self._stream_name)