From 0ac2ff1ab4085c7c8c0d10e712752056febd81af Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:52:34 +0100 Subject: [PATCH] Update documentation (#648) Co-authored-by: daniil-quix <133032822+daniil-quix@users.noreply.github.com> --- docs/api-reference/application.md | 10 +- docs/api-reference/quixstreams.md | 302 +++++++++++++++++++++++++++++- docs/api-reference/sinks.md | 155 +++++++++++++++ docs/api-reference/sources.md | 95 ++++++++++ 4 files changed, 552 insertions(+), 10 deletions(-) diff --git a/docs/api-reference/application.md b/docs/api-reference/application.md index 7b7b1314e..bbb7e3878 100644 --- a/docs/api-reference/application.md +++ b/docs/api-reference/application.md @@ -494,7 +494,7 @@ Default: the source default def run(dataframe: Optional[StreamingDataFrame] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L679) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L685) Start processing data from Kafka using provided `StreamingDataFrame` @@ -530,7 +530,7 @@ app.run() def setup_topics() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L802) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L808) Validate and create the topics @@ -542,7 +542,7 @@ Validate and create the topics class ApplicationConfig(BaseSettings) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L978) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L984) Immutable object holding the application configuration @@ -565,7 +565,7 @@ def settings_customise_sources( ) -> Tuple[PydanticBaseSettingsSource, ...] 
``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1013) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1019) Included to ignore reading/setting values from the environment @@ -579,7 +579,7 @@ Included to ignore reading/setting values from the environment def copy(**kwargs) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1026) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1032) Update the application config and return a copy diff --git a/docs/api-reference/quixstreams.md b/docs/api-reference/quixstreams.md index f85e9a953..6ece99d7a 100644 --- a/docs/api-reference/quixstreams.md +++ b/docs/api-reference/quixstreams.md @@ -3823,12 +3823,155 @@ During this timeout, a request can be retried according to the client's default retrying policy. - `kwargs`: Additional keyword arguments passed to `bigquery.Client`. + + +## quixstreams.sinks.community.kinesis + + + +### KinesisStreamNotFoundError + +```python +class KinesisStreamNotFoundError(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L23) + +Raised when the specified Kinesis stream does not exist. 
+ + + +### KinesisSink + +```python +class KinesisSink(BaseSink) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L27) + + + +#### KinesisSink.\_\_init\_\_ + +```python +def __init__(stream_name: str, + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv( + "AWS_SECRET_ACCESS_KEY"), + region_name: Optional[str] = getenv("AWS_REGION", + getenv("AWS_DEFAULT_REGION")), + value_serializer: Callable[[Any], str] = json.dumps, + key_serializer: Callable[[Any], str] = bytes.decode, + **kwargs) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L28) + +Initialize the KinesisSink. + +**Arguments**: + +- `stream_name`: Kinesis stream name. +- `aws_access_key_id`: AWS access key ID. +- `aws_secret_access_key`: AWS secret access key. +- `region_name`: AWS region name (e.g., 'us-east-1'). +- `value_serializer`: Function to serialize the value to string +(defaults to json.dumps). +- `key_serializer`: Function to serialize the key to string +(defaults to bytes.decode). +- `kwargs`: Additional keyword arguments passed to boto3.client. + + + +#### KinesisSink.add + +```python +def add(value: Any, key: Any, timestamp: int, + headers: list[tuple[str, HeaderValue]], topic: str, partition: int, + offset: int) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L80) + +Buffer a record for the Kinesis stream. + +Records are buffered until the batch size reaches 500, at which point +they are sent immediately. If the batch size is less than 500, records +will be sent when the flush method is called. 
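The batching policy described above can be modelled in a few lines of plain Python. This is an illustrative sketch only, not the sink's actual implementation: the 500-record cap mirrors the limit of the Kinesis `PutRecords` API, and the `send` callable stands in for the real network call.

```python
from collections import defaultdict

MAX_BATCH_SIZE = 500  # PutRecords accepts at most 500 records per call


class BufferingSketch:
    """Toy model of the buffer-then-flush behaviour described above."""

    def __init__(self, send):
        self._send = send  # callable that ships one batch of records
        self._buffers = defaultdict(list)  # one buffer per (topic, partition)

    def add(self, topic, partition, record):
        buffer = self._buffers[(topic, partition)]
        buffer.append(record)
        # A full batch of 500 is dispatched immediately...
        if len(buffer) >= MAX_BATCH_SIZE:
            self._send(buffer[:])
            buffer.clear()

    def flush(self, topic, partition):
        # ...while a partial batch waits until flush is called.
        buffer = self._buffers[(topic, partition)]
        if buffer:
            self._send(buffer[:])
            buffer.clear()


sent = []
sink = BufferingSketch(send=sent.append)
for i in range(1200):
    sink.add("t", 0, i)
sink.flush("t", 0)
# Two full batches of 500 are sent eagerly; flush ships the remaining 200.
```

In the real sink, `flush` is invoked at checkpoint time, so a partially filled buffer is never lost.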
+ + + +#### KinesisSink.flush + +```python +def flush(topic: str, partition: int) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L110) + +Flush all buffered records for a given topic-partition. + +This method sends any outstanding records that have not yet been sent +because the batch size was less than 500. It waits for all futures to +complete, ensuring that all records are successfully sent to the Kinesis +stream. + ## quixstreams.sinks.community This module contains Sinks developed and maintained by the members of Quix Streams community. + + +## quixstreams.sinks.community.redis + + + +### RedisSink + +```python +class RedisSink(BatchingSink) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/redis.py#L21) + + + +#### RedisSink.\_\_init\_\_ + +```python +def __init__(host: str, + port: int, + db: int, + value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps, + key_serializer: Optional[Callable[[Any, Any], Union[bytes, + str]]] = None, + password: Optional[str] = None, + socket_timeout: float = 30.0, + **kwargs) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/redis.py#L22) + +A connector to sink processed data to Redis. + +It batches the processed records in memory per topic partition, and flushes them to Redis at the checkpoint. + +**Arguments**: + +- `host`: Redis host. +- `port`: Redis port. +- `db`: Redis DB number. +- `value_serializer`: a callable to serialize the value to string or bytes +(defaults to json.dumps). +- `key_serializer`: an optional callable to serialize the key to string or bytes. +If not provided, the Kafka message key will be used as is. +- `password`: Redis password, optional. +- `socket_timeout`: Redis socket timeout. +Default - 30s. +- `kwargs`: Additional keyword arguments passed to the `redis.Redis` instance. 
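The serializer arguments above are plain callables, so their contract can be checked without a running Redis. A small sketch, assuming the key serializer receives the message key and value in that order (per the two-argument `Callable[[Any, Any], ...]` signature); the `user:`-prefixed key scheme is a hypothetical example, not part of the API:

```python
import json

# RedisSink's default value serializer: json.dumps produces a str.
value_serializer = json.dumps


# Hypothetical key serializer: it receives BOTH the Kafka message key and
# the value, so the Redis key can be derived from either of them.
def key_serializer(key, value):
    return f"user:{value['user_id']}"  # ignore the Kafka key, use a value field


record_key = b"kafka-key"
record_value = {"user_id": 42, "clicks": 7}

redis_key = key_serializer(record_key, record_value)
redis_value = value_serializer(record_value)
print(redis_key)    # user:42
print(redis_value)  # {"user_id": 42, "clicks": 7}
```

Both callables would then be passed as `value_serializer=` and `key_serializer=` when constructing the sink.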
+ ## quixstreams.sinks.community.iceberg @@ -10110,7 +10253,7 @@ Default: the source default def run(dataframe: Optional[StreamingDataFrame] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L679) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L685) Start processing data from Kafka using provided `StreamingDataFrame` @@ -10142,7 +10285,7 @@ app.run() def setup_topics() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L802) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L808) Validate and create the topics @@ -10154,7 +10297,7 @@ Validate and create the topics class ApplicationConfig(BaseSettings) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L978) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L984) Immutable object holding the application configuration @@ -10175,7 +10318,7 @@ def settings_customise_sources( ) -> Tuple[PydanticBaseSettingsSource, ...] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1013) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1019) Included to ignore reading/setting values from the environment @@ -10187,7 +10330,7 @@ Included to ignore reading/setting values from the environment def copy(**kwargs) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1026) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1032) Update the application config and return a copy @@ -10461,6 +10604,155 @@ Default - `"excel"`. 
## quixstreams.sources + + +## quixstreams.sources.community.kinesis.kinesis + + + +### KinesisSource + +```python +class KinesisSource(StatefulSource) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/kinesis.py#L18) + +NOTE: Requires `pip install quixstreams[kinesis]` to work. + +This source reads data from an Amazon Kinesis stream, dumping it to a +kafka topic using desired `StreamingDataFrame`-based transformations. + +Provides "at-least-once" guarantees. + +The incoming message value will be in bytes, so transform in your SDF accordingly. + +Example Usage: + +```python +from quixstreams import Application +from quixstreams.sources.community.kinesis import KinesisSource + + +kinesis = KinesisSource( + stream_name="", + aws_access_key_id="", + aws_secret_access_key="", + aws_region="", + auto_offset_reset="earliest", # start from the beginning of the stream (vs end) +) + +app = Application( + broker_address="", + consumer_group="", +) + +sdf = app.dataframe(source=kinesis).print(metadata=True) +# YOUR LOGIC HERE! + +if __name__ == "__main__": + app.run() +``` + + + +#### KinesisSource.\_\_init\_\_ + +```python +def __init__( + stream_name: str, + aws_region: Optional[str] = getenv("AWS_REGION"), + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + aws_endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_KINESIS"), + shutdown_timeout: float = 10, + auto_offset_reset: AutoOffsetResetType = "latest", + max_records_per_shard: int = 1000, + commit_interval: float = 5.0, + retry_backoff_secs: float = 5.0) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/kinesis.py#L57) + +**Arguments**: + +- `stream_name`: name of the desired Kinesis stream to consume. +- `aws_region`: The AWS region. 
+NOTE: can alternatively set the AWS_REGION environment variable
+- `aws_access_key_id`: the AWS access key ID.
+NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
+- `aws_secret_access_key`: the AWS secret access key.
+NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
+- `aws_endpoint_url`: the endpoint URL to use; only required for connecting
+to a locally hosted Kinesis.
+NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable
+- `shutdown_timeout`: time in seconds the application waits for the source
+to gracefully shut down.
+- `auto_offset_reset`: When no previous offset has been recorded, whether to
+start from the beginning ("earliest") or end ("latest") of the stream.
+- `max_records_per_shard`: During round-robin consumption, how many records
+to consume per shard (partition) per consume call (NOT per commit).
+- `commit_interval`: the time in seconds between commits.
+- `retry_backoff_secs`: how long in seconds to back off from making HTTP calls
+for a shard when the Kinesis consumer encounters handled/expected errors.
+
+
+
+## quixstreams.sources.community.kinesis
+
+
+
+## quixstreams.sources.community.kinesis.consumer
+
+
+
+### KinesisStreamShardsNotFound
+
+```python
+class KinesisStreamShardsNotFound(Exception)
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/consumer.py#L28)
+
+Raised when the Kinesis stream has no shards.
+
+
+
+### KinesisConsumer
+
+```python
+class KinesisConsumer()
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/consumer.py#L63)
+
+Consume all shards for a given Kinesis stream in a batched, round-robin fashion.
+Also handles checkpointing of said stream (requires a `KinesisCheckpointer`).
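The batched round-robin pattern can be sketched with `itertools.cycle`. This is a simplified model of the idea, not the library's internals: `fetch` stands in for the Kinesis `GetRecords` call, checkpointing is omitted, and the per-visit limit plays the role of `max_records_per_shard`.

```python
from itertools import cycle


def round_robin_consume(shards, fetch, max_records_per_shard, passes):
    """Visit each shard in turn, taking at most max_records_per_shard
    records per visit, for a fixed number of passes over all shards."""
    consumed = []
    shard_cycle = cycle(shards)
    for _ in range(passes * len(shards)):
        shard = next(shard_cycle)
        for record in fetch(shard, limit=max_records_per_shard):
            consumed.append((shard, record))
    return consumed


# Fake shard data standing in for a two-shard stream.
data = {"shard-0": [0, 1, 2, 3, 4], "shard-1": [0, 1, 2]}


def fetch(shard, limit):
    batch, data[shard] = data[shard][:limit], data[shard][limit:]
    return batch


out = round_robin_consume(
    ["shard-0", "shard-1"], fetch, max_records_per_shard=2, passes=2
)
# Shards are drained evenly, so no single shard can starve the others.
```

The real consumer additionally checkpoints each shard's last sequence number so that consumption can resume after a restart.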
+
+
+
+#### KinesisConsumer.process\_shards
+
+```python
+def process_shards()
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/consumer.py#L90)
+
+Process records from the stream shards one by one and checkpoint their
+sequence numbers.
+
+
+
+#### KinesisConsumer.commit
+
+```python
+def commit(force: bool = False)
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/consumer.py#L104)
+
+Commit the checkpoint and save the progress of the consumer.
 
 ## quixstreams.sources.community.file.formats.parquet
diff --git a/docs/api-reference/sinks.md b/docs/api-reference/sinks.md
index aa9c2b47b..ef7bfd98c 100644
--- a/docs/api-reference/sinks.md
+++ b/docs/api-reference/sinks.md
@@ -1124,3 +1124,158 @@ A connector to sink topic data to PostgreSQL.
 
 - `ddl_timeout`: Timeout for DDL operations such as table creation or schema updates.
 - `kwargs`: Additional parameters for `psycopg2.connect`.
+
+
+
+## quixstreams.sinks.community.kinesis
+
+
+
+### KinesisStreamNotFoundError
+
+```python
+class KinesisStreamNotFoundError(Exception)
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L23)
+
+Raised when the specified Kinesis stream does not exist.
+
+
+
+### KinesisSink
+
+```python
+class KinesisSink(BaseSink)
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L27)
+
+

+ +#### KinesisSink.\_\_init\_\_ + +```python +def __init__(stream_name: str, + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv( + "AWS_SECRET_ACCESS_KEY"), + region_name: Optional[str] = getenv("AWS_REGION", + getenv("AWS_DEFAULT_REGION")), + value_serializer: Callable[[Any], str] = json.dumps, + key_serializer: Callable[[Any], str] = bytes.decode, + **kwargs) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L28) + +Initialize the KinesisSink. + + +
+***Arguments:*** + +- `stream_name`: Kinesis stream name. +- `aws_access_key_id`: AWS access key ID. +- `aws_secret_access_key`: AWS secret access key. +- `region_name`: AWS region name (e.g., 'us-east-1'). +- `value_serializer`: Function to serialize the value to string +(defaults to json.dumps). +- `key_serializer`: Function to serialize the key to string +(defaults to bytes.decode). +- `kwargs`: Additional keyword arguments passed to boto3.client. + + + +
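The two defaults above are ordinary Python callables and can be exercised directly: `json.dumps` turns the processed value into a JSON string, and `bytes.decode` turns a (UTF-8) `bytes` Kafka key into `str`. A quick sanity check:

```python
import json

value_serializer = json.dumps  # the sink's default value serializer
key_serializer = bytes.decode  # the sink's default key serializer

value = {"sensor": "temp-1", "reading": 21.5}
key = b"temp-1"

print(value_serializer(value))  # {"sensor": "temp-1", "reading": 21.5}
print(key_serializer(key))      # temp-1
```

Custom callables with the same "anything in, `str` out" shape can be substituted for either default, for example to derive the Kinesis partition key from a field of the value.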

+ +#### KinesisSink.add + +```python +def add(value: Any, key: Any, timestamp: int, + headers: list[tuple[str, HeaderValue]], topic: str, partition: int, + offset: int) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L80) + +Buffer a record for the Kinesis stream. + +Records are buffered until the batch size reaches 500, at which point +they are sent immediately. If the batch size is less than 500, records +will be sent when the flush method is called. + + + +

+ +#### KinesisSink.flush + +```python +def flush(topic: str, partition: int) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L110) + +Flush all buffered records for a given topic-partition. + +This method sends any outstanding records that have not yet been sent +because the batch size was less than 500. It waits for all futures to +complete, ensuring that all records are successfully sent to the Kinesis +stream. + + + +## quixstreams.sinks.community.redis + + + +### RedisSink + +```python +class RedisSink(BatchingSink) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/redis.py#L21) + + + +

+ +#### RedisSink.\_\_init\_\_ + +```python +def __init__(host: str, + port: int, + db: int, + value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps, + key_serializer: Optional[Callable[[Any, Any], Union[bytes, + str]]] = None, + password: Optional[str] = None, + socket_timeout: float = 30.0, + **kwargs) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/redis.py#L22) + +A connector to sink processed data to Redis. + +It batches the processed records in memory per topic partition, and flushes them to Redis at the checkpoint. + + +
+***Arguments:*** + +- `host`: Redis host. +- `port`: Redis port. +- `db`: Redis DB number. +- `value_serializer`: a callable to serialize the value to string or bytes +(defaults to json.dumps). +- `key_serializer`: an optional callable to serialize the key to string or bytes. +If not provided, the Kafka message key will be used as is. +- `password`: Redis password, optional. +- `socket_timeout`: Redis socket timeout. +Default - 30s. +- `kwargs`: Additional keyword arguments passed to the `redis.Redis` instance. + diff --git a/docs/api-reference/sources.md b/docs/api-reference/sources.md index 7c3c6165f..c0a3dd47f 100644 --- a/docs/api-reference/sources.md +++ b/docs/api-reference/sources.md @@ -971,6 +971,101 @@ with {_key: str, _value: dict, _timestamp: int}. ## quixstreams.sources.community.file.formats.parquet + + +## quixstreams.sources.community.kinesis.kinesis + + + +### KinesisSource + +```python +class KinesisSource(StatefulSource) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/kinesis.py#L18) + +NOTE: Requires `pip install quixstreams[kinesis]` to work. + +This source reads data from an Amazon Kinesis stream, dumping it to a +kafka topic using desired `StreamingDataFrame`-based transformations. + +Provides "at-least-once" guarantees. + +The incoming message value will be in bytes, so transform in your SDF accordingly. + +Example Usage: + +```python +from quixstreams import Application +from quixstreams.sources.community.kinesis import KinesisSource + + +kinesis = KinesisSource( + stream_name="", + aws_access_key_id="", + aws_secret_access_key="", + aws_region="", + auto_offset_reset="earliest", # start from the beginning of the stream (vs end) +) + +app = Application( + broker_address="", + consumer_group="", +) + +sdf = app.dataframe(source=kinesis).print(metadata=True) +# YOUR LOGIC HERE! + +if __name__ == "__main__": + app.run() +``` + + + +
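As noted above, the source emits record values as raw bytes, so a first transformation usually decodes them. `json.loads` accepts `bytes` directly; inside an application this step would typically be registered as `sdf.apply(json.loads)` (shown standalone here so it runs without Kafka or AWS):

```python
import json

# A record value as KinesisSource would deliver it: raw bytes.
raw = b'{"device": "sensor-7", "reading": 3.2}'

# The decoding step you would register on the StreamingDataFrame.
decoded = json.loads(raw)
print(decoded["device"])   # sensor-7
print(decoded["reading"])  # 3.2
```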

+ +#### KinesisSource.\_\_init\_\_ + +```python +def __init__( + stream_name: str, + aws_region: Optional[str] = getenv("AWS_REGION"), + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"), + aws_endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_KINESIS"), + shutdown_timeout: float = 10, + auto_offset_reset: AutoOffsetResetType = "latest", + max_records_per_shard: int = 1000, + commit_interval: float = 5.0, + retry_backoff_secs: float = 5.0) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/kinesis/kinesis.py#L57) + + +
+
+***Arguments:***
+
+- `stream_name`: name of the desired Kinesis stream to consume.
+- `aws_region`: The AWS region.
+NOTE: can alternatively set the AWS_REGION environment variable
+- `aws_access_key_id`: the AWS access key ID.
+NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
+- `aws_secret_access_key`: the AWS secret access key.
+NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
+- `aws_endpoint_url`: the endpoint URL to use; only required for connecting
+to a locally hosted Kinesis.
+NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable
+- `shutdown_timeout`: time in seconds the application waits for the source
+to gracefully shut down.
+- `auto_offset_reset`: When no previous offset has been recorded, whether to
+start from the beginning ("earliest") or end ("latest") of the stream.
+- `max_records_per_shard`: During round-robin consumption, how many records
+to consume per shard (partition) per consume call (NOT per commit).
+- `commit_interval`: the time in seconds between commits.
+- `retry_backoff_secs`: how long in seconds to back off from making HTTP calls
+for a shard when the Kinesis consumer encounters handled/expected errors.
+
 
 ## quixstreams.sources.community.pubsub.pubsub