Add StateStoreManager and connect state to Application (#204)
- Add StateStoreManager to connect state and Application
- Update the RocksDB storage backend
- Always propagate errors from rebalancing callbacks
daniil-quix authored Oct 23, 2023
1 parent 99da535 commit 2e96c51
Showing 26 changed files with 1,524 additions and 169 deletions.
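For context, a rough usage sketch of the new state options introduced by this commit. The broker address parameter, consumer group value, and dataframe construction are illustrative assumptions, not part of the diff:

from streamingdataframes.app import Application

# Hypothetical configuration: only `state_dir`, `rocksdb_options`, and
# `consumer_group` appear in this diff; the broker/topic settings are assumed.
app = Application(
    broker_address="localhost:9092",   # assumed parameter name
    consumer_group="my-consumer-group",
    state_dir="state",                 # enables the StateStoreManager
    rocksdb_options=None,              # None -> default RocksDB options are used
)

# `dataframe` is assumed to be a StreamingDataFrame built elsewhere.
# If any stores are registered, run() opens the state manager alongside the
# Kafka consumer/producer and wraps each processed batch in a store transaction
# keyed by (topic, partition, offset).
# app.run(dataframe)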
103 changes: 90 additions & 13 deletions src/StreamingDataFrames/streamingdataframes/app.py
@@ -17,9 +17,12 @@
from .platforms.quix import QuixKafkaConfigsBuilder
from .rowconsumer import RowConsumer
from .rowproducer import RowProducer
from .state import StateStoreManager
from .state.rocksdb import RocksDBOptionsType

__all__ = ("Application",)


logger = logging.getLogger(__name__)
MessageProcessedCallback = Callable[[str, int, int], None]

@@ -35,6 +38,8 @@ def __init__(
partitioner: Partitioner = "murmur2",
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
state_dir: Optional[str] = None,
rocksdb_options: Optional[RocksDBOptionsType] = None,
on_consumer_error: Optional[ConsumerErrorCallback] = None,
on_processing_error: Optional[ProcessingErrorCallback] = None,
on_producer_error: Optional[ProducerErrorCallback] = None,
@@ -75,6 +80,11 @@
will be passed to `confluent_kafka.Consumer` as is.
:param producer_extra_config: A dictionary with additional options that
will be passed to `confluent_kafka.Producer` as is.
:param state_dir: path to the application state directory, optional.
It must be passed if the application uses stateful operations; otherwise
an exception will be raised.
:param rocksdb_options: RocksDB options.
If `None`, the default options will be used.
:param consumer_poll_timeout: timeout for `RowConsumer.poll()`. Default - 1.0s
:param producer_poll_timeout: timeout for `RowProducer.poll()`. Default - 0s.
:param on_message_processed: a callback triggered when message is successfully
@@ -114,6 +124,13 @@ def __init__(
self._on_processing_error = on_processing_error or default_on_processing_error
self._on_message_processed = on_message_processed
self._quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None
self._state_manager: Optional[StateStoreManager] = None
if state_dir:
self._state_manager = StateStoreManager(
group_id=consumer_group,
state_dir=state_dir,
rocksdb_options=rocksdb_options,
)

def set_quix_config_builder(self, config_builder: QuixKafkaConfigsBuilder):
self._quix_config_builder = config_builder
@@ -128,6 +145,8 @@ def Quix(
partitioner: Partitioner = "murmur2",
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
state_dir: Optional[str] = None,
rocksdb_options: Optional[RocksDBOptionsType] = None,
on_consumer_error: Optional[ConsumerErrorCallback] = None,
on_processing_error: Optional[ProcessingErrorCallback] = None,
on_producer_error: Optional[ProducerErrorCallback] = None,
@@ -163,6 +182,11 @@ def Quix(
will be passed to `confluent_kafka.Consumer` as is.
:param producer_extra_config: A dictionary with additional options that
will be passed to `confluent_kafka.Producer` as is.
:param state_dir: path to the application state directory, optional.
It must be passed if the application uses stateful operations; otherwise
an exception will be raised.
:param rocksdb_options: RocksDB options.
If `None`, the default options will be used.
:param consumer_poll_timeout: timeout for `RowConsumer.poll()`. Default - 1.0s
:param producer_poll_timeout: timeout for `RowProducer.poll()`. Default - 0s.
:param on_message_processed: a callback triggered when message is successfully
@@ -207,6 +231,8 @@ def Quix(
on_message_processed=on_message_processed,
consumer_poll_timeout=consumer_poll_timeout,
producer_poll_timeout=producer_poll_timeout,
state_dir=state_dir,
rocksdb_options=rocksdb_options,
)
# Inject Quix config builder to use it in other methods
app.set_quix_config_builder(quix_config_builder)
@@ -270,6 +296,10 @@ def stop(self):
"""
self._running = False

@property
def is_stateful(self) -> bool:
return bool(self._state_manager and self._state_manager.stores)

def run(
self,
dataframe: StreamingDataFrame,
@@ -279,11 +309,14 @@ def run(
:param dataframe: instance of `StreamingDataFrame`
"""
logger.debug("Starting application")
logger.info("Start processing of the streaming dataframe")

exit_stack = contextlib.ExitStack()
exit_stack.enter_context(self._producer)
exit_stack.enter_context(self._consumer)
if self.is_stateful:
exit_stack.enter_context(self._state_manager)

exit_stack.callback(
lambda *_: logger.debug("Closing Kafka consumers & producers")
)
@@ -293,7 +326,12 @@ def run(
logger.info("Start processing of the streaming dataframe")

# Subscribe to topics in Kafka and start polling
self._consumer.subscribe(list(dataframe.topics_in.values()))
self._consumer.subscribe(
list(dataframe.topics_in.values()),
on_assign=self._on_assign,
on_revoke=self._on_revoke,
on_lost=self._on_lost,
)
# Start polling Kafka for messages and callbacks
self._running = True
while self._running:
@@ -310,22 +348,32 @@ def run(
continue

first_row = rows[0]

for row in rows:
try:
dataframe.process(row=row)
except Exception as exc:
# TODO: This callback might also be triggered by Producer errors,
# since they happen within ".process()"
to_suppress = self._on_processing_error(exc, row, logger)
if not to_suppress:
raise

topic_name, partition, offset = (
first_row.topic,
first_row.partition,
first_row.offset,
)

if self.is_stateful:
# Store manager has stores registered, starting a transaction
state_transaction = self._state_manager.start_store_transaction(
topic=topic_name, partition=partition, offset=offset
)
else:
# The application is stateless, use noop transaction
state_transaction = contextlib.nullcontext()

with state_transaction:
for row in rows:
try:
dataframe.process(row=row)
except Exception as exc:
# TODO: This callback might also be triggered by Producer errors,
# since they happen within ".process()"
to_suppress = self._on_processing_error(exc, row, logger)
if not to_suppress:
raise

# Store the message offset after it's successfully processed
self._consumer.store_offsets(
offsets=[
Expand All @@ -341,3 +389,32 @@ def run(
self._on_message_processed(topic_name, partition, offset)

logger.info("Stop processing of the streaming dataframe")

def _on_assign(self, _, topic_partitions: List[TopicPartition]):
"""
Assign new topic partitions to consumer and state.
:param topic_partitions: list of `TopicPartition` from Kafka
"""
if self.is_stateful:
logger.info(f"Rebalancing: assigning state store partitions")
for tp in topic_partitions:
self._state_manager.on_partition_assign(tp)

def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
"""
Revoke partitions from consumer and state.
"""
if self.is_stateful:
logger.info(f"Rebalancing: revoking state store partitions")
for tp in topic_partitions:
self._state_manager.on_partition_revoke(tp)

def _on_lost(self, _, topic_partitions: List[TopicPartition]):
"""
Drop lost partitions from consumer and state.
"""
if self.is_stateful:
logger.info(f"Rebalancing: dropping lost state store partitions")
for tp in topic_partitions:
self._state_manager.on_partition_lost(tp)
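The run() loop above picks a real store transaction when stores are registered and falls back to `contextlib.nullcontext()` otherwise, so a single `with` block covers both the stateful and stateless paths. A standalone sketch of that pattern (not part of the diff; names are illustrative):

import contextlib

def process_batch(rows, state_manager=None, topic="t", partition=0, offset=0):
    # Use a real transaction when a state manager is available; otherwise a
    # no-op context manager keeps the code path identical.
    if state_manager is not None:
        transaction = state_manager.start_store_transaction(
            topic=topic, partition=partition, offset=offset
        )
    else:
        transaction = contextlib.nullcontext()

    with transaction:
        for row in rows:
            ...  # process each row, e.g. dataframe.process(row=row)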
@@ -1 +1,2 @@
from .base import *
from .assignment import *
@@ -0,0 +1,14 @@
from .base import QuixException

__all__ = ("PartitionAssignmentError", "KafkaPartitionError")


class PartitionAssignmentError(QuixException):
"""
Raised when an error occurs during partition rebalancing,
i.e. from the `on_assign`, `on_revoke`, and `on_lost` callbacks.
"""


class KafkaPartitionError(QuixException):
...
42 changes: 42 additions & 0 deletions src/StreamingDataFrames/streamingdataframes/kafka/consumer.py
@@ -11,6 +11,8 @@
)
from confluent_kafka.admin import ClusterMetadata

from streamingdataframes.exceptions import PartitionAssignmentError, KafkaPartitionError

__all__ = (
"Consumer",
"AutoOffsetReset",
@@ -47,6 +49,22 @@ def _default_on_commit_cb(
on_commit(error, partitions)


def _wrap_assignment_errors(func):
"""
Wrap exceptions raised from "on_assign", "on_revoke" and "on_lost" callbacks
into `PartitionAssignmentError`
"""

@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
raise PartitionAssignmentError("Error during partition assignment") from exc

return wrapper


class Consumer:
def __init__(
self,
@@ -158,30 +176,54 @@ def subscribe(
assigned or revoked.
"""

@_wrap_assignment_errors
def _on_assign_wrapper(consumer: Consumer, partitions: List[TopicPartition]):
for partition in partitions:
if partition.error:
raise KafkaPartitionError(
f"Kafka partition error "
f'(topic "{partition.topic}", '
f'partition "{partition.partition}"): '
f"{partition.error}"
)
logger.debug(
"Assigned partition to a consumer",
extra={"topic": partition.topic, "partition": partition.partition},
)
if on_assign is not None:
on_assign(consumer, partitions)

@_wrap_assignment_errors
def _on_revoke_wrapper(consumer: Consumer, partitions: List[TopicPartition]):
for partition in partitions:
if partition.error:
raise KafkaPartitionError(
f"Kafka partition error "
f'(topic "{partition.topic}", '
f'partition "{partition.partition}"): '
f"{partition.error}"
)
logger.debug(
"Revoking partition from a consumer",
extra={"topic": partition.topic, "partition": partition.partition},
)
if on_revoke is not None:
on_revoke(consumer, partitions)

@_wrap_assignment_errors
def _on_lost_wrapper(consumer: Consumer, partitions: List[TopicPartition]):
for partition in partitions:
logger.debug(
"Consumer lost a partition",
extra={"topic": partition.topic, "partition": partition.partition},
)
if partition.error:
raise KafkaPartitionError(
f"Kafka partition error "
f'(topic "{partition.topic}", '
f'partition "{partition.partition}"): '
f"{partition.error}"
)
if on_lost is not None:
on_lost(consumer, partitions)

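`_wrap_assignment_errors` converts any exception raised inside a rebalancing callback into `PartitionAssignmentError`, which `RowConsumer.poll_row()` then re-raises instead of routing it through the consumer error callback (see the rowconsumer.py change below). A self-contained sketch of the wrap-and-propagate pattern, using plain stand-ins rather than the real Kafka consumer:

import functools

class PartitionAssignmentError(Exception):
    """Stand-in for streamingdataframes.exceptions.PartitionAssignmentError."""

def wrap_assignment_errors(func):
    # Re-raise anything thrown by a rebalancing callback as a dedicated type
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            raise PartitionAssignmentError("Error during partition assignment") from exc
    return wrapper

@wrap_assignment_errors
def on_assign(consumer, partitions):
    raise RuntimeError("user callback failed")

try:
    on_assign(None, [])
except PartitionAssignmentError:
    # Rebalancing failures surface as a dedicated type, so the caller can
    # always propagate them instead of suppressing them like other errors.
    pass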
5 changes: 4 additions & 1 deletion src/StreamingDataFrames/streamingdataframes/rowconsumer.py
@@ -5,7 +5,7 @@
from typing_extensions import Protocol

from .error_callbacks import ConsumerErrorCallback, default_on_consumer_error
from .exceptions import QuixException
from .exceptions import QuixException, PartitionAssignmentError
from .kafka import Consumer, AssignmentStrategy, AutoOffsetReset
from .kafka.consumer import RebalancingCallback
from .models import Topic, Row
@@ -159,6 +159,9 @@ def poll_row(self, timeout: float = None) -> Union[Row, List[Row], None]:
"""
try:
msg = self.poll(timeout=timeout)
except PartitionAssignmentError:
# Always propagate errors raised during partition assignment
raise
except Exception as exc:
to_suppress = self._on_error(exc, None, logger)
if to_suppress:
2 changes: 2 additions & 0 deletions src/StreamingDataFrames/streamingdataframes/state/__init__.py
@@ -0,0 +1,2 @@
from .manager import *
from .types import *
13 changes: 13 additions & 0 deletions src/StreamingDataFrames/streamingdataframes/state/exceptions.py
@@ -0,0 +1,13 @@
from streamingdataframes.exceptions import QuixException


class PartitionNotAssignedError(QuixException):
...


class StoreNotRegisteredError(QuixException):
...


class InvalidStoreTransactionStateError(QuixException):
...