Kafka Replicator Source & Quix Environment Source (#448)
quentin-quix authored Sep 19, 2024
1 parent 102494e commit d09e7fa
Showing 20 changed files with 1,104 additions and 46 deletions.
3 changes: 3 additions & 0 deletions docs/build/build.py
@@ -123,6 +123,9 @@
k: None
for k in [
"quixstreams.sources.base",
"quixstreams.sources.csv",
"quixstreams.sources.kafka.kafka",
"quixstreams.sources.kafka.quix",
]
},
}
2 changes: 2 additions & 0 deletions docs/connectors/sources/README.md
@@ -26,6 +26,8 @@ if __name__ == "__main__":
Quix Streams provides sources out of the box.

* [CSVSource](./csv-source.md): A source that reads data from a single CSV file.
* [KafkaReplicatorSource](./kafka-source.md): A source that replicates a topic from a Kafka broker to your application broker.
* [QuixEnvironmentSource](./quix-source.md): A source that replicates a topic from a Quix Cloud environment to your application broker.

You can also implement your own, have a look at [Creating a Custom Source](custom-sources.md) for documentation on how to do that.

42 changes: 42 additions & 0 deletions docs/connectors/sources/kafka-source.md
@@ -0,0 +1,42 @@
# Kafka Replicator Source

A source that reads data from a Kafka topic and produces it to another Kafka topic. The two topics can be located on different Kafka clusters.

This source supports exactly-once guarantees.

## How to use the Kafka Replicator Source

To use a Kafka Replicator source, you need to create an instance of `KafkaReplicatorSource` and pass it to the `app.dataframe()` method.

```python
from quixstreams import Application
from quixstreams.sources import KafkaReplicatorSource

def main():
    app = Application()
    source = KafkaReplicatorSource(
        name="my-source",
        app_config=app.config,
        topic="source-topic",
        broker_address="source-broker-address",
    )

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run(sdf)

if __name__ == "__main__":
    main()
```

## Topic

The Kafka Replicator source only deals with bytes. It reads the remote keys and values as bytes and produces them directly as bytes.
You can configure the key and value deserializers used by the `StreamingDataFrame` with the `key_deserializer` and `value_deserializer` parameters.
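
A minimal sketch of that configuration, assuming the `key_deserializer` and `value_deserializer` parameters are passed to the source constructor and accept the deserializer names Quix Streams ships (for example `"string"` and `"json"`); treat the exact values as assumptions:

```python
from quixstreams import Application
from quixstreams.sources import KafkaReplicatorSource

app = Application()

# Assumed configuration: the deserializer names below are illustrative and
# not verified against the KafkaReplicatorSource signature.
source = KafkaReplicatorSource(
    name="my-source",
    app_config=app.config,
    topic="source-topic",
    broker_address="source-broker-address",
    key_deserializer="string",
    value_deserializer="json",
)

sdf = app.dataframe(source=source)
sdf.print(metadata=True)
```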

## Consumer group

The Kafka Replicator consumer group is the source name prefixed with `source-`; for example, a source named `my-source` uses the consumer group `source-my-source`. Changing the source name resets the source state, and the data is re-replicated according to the configured `auto_offset_reset`. The consumer group is not derived from the application consumer group, so changing the application consumer group does not reset the source.

For more information about consumer groups, [see the glossary](https://quix.io/docs/kb/glossary.html?h=consumer+group#consumer-group).
34 changes: 34 additions & 0 deletions docs/connectors/sources/quix-source.md
@@ -0,0 +1,34 @@
# Quix Environment Source

A specialised [Kafka Source](kafka-source.md) that simplifies copying data from a Quix environment.

## How to use the Quix Environment Source

To use a Quix Environment source, you need to create an instance of `QuixEnvironmentSource` and pass it to the `app.dataframe()` method.

```python
from quixstreams import Application
from quixstreams.sources import QuixEnvironmentSource

def main():
    app = Application()
    source = QuixEnvironmentSource(
        name="my-source",
        app_config=app.config,
        topic="source-topic",
        quix_sdk_token="quix-sdk-token",
        quix_workspace_id="quix-workspace-id",
    )

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run(sdf)

if __name__ == "__main__":
    main()
```

## Token

The Quix Environment Source requires the SDK token of the source environment. See the [SDK token documentation](../../../develop/authentication/streaming-token.md) for more information.
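
A minimal sketch, assuming the token and workspace id are read from environment variables rather than hardcoded; the variable names below are placeholders, not part of the library:

```python
import os

from quixstreams import Application
from quixstreams.sources import QuixEnvironmentSource

app = Application()

# Placeholder variable names; supply the values however your deployment
# manages secrets for the source environment.
source = QuixEnvironmentSource(
    name="my-source",
    app_config=app.config,
    topic="source-topic",
    quix_sdk_token=os.environ["SOURCE_ENVIRONMENT_SDK_TOKEN"],
    quix_workspace_id=os.environ["SOURCE_ENVIRONMENT_WORKSPACE_ID"],
)

sdf = app.dataframe(source=source)
```
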
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -52,6 +52,8 @@ nav:
- Sources:
- 'connectors/sources/README.md'
- CSV Source: connectors/sources/csv-source.md
- Kafka Replicator Source: connectors/sources/kafka-source.md
- Quix Source: connectors/sources/quix-source.md
- Creating a Custom Source: connectors/sources/custom-sources.md
- Upgrading Guide:
- Upgrading from Quix Streams v0.5: upgrading-legacy.md
15 changes: 11 additions & 4 deletions quixstreams/app.py
@@ -682,7 +682,7 @@ def get_producer(self) -> Producer:
producer.produce(topic=topic.name, key=b"key", value=b"value")
```
"""
self._setup_topics()
self.setup_topics()

return Producer(
broker_address=self._config.broker_address,
@@ -725,8 +725,11 @@ def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
# consumer.store_offsets(msg)
```
:param auto_commit_enable: Enable or disable auto commit
Default - True
"""
self._setup_topics()
self.setup_topics()

return Consumer(
broker_address=self._config.broker_address,
@@ -817,7 +820,7 @@ def _run(self, dataframe: Optional[StreamingDataFrame] = None):
if self.is_quix_app:
self._quix_runtime_init()

self._setup_topics()
self.setup_topics()

exit_stack = contextlib.ExitStack()
exit_stack.enter_context(self._processing_context)
@@ -881,7 +884,11 @@ def _quix_runtime_init(self):
if self._state_manager.stores:
check_state_management_enabled()

def _setup_topics(self):
def setup_topics(self):
"""
Validate and create the topics
"""

topics_list = ", ".join(
f'"{topic}"' for topic in self._topic_manager.all_topics
)
2 changes: 1 addition & 1 deletion quixstreams/checkpointing/__init__.py
@@ -1,2 +1,2 @@
from .checkpoint import Checkpoint
from .checkpoint import Checkpoint, BaseCheckpoint
from .exceptions import InvalidStoredOffset
70 changes: 53 additions & 17 deletions quixstreams/checkpointing/checkpoint.py
@@ -1,5 +1,6 @@
import logging
import time
from abc import abstractmethod
from typing import Dict, Tuple

from confluent_kafka import TopicPartition, KafkaException
@@ -24,21 +25,19 @@
logger = logging.getLogger(__name__)


class Checkpoint:
class BaseCheckpoint:
"""
Class to keep track of state updates and consumer offsets and to checkpoint these
Base class to keep track of state updates and consumer offsets and to checkpoint these
updates on schedule.
Two implementations exist:
* one for checkpointing the Application in quixstreams/checkpoint/checkpoint.py
* one for checkpointing the kafka source in quixstreams/sources/kafka/checkpoint.py
"""

def __init__(
self,
commit_interval: float,
producer: RowProducer,
consumer: Consumer,
state_manager: StateStoreManager,
sink_manager: SinkManager,
pausing_manager: PausingManager,
exactly_once: bool = False,
commit_every: int = 0,
):
self._created_at = time.monotonic()
@@ -52,18 +51,9 @@ def __init__(
# Passing zero or lower will flush the checkpoint after each processed message
self._commit_interval = max(commit_interval, 0)

self._state_manager = state_manager
self._consumer = consumer
self._producer = producer
self._exactly_once = exactly_once
self._sink_manager = sink_manager
self._pausing_manager = pausing_manager
self._commit_every = commit_every
self._total_offsets_processed = 0

if self._exactly_once:
self._producer.begin_transaction()

def expired(self) -> bool:
"""
Returns `True` if checkpoint deadline has expired OR
@@ -107,6 +97,52 @@ def store_offset(self, topic: str, partition: int, offset: int):
self._starting_tp_offsets[tp] = offset
self._total_offsets_processed += 1

@abstractmethod
def close(self):
"""
Perform cleanup (when the checkpoint is empty) instead of committing.
Needed for exactly-once, as Kafka transactions are timeboxed.
"""

@abstractmethod
def commit(self):
"""
Commit the checkpoint.
"""
pass


class Checkpoint(BaseCheckpoint):
"""
Checkpoint implementation used by the application
"""

def __init__(
self,
commit_interval: float,
producer: RowProducer,
consumer: Consumer,
state_manager: StateStoreManager,
sink_manager: SinkManager,
pausing_manager: PausingManager,
exactly_once: bool = False,
commit_every: int = 0,
):
super().__init__(
commit_interval=commit_interval,
commit_every=commit_every,
)

self._state_manager = state_manager
self._consumer = consumer
self._producer = producer
self._sink_manager = sink_manager
self._pausing_manager = pausing_manager
self._exactly_once = exactly_once
if self._exactly_once:
self._producer.begin_transaction()

def get_store_transaction(
self, topic: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME
) -> PartitionTransaction:
10 changes: 10 additions & 0 deletions quixstreams/models/topics/admin.py
@@ -209,3 +209,13 @@ def create_topics(
),
finalize_timeout=finalize_timeout,
)

    # support pickling by dropping the inner admin
    def __getstate__(self) -> object:
        state = self.__dict__.copy()
        state.pop("_inner_admin", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._inner_admin = None
2 changes: 1 addition & 1 deletion quixstreams/rowproducer.py
@@ -158,7 +158,7 @@ def produce_row(
return
raise

def poll(self, timeout: float = None):
def poll(self, timeout: float = 0):
"""
Polls the producer for events and calls `on_delivery` callbacks.
1 change: 1 addition & 0 deletions quixstreams/sources/__init__.py
@@ -2,3 +2,4 @@
from .manager import SourceException
from .multiprocessing import multiprocessing
from .csv import CSVSource
from .kafka import KafkaReplicatorSource, QuixEnvironmentSource
2 changes: 2 additions & 0 deletions quixstreams/sources/kafka/__init__.py
@@ -0,0 +1,2 @@
from .kafka import *
from .quix import *
88 changes: 88 additions & 0 deletions quixstreams/sources/kafka/checkpoint.py
@@ -0,0 +1,88 @@
from confluent_kafka import TopicPartition, KafkaException

from typing import List

from quixstreams.checkpointing import BaseCheckpoint
from quixstreams.checkpointing.exceptions import (
    CheckpointProducerTimeout,
    CheckpointConsumerCommitError,
)
from quixstreams.models.topics import Topic
from quixstreams.rowconsumer import Consumer
from quixstreams.rowproducer import RowProducer


class Checkpoint(BaseCheckpoint):
    """
    Checkpoint implementation used by the KafkaReplicatorSource
    """

    def __init__(
        self,
        producer: RowProducer,
        producer_topic: Topic,
        consumer: Consumer,
        commit_interval: float,
        commit_every: int = 0,
        flush_timeout: float = 10,
        exactly_once: bool = False,
    ):
        super().__init__(commit_interval, commit_every)

        self._producer = producer
        self._producer_topic = producer_topic
        self._consumer = consumer
        self._flush_timeout = flush_timeout
        self._exactly_once = exactly_once

        if self._exactly_once:
            self._producer.begin_transaction()

    def close(self):
        """
        Perform cleanup (when the checkpoint is empty) instead of committing.
        Needed for exactly-once, as Kafka transactions are timeboxed.
        """
        if self._exactly_once:
            self._producer.abort_transaction()

    def commit(self):
        """
        Commit the checkpoint.
        This method will:
         1. Flush the producer to ensure everything is delivered.
         2. Commit topic offsets.
        """
        unproduced_msg_count = self._producer.flush(self._flush_timeout)
        if unproduced_msg_count > 0:
            raise CheckpointProducerTimeout(
                f"'{unproduced_msg_count}' messages failed to be produced before the producer flush timeout"
            )

        offsets = [
            TopicPartition(
                topic=self._producer_topic.name,
                partition=partition,
                offset=offset + 1,
            )
            for (_, partition), offset in self._tp_offsets.items()
        ]
        self._tp_offsets = {}

        try:
            self._commit(offsets=offsets)
        except KafkaException as e:
            raise CheckpointConsumerCommitError(e.args[0]) from None

    def _commit(self, offsets: List[TopicPartition]):
        if self._exactly_once:
            self._producer.commit_transaction(
                offsets, self._consumer.consumer_group_metadata()
            )
        else:
            partitions = self._consumer.commit(offsets=offsets, asynchronous=False)
            for partition in partitions:
                if partition.error:
                    raise CheckpointConsumerCommitError(partition.error)