Stateful sources implementation (#615)

Enhance source connectors by allowing access to a Kafka-backed state.

* Implement stateful sources

* Added support for creating changelog topics without a related "data" topic

Stateful sources don't have an input topic with which to associate the changelog topic, so the topic and recovery managers need to support a changelog topic whose source topic is set to None.

* Added support for custom changelog topic configuration (instead of relying on the "data" topic configuration)

When creating the changelog topic we normally reuse the data topic's configuration (replication factor, number of partitions, ...). Since the data topic is nonexistent for stateful sources, we add a way to create a changelog topic from an explicitly passed configuration.
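
For a concrete picture, here is a minimal sketch of what a source built on this change might look like. It is illustrative only: the `state` property, `state.get`/`state.set`, and `flush()` names are assumptions about the new `StatefulSource` API and should be checked against `quixstreams/sources/base/source.py`.

```python
# Hedged sketch of a stateful source (API names assumed, not taken from this diff).
from quixstreams import Application
from quixstreams.sources import StatefulSource


class CounterSource(StatefulSource):
    def run(self):
        while self.running:
            # Read the last persisted value from the Kafka-backed state
            current = self.state.get("count", default=0)
            self.produce(key="count", value=str(current))
            self.state.set("count", current + 1)
            # Commit the produced messages together with the state changelog
            self.flush()


app = Application(broker_address="localhost:9092")
sdf = app.dataframe(source=CounterSource(name="counter"))
app.run(sdf)
```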
quentin-quix authored Nov 20, 2024
1 parent 27e85d2 commit 38a4fd6
Showing 14 changed files with 470 additions and 130 deletions.
74 changes: 48 additions & 26 deletions quixstreams/app.py
@@ -217,7 +217,7 @@ def __init__(
         if broker_address:
             # If broker_address is passed to the app it takes priority over any quix configuration
             self._is_quix_app = False
-            topic_manager_factory = TopicManager
+            self._topic_manager_factory = TopicManager
             if isinstance(broker_address, str):
                 broker_address = ConnectionConfig(bootstrap_servers=broker_address)
         else:
@@ -245,7 +245,7 @@ def __init__(
f"{quix_app_source} detected; "
f"the application will connect to Quix Cloud brokers"
)
topic_manager_factory = functools.partial(
self._topic_manager_factory = functools.partial(
QuixTopicManager, quix_config_builder=quix_config_builder
)
# Check if the state dir points to the mounted PVC while running on Quix
@@ -288,30 +288,12 @@ def __init__(
         self._on_message_processed = on_message_processed
         self._on_processing_error = on_processing_error or default_on_processing_error

-        self._consumer = RowConsumer(
-            broker_address=self._config.broker_address,
-            consumer_group=self._config.consumer_group,
-            auto_offset_reset=self._config.auto_offset_reset,
-            auto_commit_enable=False,  # Disable auto commit and manage commits manually
-            extra_config=self._config.consumer_extra_config,
-            on_error=on_consumer_error,
-        )
+        self._consumer = self._get_rowconsumer(on_error=on_consumer_error)
         self._producer = self._get_rowproducer(on_error=on_producer_error)
         self._running = False
         self._failed = False

-        if not topic_manager:
-            topic_manager = topic_manager_factory(
-                topic_admin=TopicAdmin(
-                    broker_address=self._config.broker_address,
-                    extra_config=self._config.producer_extra_config,
-                ),
-                consumer_group=self._config.consumer_group,
-                timeout=self._config.request_timeout,
-                create_timeout=self._config.topic_create_timeout,
-                auto_create_topics=self._config.auto_create_topics,
-            )
-        self._topic_manager = topic_manager
+        self._topic_manager = topic_manager or self._get_topic_manager()

         producer = None
         recovery_manager = None
@@ -369,6 +351,23 @@ def Quix(cls, *args, **kwargs):
'"Quix__Sdk__Token" environment variable'
)

+    def _get_topic_manager(self) -> TopicManager:
+        """
+        Create a TopicManager using the application config.
+        Used to create the application topic manager as well as the sources' topic managers.
+        """
+        return self._topic_manager_factory(
+            topic_admin=TopicAdmin(
+                broker_address=self._config.broker_address,
+                extra_config=self._config.producer_extra_config,
+            ),
+            consumer_group=self._config.consumer_group,
+            timeout=self._config.request_timeout,
+            create_timeout=self._config.topic_create_timeout,
+            auto_create_topics=self._config.auto_create_topics,
+        )

     def topic(
         self,
         name: str,
@@ -579,6 +578,24 @@ def get_producer(self) -> Producer:
             extra_config=self._config.producer_extra_config,
         )

+    def _get_rowconsumer(
+        self, on_error: Optional[ConsumerErrorCallback] = None
+    ) -> RowConsumer:
+        """
+        Create a RowConsumer using the application config.
+        Used to create the application consumer as well as the sources' consumers.
+        """
+        return RowConsumer(
+            broker_address=self._config.broker_address,
+            consumer_group=self._config.consumer_group,
+            auto_offset_reset=self._config.auto_offset_reset,
+            auto_commit_enable=False,  # Disable auto commit and manage commits manually
+            extra_config=self._config.consumer_extra_config,
+            on_error=on_error,
+        )

     def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
         """
         Create and return a pre-configured Consumer instance.
@@ -650,9 +667,13 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
         if not topic:
             topic = self._topic_manager.register(source.default_topic())

-        producer = self._get_rowproducer(transactional=False)
-        source.configure(topic, producer)
-        self._source_manager.register(source)
+        self._source_manager.register(
+            source,
+            topic,
+            self._get_rowproducer(transactional=False),
+            self._get_rowconsumer(),
+            self._get_topic_manager(),
+        )
         return topic

     def run(self, dataframe: Optional[StreamingDataFrame] = None):
@@ -879,7 +900,8 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
             stored_offsets = [
                 offset
                 for offset in (
-                    store_tp.get_processed_offset() for store_tp in store_partitions
+                    store_tp.get_processed_offset()
+                    for store_tp in store_partitions.values()
                 )
                 if offset is not None
             ]
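With this change, `add_source` no longer configures the source directly; `SourceManager.register` receives the topic plus a dedicated producer, consumer, and topic manager for each source. A hedged usage sketch, reusing the hypothetical `CounterSource` from the commit message above:

```python
# Hedged sketch: registering a source on an explicit topic.
from quixstreams import Application

app = Application(broker_address="localhost:9092")
topic = app.topic("counter-output")
app.add_source(CounterSource(name="counter"), topic=topic)  # CounterSource as sketched above
app.run()
```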
60 changes: 38 additions & 22 deletions quixstreams/models/topics/manager.py
@@ -46,7 +46,7 @@ class TopicManager:
     _max_topic_name_len = 255

     _groupby_extra_config_imports_defaults = {"retention.bytes", "retention.ms"}
-    _changelog_extra_config_defaults = {"cleanup.policy": "compact"}
+    _changelog_extra_config_override = {"cleanup.policy": "compact"}
     _changelog_extra_config_imports_defaults = {"retention.bytes", "retention.ms"}

     def __init__(
@@ -67,7 +67,7 @@ def __init__(
         self._consumer_group = consumer_group
         self._topics: Dict[str, Topic] = {}
         self._repartition_topics: Dict[str, Topic] = {}
-        self._changelog_topics: Dict[str, Dict[str, Topic]] = {}
+        self._changelog_topics: Dict[Optional[str], Dict[str, Topic]] = {}
         self._timeout = timeout
         self._create_timeout = create_timeout
         self._auto_create_topics = auto_create_topics
@@ -101,7 +101,7 @@ def repartition_topics(self) -> Dict[str, Topic]:
         return self._repartition_topics

     @property
-    def changelog_topics(self) -> Dict[str, Dict[str, Topic]]:
+    def changelog_topics(self) -> Dict[Optional[str], Dict[str, Topic]]:
         """
         Note: `Topic`s are the changelogs.
@@ -152,7 +152,7 @@ def _format_nested_name(self, topic_name: str) -> str:
     def _internal_name(
         self,
         topic_type: Literal["changelog", "repartition"],
-        topic_name: str,
+        topic_name: Optional[str],
         suffix: str,
     ) -> str:
         """
@@ -163,13 +163,19 @@
         The internal format is <{TYPE}__{GROUP}--{NAME}--{SUFFIX}>
         :param topic_type: topic type, added as prefix (changelog, repartition)
-        :param topic_name: name of consumed topic (app input topic)
+        :param topic_name: name of the consumed topic, if one exists (app input topic)
         :param suffix: a unique descriptor related to topic type, added as suffix
         :return: formatted topic name
         """
-        nested_name = self._format_nested_name(topic_name)
-        return f"{topic_type}__{'--'.join([self._consumer_group, nested_name, suffix])}"
+        if topic_name is None:
+            parts = [self._consumer_group, suffix]
+        else:
+            nested_name = self._format_nested_name(topic_name)
+            parts = [self._consumer_group, nested_name, suffix]
+
+        return f"{topic_type}__{'--'.join(parts)}"

     def _create_topics(
         self, topics: List[Topic], timeout: float, create_timeout: float
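
For illustration, the updated naming scheme then behaves as follows. The setup below is hypothetical (group and store names invented, import paths assumed from the file layout shown here):

```python
from quixstreams.models.topics import TopicAdmin, TopicManager

# Hypothetical setup, for illustration only
manager = TopicManager(
    topic_admin=TopicAdmin(broker_address="localhost:9092"),
    consumer_group="my-group",
)

manager._internal_name("changelog", "my-topic", "my-store")
# -> "changelog__my-group--my-topic--my-store"

# Stateful sources have no source topic, so the name segment is simply dropped:
manager._internal_name("changelog", None, "my-store")
# -> "changelog__my-group--my-store"
```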
@@ -341,13 +347,14 @@ def repartition_topic(

     def changelog_topic(
         self,
-        topic_name: str,
+        topic_name: Optional[str],
         store_name: str,
+        config: Optional[TopicConfig] = None,
         timeout: Optional[float] = None,
     ) -> Topic:
         """
-        Performs all the logic necessary to generate a changelog topic based on a
-        "source topic" (aka input/consumed topic).
+        Performs all the logic necessary to generate a changelog topic based on an
+        optional "source topic" (aka input/consumed topic).

         Its main goal is to ensure partition counts of the to-be generated changelog
         match the source topic, and ensure the changelog topic is compacted. Also
@@ -366,23 +373,32 @@
         > NOTE: normally contain any prefixes added by TopicManager.topic()
         :param store_name: name of the store this changelog belongs to
             (default, rolling10s, etc.)
+        :param config: the changelog topic configuration; defaults to the `topic_name`
+            topic's configuration, or to the TopicManager defaults when `topic_name` is None
         :param timeout: config lookup timeout (seconds); Default 30
         :return: `Topic` object (which is also stored on the TopicManager)
         """
-        source_topic_config = self._get_source_topic_config(
-            topic_name,
-            extras_imports=self._changelog_extra_config_imports_defaults,
-            timeout=timeout if timeout is not None else self._timeout,
-        )
-        source_topic_config.extra_config.update(self._changelog_extra_config_defaults)
-
-        changelog_config = self.topic_config(
-            num_partitions=source_topic_config.num_partitions,
-            replication_factor=source_topic_config.replication_factor,
-            extra_config=source_topic_config.extra_config,
-        )
+        if config is None:
+            if topic_name is None:
+                config = self.topic_config(
+                    num_partitions=self.default_num_partitions,
+                    replication_factor=self.default_replication_factor,
+                )
+            else:
+                source_topic_config = self._get_source_topic_config(
+                    topic_name,
+                    extras_imports=self._changelog_extra_config_imports_defaults,
+                    timeout=timeout if timeout is not None else self._timeout,
+                )
+                config = self.topic_config(
+                    num_partitions=source_topic_config.num_partitions,
+                    replication_factor=source_topic_config.replication_factor,
+                    extra_config=source_topic_config.extra_config,
+                )
+
+        # always override some default configuration
+        config.extra_config.update(self._changelog_extra_config_override)

         topic = self._finalize_topic(
             Topic(
@@ -391,7 +407,7 @@ def changelog_topic(
value_serializer="bytes",
key_deserializer="bytes",
value_deserializer="bytes",
config=changelog_config,
config=config,
)
)
         self._changelog_topics.setdefault(topic_name, {})[store_name] = topic
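The net effect in `changelog_topic` is a three-way precedence for the changelog configuration: an explicitly passed `config` wins; otherwise the source topic's configuration is copied; otherwise (stateful sources, `topic_name=None`) the TopicManager defaults apply. In every case `cleanup.policy=compact` is enforced on top. A sketch of the three cases, assuming the hypothetical `manager` from the earlier snippet:

```python
# 1. Regular store: partitions/replication copied from the source topic.
manager.changelog_topic("input-topic", "my-store")

# 2. Stateful source: no source topic, fall back to the TopicManager defaults.
manager.changelog_topic(None, "my-store")

# 3. An explicitly passed configuration always wins.
custom = manager.topic_config(num_partitions=4, replication_factor=3)
manager.changelog_topic(None, "my-store", config=custom)

# In all three cases the final extra_config includes {"cleanup.policy": "compact"}.
```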
10 changes: 9 additions & 1 deletion quixstreams/sources/__init__.py
@@ -1,4 +1,11 @@
-from .base import BaseSource, Source, SourceException, SourceManager, multiprocessing
+from .base import (
+    BaseSource,
+    Source,
+    SourceException,
+    SourceManager,
+    StatefulSource,
+    multiprocessing,
+)
 from .core.csv import CSVSource
 from .core.kafka import KafkaReplicatorSource, QuixEnvironmentSource

@@ -11,4 +18,5 @@
"Source",
"SourceException",
"SourceManager",
"StatefulSource",
]
3 changes: 2 additions & 1 deletion quixstreams/sources/base/__init__.py
@@ -1,12 +1,13 @@
 from .exceptions import SourceException
 from .manager import SourceManager
 from .multiprocessing import multiprocessing
-from .source import BaseSource, Source
+from .source import BaseSource, Source, StatefulSource

 __all__ = (
     "Source",
     "BaseSource",
     "multiprocessing",
     "SourceManager",
     "SourceException",
+    "StatefulSource",
 )
(10 more changed files not shown)
