Skip to content

Commit

Permalink
Pass AWS credentials parameters to KinesisSink (#649)
Browse files Browse the repository at this point in the history
* Pass AWS credentials parameters to KinesisSink

* Use getenv for default AWS credentials
  • Loading branch information
gwaramadze authored Nov 25, 2024
1 parent 45ee8a1 commit 0f99168
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
46 changes: 42 additions & 4 deletions docs/connectors/sinks/amazon-kinesis-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ topic = app.topic("topic-name")
# Configure the sink
kinesis_sink = KinesisSink(
stream_name="<stream name>",
# Optional: AWS credentials
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
region_name="eu-west-2",
# Optional: customize serialization
value_serializer=str,
key_serializer=str,
# Optional: Additional keyword arguments are passed to the boto3 client
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
region_name="us-west-2",
endpoint_url="http://localhost:4566", # for LocalStack testing
)

sdf = app.dataframe(topic=topic)
Expand All @@ -63,9 +65,24 @@ if __name__ == "__main__":
app.run()
```

### Configuration Options
!!! note
Instead of passing AWS credentials explicitly, you can set them using environment variables:
```bash
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
export AWS_DEFAULT_REGION="eu-west-2"
```
Then you can create the sink with just the stream name:
```python
kinesis_sink = KinesisSink(stream_name="<stream name>")
```

## Configuration Options

- `stream_name`: The name of the Kinesis stream
- `aws_access_key_id`: AWS access key ID for authentication
- `aws_secret_access_key`: AWS secret access key for authentication
- `region_name`: AWS region name (e.g., "us-west-2")
- `value_serializer`: Function to serialize message values to string (default: `json.dumps`)
- `key_serializer`: Function to serialize message keys to string (default: `bytes.decode`)
- Additional keyword arguments are passed to the `boto3.client`
Expand All @@ -83,3 +100,24 @@ The sink provides **at-least-once** delivery guarantees, which means:
- This ensures no messages are lost, but some might be delivered more than once

This behavior makes the sink reliable but the downstream systems must be prepared to handle duplicate messages. If your application requires exactly-once semantics, you'll need to implement deduplication logic in your consumer.

## Testing Locally

Rather than connect to AWS, you can alternatively test your application using a local Kinesis host via Docker:

1. Execute in terminal:

```bash
docker run --rm -d --name kinesis \
-p 4566:4566 \
-e SERVICES=kinesis \
-e EDGE_PORT=4566 \
-e DEBUG=1 \
localstack/localstack:latest
```

2. Set `endpoint_url` for `KinesisSink` _OR_ the `AWS_ENDPOINT_URL_KINESIS`
environment variable to `http://localhost:4566`

3. Set all other `aws_` parameters for `KinesisSink` to _any_ string.
They will not be used, but they must still be populated!
18 changes: 16 additions & 2 deletions quixstreams/sinks/community/kinesis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
from collections import defaultdict
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import Any, Callable
from os import getenv
from typing import Any, Callable, Optional

try:
import boto3
Expand All @@ -27,6 +28,9 @@ class KinesisSink(BaseSink):
def __init__(
self,
stream_name: str,
aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
region_name: Optional[str] = getenv("AWS_REGION", getenv("AWS_DEFAULT_REGION")),
value_serializer: Callable[[Any], str] = json.dumps,
key_serializer: Callable[[Any], str] = bytes.decode,
**kwargs,
Expand All @@ -35,14 +39,16 @@ def __init__(
Initialize the KinesisSink.
:param stream_name: Kinesis stream name.
:param aws_access_key_id: AWS access key ID.
:param aws_secret_access_key: AWS secret access key.
:param region_name: AWS region name (e.g., 'us-east-1').
:param value_serializer: Function to serialize the value to string
(defaults to json.dumps).
:param key_serializer: Function to serialize the key to string
(defaults to bytes.decode).
:param kwargs: Additional keyword arguments passed to boto3.client.
"""
self._stream_name = stream_name
self._kinesis = boto3.client("kinesis", **kwargs)
self._value_serializer = value_serializer
self._key_serializer = key_serializer

Expand All @@ -53,6 +59,14 @@ def __init__(
# that records are sent in order at the expense of throughput.
self._executor = ThreadPoolExecutor(max_workers=1)

self._kinesis = boto3.client(
"kinesis",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
**kwargs,
)

# Check if the Kinesis stream exists
try:
self._kinesis.describe_stream(StreamName=self._stream_name)
Expand Down

0 comments on commit 0f99168

Please sign in to comment.