diff --git a/pyproject.toml b/pyproject.toml
index 4f4cd5f62..c6beab872 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -114,6 +114,7 @@ ignore_missing_imports = true
 module = [
     "quixstreams.sinks.community.*",
     "quixstreams.sources.community.*",
+    "quixstreams.models.serializers.quix.*",
 ]
 ignore_errors = true
 
@@ -121,7 +122,6 @@ ignore_errors = true
 module = [
     "quixstreams.core.*",
     "quixstreams.dataframe.*",
-    "quixstreams.models.*",
     "quixstreams.platforms.*",
     "quixstreams.rowproducer.*"
 ]
diff --git a/quixstreams/error_callbacks.py b/quixstreams/error_callbacks.py
index d231be89b..927515563 100644
--- a/quixstreams/error_callbacks.py
+++ b/quixstreams/error_callbacks.py
@@ -1,18 +1,18 @@
 import logging
 from typing import Callable, Optional
 
-from .models import ConfluentKafkaMessageProto, Row
+from .models import RawConfluentKafkaMessageProto, Row
 
 ProcessingErrorCallback = Callable[[Exception, Optional[Row], logging.Logger], bool]
 ConsumerErrorCallback = Callable[
-    [Exception, Optional[ConfluentKafkaMessageProto], logging.Logger], bool
+    [Exception, Optional[RawConfluentKafkaMessageProto], logging.Logger], bool
 ]
 ProducerErrorCallback = Callable[[Exception, Optional[Row], logging.Logger], bool]
 
 
 def default_on_consumer_error(
     exc: Exception,
-    message: Optional[ConfluentKafkaMessageProto],
+    message: Optional[RawConfluentKafkaMessageProto],
     logger: logging.Logger,
 ):
     topic, partition, offset = None, None, None
diff --git a/quixstreams/kafka/consumer.py b/quixstreams/kafka/consumer.py
index 1f5f6842b..bdaece6fd 100644
--- a/quixstreams/kafka/consumer.py
+++ b/quixstreams/kafka/consumer.py
@@ -1,7 +1,7 @@
 import functools
 import logging
 import typing
-from typing import Any, Callable, List, Optional, Tuple, Union
+from typing import Any, Callable, List, Optional, Tuple, Union, cast
 
 from confluent_kafka import (
     Consumer as ConfluentConsumer,
@@ -14,8 +14,13 @@ from confluent_kafka.admin import ClusterMetadata, GroupMetadata
 
 from quixstreams.exceptions import KafkaPartitionError, PartitionAssignmentError
 
+from quixstreams.models.types import (
+    RawConfluentKafkaMessageProto,
+    SuccessfulConfluentKafkaMessageProto,
+)
 from .configuration import ConnectionConfig
+from .exceptions import KafkaConsumerException
 
 __all__ = (
     "BaseConsumer",
@@ -65,6 +70,14 @@ def wrapper(*args, **kwargs):
     return wrapper
 
 
+def raise_for_msg_error(
+    msg: RawConfluentKafkaMessageProto,
+) -> SuccessfulConfluentKafkaMessageProto:
+    if msg.error():
+        raise KafkaConsumerException(error=msg.error())
+    return cast(SuccessfulConfluentKafkaMessageProto, msg)
+
+
 class BaseConsumer:
     def __init__(
         self,
@@ -129,7 +142,9 @@ def __init__(
         }
         self._inner_consumer: Optional[ConfluentConsumer] = None
 
-    def poll(self, timeout: Optional[float] = None) -> Optional[Message]:
+    def poll(
+        self, timeout: Optional[float] = None
+    ) -> Optional[RawConfluentKafkaMessageProto]:
         """
         Consumes a single message, calls callbacks and returns events.
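Reviewer note (not part of the patch): a minimal sketch of how the broader `poll()` return type is meant to be narrowed at call sites with the new `raise_for_msg_error()` helper. The wrapper function name and the timeout value are illustrative only, and the imports assume the additions made in this patch.

from typing import Optional

from quixstreams.kafka.consumer import BaseConsumer, raise_for_msg_error
from quixstreams.models.types import SuccessfulConfluentKafkaMessageProto


def poll_successful(
    consumer: BaseConsumer, timeout: float = 1.0
) -> Optional[SuccessfulConfluentKafkaMessageProto]:
    # poll() is now annotated as Optional[RawConfluentKafkaMessageProto],
    # i.e. the returned message may still carry a KafkaError.
    msg = consumer.poll(timeout=timeout)
    if msg is None:
        return None
    # Raises KafkaConsumerException if msg.error() is set; otherwise casts the
    # message to the "successful" protocol so type checkers accept it downstream.
    return raise_for_msg_error(msg)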
diff --git a/quixstreams/models/rows.py b/quixstreams/models/rows.py
index 7633a9a33..a618ee7d0 100644
--- a/quixstreams/models/rows.py
+++ b/quixstreams/models/rows.py
@@ -19,7 +19,7 @@ def __init__(
         key: Optional[Any],
         timestamp: int,
         context: MessageContext,
-        headers: KafkaHeaders = None,
+        headers: KafkaHeaders,
     ):
         self.value = value
         self.key = key
diff --git a/quixstreams/models/serializers/avro.py b/quixstreams/models/serializers/avro.py
index 712b64a03..01c27f441 100644
--- a/quixstreams/models/serializers/avro.py
+++ b/quixstreams/models/serializers/avro.py
@@ -148,6 +148,12 @@ def __init__(
             )
 
         super().__init__()
+
+        if schema is None and schema_registry_client_config is None:
+            raise TypeError(
+                "One of `schema` or `schema_registry_client_config` is required"
+            )
+
         self._schema = parse_schema(schema) if schema else None
         self._reader_schema = parse_schema(reader_schema) if reader_schema else None
         self._return_record_name = return_record_name
@@ -174,17 +180,19 @@ def __call__(
                 return self._schema_registry_deserializer(value, ctx)
             except (SchemaRegistryError, _SerializationError, EOFError) as exc:
                 raise SerializationError(str(exc)) from exc
+        elif self._schema is not None:
+            try:
+                return schemaless_reader(  # type: ignore
+                    BytesIO(value),
+                    self._schema,
+                    reader_schema=self._reader_schema,
+                    return_record_name=self._return_record_name,
+                    return_record_name_override=self._return_record_name_override,
+                    return_named_type=self._return_named_type,
+                    return_named_type_override=self._return_named_type_override,
+                    handle_unicode_errors=self._handle_unicode_errors,
+                )
+            except EOFError as exc:
+                raise SerializationError(str(exc)) from exc
 
-        try:
-            return schemaless_reader(
-                BytesIO(value),
-                self._schema,
-                reader_schema=self._reader_schema,
-                return_record_name=self._return_record_name,
-                return_record_name_override=self._return_record_name_override,
-                return_named_type=self._return_named_type,
-                return_named_type_override=self._return_named_type_override,
-                handle_unicode_errors=self._handle_unicode_errors,
-            )
-        except EOFError as exc:
-            raise SerializationError(str(exc)) from exc
+        raise SerializationError("no schema found")
diff --git a/quixstreams/models/serializers/base.py b/quixstreams/models/serializers/base.py
index 64a49f414..02dde1ba7 100644
--- a/quixstreams/models/serializers/base.py
+++ b/quixstreams/models/serializers/base.py
@@ -9,7 +9,7 @@
 )
 from typing_extensions import Literal, TypeAlias
 
-from ..types import HeadersMapping, KafkaHeaders
+from ..types import Headers, HeadersMapping, KafkaHeaders
 
 __all__ = (
     "SerializationContext",
@@ -33,8 +33,8 @@ class SerializationContext(_SerializationContext):
     def __init__(
         self,
         topic: str,
-        field: MessageField,
-        headers: KafkaHeaders = None,
+        field: str,
+        headers: Union[KafkaHeaders, Headers] = None,
     ) -> None:
         self.topic = topic
         self.field = field
diff --git a/quixstreams/models/serializers/json.py b/quixstreams/models/serializers/json.py
index 1646217a2..7ce507783 100644
--- a/quixstreams/models/serializers/json.py
+++ b/quixstreams/models/serializers/json.py
@@ -1,5 +1,5 @@
 import json
-from typing import Any, Callable, Iterable, Mapping, Optional, Union
+from typing import TYPE_CHECKING, Any, Callable, Iterable, Mapping, Optional, Union
 
 from confluent_kafka.schema_registry import SchemaRegistryClient, SchemaRegistryError
 from confluent_kafka.schema_registry.json_schema import (
@@ -10,7 +10,6 @@
 )
 from confluent_kafka.serialization import SerializationError as _SerializationError
 from jsonschema import Draft202012Validator, ValidationError
-from jsonschema.protocols import Validator
 
 from quixstreams.utils.json import (
     dumps as default_dumps,
@@ -26,6 +25,9 @@
     SchemaRegistrySerializationConfig,
 )
 
+if TYPE_CHECKING:
+    from jsonschema.validators import _Validator
+
 __all__ = ("JSONSerializer", "JSONDeserializer")
 
 
@@ -34,7 +36,7 @@ def __init__(
         self,
         dumps: Callable[[Any], Union[str, bytes]] = default_dumps,
         schema: Optional[Mapping] = None,
-        validator: Optional[Validator] = None,
+        validator: Optional["_Validator"] = None,
         schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
         schema_registry_serialization_config: Optional[
             SchemaRegistrySerializationConfig
@@ -121,7 +123,7 @@ def __init__(
         self,
        loads: Callable[[Union[bytes, bytearray]], Any] = default_loads,
         schema: Optional[Mapping] = None,
-        validator: Optional[Validator] = None,
+        validator: Optional["_Validator"] = None,
         schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
     ):
         """
diff --git a/quixstreams/models/serializers/protobuf.py b/quixstreams/models/serializers/protobuf.py
index bc5c5646e..65f6c3ba0 100644
--- a/quixstreams/models/serializers/protobuf.py
+++ b/quixstreams/models/serializers/protobuf.py
@@ -1,4 +1,4 @@
-from typing import Dict, Iterable, Mapping, Optional, Union
+from typing import Dict, Iterable, Mapping, Optional, Type, Union
 
 from confluent_kafka.schema_registry import SchemaRegistryClient, SchemaRegistryError
 from confluent_kafka.schema_registry.protobuf import (
@@ -24,7 +24,7 @@ class ProtobufSerializer(Serializer):
     def __init__(
         self,
-        msg_type: Message,
+        msg_type: Type[Message],
         deterministic: bool = False,
         ignore_unknown_fields: bool = False,
         schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
@@ -110,7 +110,7 @@ def __call__(
 class ProtobufDeserializer(Deserializer):
     def __init__(
         self,
-        msg_type: Message,
+        msg_type: Type[Message],
         use_integers_for_enums: bool = False,
         preserving_proto_field_name: bool = False,
         to_dict: bool = True,
diff --git a/quixstreams/models/topics/admin.py b/quixstreams/models/topics/admin.py
index 7719a05a8..b332c660c 100644
--- a/quixstreams/models/topics/admin.py
+++ b/quixstreams/models/topics/admin.py
@@ -7,10 +7,7 @@
 from confluent_kafka.admin import (
     AdminClient,
     ConfigResource,
-    KafkaException,  # type: ignore
-)
-from confluent_kafka.admin import (
-    NewTopic as ConfluentTopic,  # type: ignore
+    KafkaException,
 )
 from confluent_kafka.admin import (
     TopicMetadata as ConfluentTopicMetadata,
@@ -26,25 +23,6 @@
 __all__ = ("TopicAdmin",)
 
 
-def convert_topic_list(topics: List[Topic]) -> List[ConfluentTopic]:
-    """
-    Converts `Topic`s to `ConfluentTopic`s as required for Confluent's
-    `AdminClient.create_topic()`.
-
-    :param topics: list of `Topic`s
-    :return: list of confluent_kafka `ConfluentTopic`s
-    """
-    return [
-        ConfluentTopic(
-            topic=topic.name,
-            num_partitions=topic.config.num_partitions,
-            replication_factor=topic.config.replication_factor,
-            config=topic.config.extra_config,
-        )
-        for topic in topics
-    ]
-
-
 def confluent_topic_config(topic: str) -> ConfigResource:
     return ConfigResource(2, topic)
 
@@ -207,12 +185,12 @@ def create_topics(
         for topic in topics_to_create:
             logger.info(
                 f'Creating a new topic "{topic.name}" '
-                f'with config: "{topic.config.as_dict()}"'
+                f'with config: "{topic.config.as_dict() if topic.config is not None else {}}"'
             )
 
         self._finalize_create(
             self.admin_client.create_topics(
-                convert_topic_list(topics_to_create),
+                [topic.as_newtopic() for topic in topics_to_create],
                 request_timeout=timeout,
             ),
             finalize_timeout=finalize_timeout,
diff --git a/quixstreams/models/topics/manager.py b/quixstreams/models/topics/manager.py
index fbaba21d5..348134e37 100644
--- a/quixstreams/models/topics/manager.py
+++ b/quixstreams/models/topics/manager.py
@@ -40,7 +40,7 @@ class TopicManager:
     # Default topic params
     default_num_partitions = 1
     default_replication_factor = 1
-    default_extra_config = {}
+    default_extra_config: dict[str, str] = {}
 
     # Max topic name length for the new topics
     _max_topic_name_len = 255
@@ -207,9 +207,15 @@ def _get_source_topic_config(
 
         :return: a TopicConfig
         """
+
         topic_config = self._admin.inspect_topics([topic_name], timeout=timeout)[
             topic_name
-        ] or deepcopy(self._non_changelog_topics[topic_name].config)
+        ]
+        if topic_config is None and topic_name in self._non_changelog_topics:
+            topic_config = deepcopy(self._non_changelog_topics[topic_name].config)
+
+        if topic_config is None:
+            raise RuntimeError(f"No configuration can be found for topic {topic_name}")
 
         # Copy only certain configuration values from original topic
         if extras_imports:
@@ -475,10 +481,17 @@ def validate_all_topics(self, timeout: Optional[float] = None):
 
         for source_name in self._non_changelog_topics.keys():
             source_cfg = actual_configs[source_name]
+            if source_cfg is None:
+                raise TopicNotFoundError(f"Topic {source_name} not found on the broker")
+
             # For any changelog topics, validate the amount of partitions and
             # replication factor match with the source topic
             for changelog in self.changelog_topics.get(source_name, {}).values():
                 changelog_cfg = actual_configs[changelog.name]
+                if changelog_cfg is None:
+                    raise TopicNotFoundError(
+                        f"Topic {changelog.name} not found on the broker"
+                    )
 
                 if changelog_cfg.num_partitions != source_cfg.num_partitions:
                     raise TopicConfigurationMismatch(
diff --git a/quixstreams/models/topics/topic.py b/quixstreams/models/topics/topic.py
index f07f7d56e..0a9d22589 100644
--- a/quixstreams/models/topics/topic.py
+++ b/quixstreams/models/topics/topic.py
@@ -2,6 +2,8 @@
 import logging
 from typing import Any, Callable, List, Optional, Union
 
+from confluent_kafka.admin import NewTopic
+
 from quixstreams.models.messagecontext import MessageContext
 from quixstreams.models.messages import KafkaMessage
 from quixstreams.models.rows import Row
@@ -23,9 +25,9 @@
 from quixstreams.models.timestamps import TimestampType
 from quixstreams.models.topics.utils import merge_headers
 from quixstreams.models.types import (
-    ConfluentKafkaMessageProto,
     Headers,
     KafkaHeaders,
+    SuccessfulConfluentKafkaMessageProto,
 )
 
 __all__ = ("Topic", "TopicConfig", "TimestampExtractor")
@@ -48,7 +50,7 @@ class TopicConfig:
 
     num_partitions: int
     replication_factor: int
-    extra_config: dict = dataclasses.field(default_factory=dict)
+    extra_config: dict[str, str] = dataclasses.field(default_factory=dict)
 
     def as_dict(self):
         return dataclasses.asdict(self)
@@ -137,6 +139,23 @@ def __clone__(
             timestamp_extractor=timestamp_extractor or self._timestamp_extractor,
         )
 
+    def as_newtopic(self) -> NewTopic:
+        """
+        Converts this `Topic` to a `NewTopic` as required for Confluent's
+        `AdminClient.create_topics()`.
+
+        :return: a confluent_kafka `NewTopic`
+        """
+        if self.config is None:
+            return NewTopic(topic=self.name)
+
+        return NewTopic(
+            topic=self.name,
+            num_partitions=self.config.num_partitions,
+            replication_factor=self.config.replication_factor,
+            config=self.config.extra_config,
+        )
+
     def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
         """
         Serialize Row to a Kafka message structure
@@ -178,7 +197,7 @@ def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
         )
 
     def row_deserialize(
-        self, message: ConfluentKafkaMessageProto
+        self, message: SuccessfulConfluentKafkaMessageProto
     ) -> Union[Row, List[Row], None]:
         """
         Deserialize incoming Kafka message to a Row.
@@ -223,7 +242,7 @@ def row_deserialize(
                 message.partition(),
                 message.offset(),
             )
-            return
+            return None
 
         timestamp_type, timestamp_ms = message.timestamp()
         message_context = MessageContext(
@@ -234,7 +253,7 @@ def row_deserialize(
             leader_epoch=message.leader_epoch(),
         )
 
-        if self._value_deserializer.split_values:
+        if value_deserialized is not None and self._value_deserializer.split_values:
            # The expected value from this serializer is Iterable and each item
             # should be processed as a separate message
             rows = []
@@ -299,7 +318,7 @@ def serialize(
             timestamp=timestamp_ms,
         )
 
-    def deserialize(self, message: ConfluentKafkaMessageProto):
+    def deserialize(self, message: SuccessfulConfluentKafkaMessageProto):
         if (key := message.key()) is not None:
             if self._key_deserializer:
                 key_ctx = SerializationContext(
diff --git a/quixstreams/models/topics/utils.py b/quixstreams/models/topics/utils.py
index d3d8452cf..4064a4740 100644
--- a/quixstreams/models/topics/utils.py
+++ b/quixstreams/models/topics/utils.py
@@ -1,5 +1,8 @@
+from typing import List
+
 from quixstreams.models.types import (
     HeadersMapping,
+    HeadersTuple,
     HeadersTuples,
     KafkaHeaders,
 )
@@ -25,12 +28,12 @@ def merge_headers(original: KafkaHeaders, other: HeadersMapping) -> HeadersTuple
     # Make a shallow copy of "other" to pop keys from it
     other = other.copy()
 
-    new_headers = []
+    new_headers: List[HeadersTuple] = []
     # Iterate over original headers and put them to a new list with values from
     # the "other" dict if the key is found
     for header, value in original:
         if header in other:
-            value = other.pop(header)
+            continue
         new_headers.append((header, value))
     # Append the new headers to the list
     new_headers.extend(other.items())
diff --git a/quixstreams/models/types.py b/quixstreams/models/types.py
index f34816521..97e666744 100644
--- a/quixstreams/models/types.py
+++ b/quixstreams/models/types.py
@@ -1,23 +1,58 @@
-from typing import List, Mapping, Optional, Sequence, Tuple, Union
+from typing import List, Optional, Sequence, Tuple, Union
 
+from confluent_kafka import KafkaError
 from typing_extensions import Protocol
 
 MessageKey = Optional[Union[str, bytes]]
-MessageValue = Union[str, bytes]
+MessageValue = Optional[Union[str, bytes]]
 
 HeadersValue = Union[str, bytes]
-HeadersMapping = Mapping[str, HeadersValue]
-HeadersTuples = Sequence[Tuple[str, HeadersValue]]
+HeadersMapping = dict[str, HeadersValue]
+HeadersTuple = Tuple[str, HeadersValue]
+HeadersTuples = Sequence[HeadersTuple]
 Headers = Union[HeadersTuples, HeadersMapping]
 KafkaHeaders = Optional[List[Tuple[str, bytes]]]
 
 
-class ConfluentKafkaMessageProto(Protocol):
+class RawConfluentKafkaMessageProto(Protocol):
     """
     An interface of `confluent_kafka.Message`.
 
-    Use it to not depend on exact implementation and simplify testing.
+    Use it to not depend on exact implementation and simplify testing and type hints.
+
+    Instances of `confluent_kafka.Message` cannot be directly created from Python,
+    see https://github.com/confluentinc/confluent-kafka-python/issues/1535.
+    """
+
+    def headers(self, *args, **kwargs) -> KafkaHeaders: ...
+
+    def key(self, *args, **kwargs) -> MessageKey: ...
+
+    def offset(self, *args, **kwargs) -> Optional[int]: ...
+
+    def partition(self, *args, **kwargs) -> Optional[int]: ...
+
+    def timestamp(self, *args, **kwargs) -> Tuple[int, int]: ...
+
+    def topic(self, *args, **kwargs) -> str: ...
+
+    def value(self, *args, **kwargs) -> MessageValue: ...
+
+    def latency(self, *args, **kwargs) -> Optional[float]: ...
+
+    def leader_epoch(self, *args, **kwargs) -> Optional[int]: ...
+
+    def error(self) -> Optional[KafkaError]: ...
+
+    def __len__(self) -> int: ...
+
+
+class SuccessfulConfluentKafkaMessageProto(Protocol):
+    """
+    An interface of `confluent_kafka.Message` for successful messages (messages that do not include an error).
+
+    Use it to not depend on exact implementation and simplify testing and type hints.
 
     Instances of `confluent_kafka.Message` cannot be directly created from Python,
     see https://github.com/confluentinc/confluent-kafka-python/issues/1535.
@@ -30,16 +65,18 @@ def key(self, *args, **kwargs) -> MessageKey: ...
 
     def offset(self, *args, **kwargs) -> int: ...
 
-    def partition(self, *args, **kwargs) -> Optional[int]: ...
+    def partition(self, *args, **kwargs) -> int: ...
 
     def timestamp(self, *args, **kwargs) -> Tuple[int, int]: ...
 
     def topic(self, *args, **kwargs) -> str: ...
 
-    def value(self, *args, **kwargs) -> Optional[MessageValue]: ...
+    def value(self, *args, **kwargs) -> MessageValue: ...
 
     def latency(self, *args, **kwargs) -> Optional[float]: ...
 
     def leader_epoch(self, *args, **kwargs) -> Optional[int]: ...
 
+    def error(self) -> None: ...
+
     def __len__(self) -> int: ...
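Reviewer note (not part of the patch): a minimal, hypothetical stand-in that structurally satisfies `SuccessfulConfluentKafkaMessageProto`, in the spirit of the stub updated in `tests/utils.py` below. All names and values here are illustrative only.

from typing import List, Optional, Tuple


class FakeSuccessfulMessage:
    """A bare-bones fake matching SuccessfulConfluentKafkaMessageProto."""

    def __init__(self, value: bytes, topic: str = "test-topic") -> None:
        self._value = value
        self._topic = topic

    def headers(self, *args, **kwargs) -> Optional[List[Tuple[str, bytes]]]:
        return None

    def key(self, *args, **kwargs) -> Optional[bytes]:
        return None

    def offset(self, *args, **kwargs) -> int:
        return 0

    def partition(self, *args, **kwargs) -> int:
        return 0

    def timestamp(self, *args, **kwargs) -> Tuple[int, int]:
        # (timestamp_type, timestamp_ms); both zero for this fake
        return 0, 0

    def topic(self, *args, **kwargs) -> str:
        return self._topic

    def value(self, *args, **kwargs) -> bytes:
        return self._value

    def latency(self, *args, **kwargs) -> Optional[float]:
        return None

    def leader_epoch(self, *args, **kwargs) -> Optional[int]:
        return None

    def error(self) -> None:
        # A "successful" message never carries an error.
        return None

    def __len__(self) -> int:
        return len(self._value)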
diff --git a/quixstreams/rowconsumer.py b/quixstreams/rowconsumer.py
index a9bea2016..734f6c923 100644
--- a/quixstreams/rowconsumer.py
+++ b/quixstreams/rowconsumer.py
@@ -6,8 +6,7 @@
 from .error_callbacks import ConsumerErrorCallback, default_on_consumer_error
 from .exceptions import PartitionAssignmentError
 from .kafka import AutoOffsetReset, BaseConsumer, ConnectionConfig
-from .kafka.consumer import RebalancingCallback
-from .kafka.exceptions import KafkaConsumerException
+from .kafka.consumer import RebalancingCallback, raise_for_msg_error
 from .models import Row, Topic
 from .models.serializers.exceptions import IgnoreMessage
 
@@ -130,9 +129,7 @@ def poll_row(self, timeout: Optional[float] = None) -> Union[Row, List[Row], Non
         topic_name = msg.topic()
 
         try:
-            if msg.error():
-                raise KafkaConsumerException(error=msg.error())
-
+            msg = raise_for_msg_error(msg)
             topic = self._topics[topic_name]
             row_or_rows = topic.row_deserialize(message=msg)
 
diff --git a/quixstreams/state/base/partition.py b/quixstreams/state/base/partition.py
index 568bf7da9..b5999acab 100644
--- a/quixstreams/state/base/partition.py
+++ b/quixstreams/state/base/partition.py
@@ -2,7 +2,7 @@
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING, Literal, Optional, Union
 
-from quixstreams.models import ConfluentKafkaMessageProto
+from quixstreams.models import SuccessfulConfluentKafkaMessageProto
 from quixstreams.state.exceptions import ColumnFamilyHeaderMissing
 from quixstreams.state.metadata import (
     CHANGELOG_CF_MESSAGE_HEADER,
@@ -47,7 +47,7 @@ def close(self): ...
     @abstractmethod
     def _recover_from_changelog_message(
         self,
-        changelog_message: ConfluentKafkaMessageProto,
+        changelog_message: SuccessfulConfluentKafkaMessageProto,
         cf_name: str,
         processed_offset: Optional[int],
         committed_offset: int,
@@ -121,7 +121,9 @@ def begin(self) -> PartitionTransaction:
         )
 
     def recover_from_changelog_message(
-        self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int
+        self,
+        changelog_message: SuccessfulConfluentKafkaMessageProto,
+        committed_offset: int,
     ) -> None:
         """
         Updates state from a given changelog message.
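Reviewer note (not part of the patch): a hypothetical consumer error callback matching the `ConsumerErrorCallback` alias retyped earlier in this patch; the logging format and the return value handling are illustrative, not prescribed by the library.

import logging
from typing import Optional

from quixstreams.models.types import RawConfluentKafkaMessageProto


def on_consumer_error(
    exc: Exception,
    message: Optional[RawConfluentKafkaMessageProto],
    logger: logging.Logger,
) -> bool:
    # The callback can receive a message that still carries a KafkaError,
    # hence the "raw" protocol in the signature.
    if message is not None:
        logger.error(
            "Consumer error on %s[%s] at offset %s: %s",
            message.topic(),
            message.partition(),
            message.offset(),
            exc,
        )
    else:
        logger.error("Consumer error: %s", exc)
    # Assumption: returning True marks the error as handled, per the bool
    # return type of ConsumerErrorCallback; return False to propagate instead.
    return True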
diff --git a/quixstreams/state/memory/partition.py b/quixstreams/state/memory/partition.py
index bc1c9d5b2..25641abb3 100644
--- a/quixstreams/state/memory/partition.py
+++ b/quixstreams/state/memory/partition.py
@@ -2,7 +2,7 @@
 import logging
 from typing import Any, Dict, Literal, Optional, Union
 
-from quixstreams.models import ConfluentKafkaMessageProto
+from quixstreams.models import RawConfluentKafkaMessageProto
 from quixstreams.state.base import PartitionTransactionCache, StorePartition
 from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
 from quixstreams.state.metadata import METADATA_CF_NAME, Marker
@@ -95,7 +95,7 @@ def write(
 
     def _recover_from_changelog_message(
         self,
-        changelog_message: ConfluentKafkaMessageProto,
+        changelog_message: RawConfluentKafkaMessageProto,
         cf_name: str,
         processed_offset: Optional[int],
         committed_offset: int,
diff --git a/quixstreams/state/recovery.py b/quixstreams/state/recovery.py
index 490b409b9..66e066556 100644
--- a/quixstreams/state/recovery.py
+++ b/quixstreams/state/recovery.py
@@ -4,7 +4,8 @@
 from confluent_kafka import TopicPartition as ConfluentPartition
 
 from quixstreams.kafka import BaseConsumer
-from quixstreams.models import ConfluentKafkaMessageProto, Topic
+from quixstreams.kafka.consumer import raise_for_msg_error
+from quixstreams.models import SuccessfulConfluentKafkaMessageProto, Topic
 from quixstreams.models.topics import TopicConfig, TopicManager
 from quixstreams.models.types import Headers
 from quixstreams.rowproducer import RowProducer
@@ -106,7 +107,7 @@ def had_recovery_changes(self) -> bool:
         return self._initial_offset != self.offset
 
     def recover_from_changelog_message(
-        self, changelog_message: ConfluentKafkaMessageProto
+        self, changelog_message: SuccessfulConfluentKafkaMessageProto
     ):
         """
         Recover the StorePartition using a message read from its respective changelog.
@@ -432,7 +433,7 @@ def _update_recovery_status(self):
             logger.debug(f"No recovery was required for {rp}")
         self._revoke_recovery_partitions(rp_revokes)
 
-    def _recovery_loop(self):
+    def _recovery_loop(self) -> None:
         """
         Perform the recovery loop, which continues updating state with changelog
         messages until recovery is "complete" (i.e. no assigned `RecoveryPartition`s).
@@ -443,6 +444,7 @@ def _recovery_loop(self):
             if (msg := self._consumer.poll(1)) is None:
                 self._update_recovery_status()
             else:
+                msg = raise_for_msg_error(msg)
                 rp = self._recovery_partitions[msg.partition()][msg.topic()]
                 rp.recover_from_changelog_message(changelog_message=msg)
 
diff --git a/quixstreams/state/rocksdb/partition.py b/quixstreams/state/rocksdb/partition.py
index 1a2fee173..191687749 100644
--- a/quixstreams/state/rocksdb/partition.py
+++ b/quixstreams/state/rocksdb/partition.py
@@ -4,7 +4,7 @@
 
 from rocksdict import AccessType, ColumnFamily, Rdict, WriteBatch
 
-from quixstreams.models import ConfluentKafkaMessageProto
+from quixstreams.models import SuccessfulConfluentKafkaMessageProto
 from quixstreams.state.base import PartitionTransactionCache, StorePartition
 from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
 from quixstreams.state.metadata import METADATA_CF_NAME, Marker
@@ -76,7 +76,7 @@ def _changelog_recover_flush(self, changelog_offset: int, batch: WriteBatch):
 
     def _recover_from_changelog_message(
         self,
-        changelog_message: ConfluentKafkaMessageProto,
+        changelog_message: SuccessfulConfluentKafkaMessageProto,
         cf_name: str,
         processed_offset: Optional[int],
         committed_offset: int,
diff --git a/tests/utils.py b/tests/utils.py
index db1176183..b78b64c9c 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -99,6 +99,9 @@ def latency(self, *args, **kwargs) -> Optional[float]:
     def leader_epoch(self, *args, **kwargs) -> Optional[int]:
         return self._leader_epoch
 
+    def error(self) -> None:
+        return None
+
     def __len__(self) -> int:
         return len(self._value)
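Reviewer note (not part of the patch): a small sketch of the call-site equivalence behind removing `convert_topic_list()` in favour of the new `Topic.as_newtopic()` method; `to_new_topics` is a hypothetical helper, not something added by the patch.

from typing import List

from confluent_kafka.admin import NewTopic

from quixstreams.models.topics import Topic


def to_new_topics(topics: List[Topic]) -> List[NewTopic]:
    # Same result the removed convert_topic_list() used to build, now derived
    # from each Topic itself via as_newtopic().
    return [topic.as_newtopic() for topic in topics]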