Skip to content

Commit

Permalink
fix issues after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Jun 25, 2024
1 parent 111c17c commit be43ce3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
2 changes: 2 additions & 0 deletions quixstreams/kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,15 @@ def __init__(
logger: logging.Logger = logger,
error_callback: Callable[[KafkaError], None] = _default_error_cb,
extra_config: Optional[dict] = None,
flush_timeout: Optional[int] = None,
transactional_id: str = str(uuid.uuid4()),
):
super().__init__(
broker_address=broker_address,
logger=logger,
error_callback=error_callback,
extra_config=extra_config,
flush_timeout=flush_timeout,
)
# remake config to avoid overriding anything in the Application's
# producer config, which is used in Application.get_producer().
Expand Down
11 changes: 5 additions & 6 deletions quixstreams/rowproducer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import Optional, Any, Union, Dict, Tuple, List, cast, Callable
import logging
from time import sleep
from typing import Optional, Any, Union, Dict, Tuple, List, Callable

from confluent_kafka import TopicPartition, KafkaException, KafkaError, Message
from confluent_kafka.admin import GroupMetadata

import logging

from quixstreams.exceptions import QuixException
from .error_callbacks import ProducerErrorCallback, default_on_producer_error
from .kafka.configuration import ConnectionConfig
Expand Down Expand Up @@ -55,14 +55,13 @@ def __init__(
self._producer = TransactionalProducer(
broker_address=broker_address,
extra_config=extra_config,
flush_timeout=flush_timeout

flush_timeout=flush_timeout,
)
else:
self._producer = Producer(
broker_address=broker_address,
extra_config=extra_config,
flush_timeout=flush_timeout
flush_timeout=flush_timeout,
)

self._on_error: Optional[ProducerErrorCallback] = (
Expand Down
5 changes: 3 additions & 2 deletions quixstreams/state/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def needs_recovery_check(self) -> bool:
This does NOT mean that anything actually requires recovering.
"""
is_behind = self.offset < self._changelog_highwater - 1
return (self._changelog_lowwater != self._changelog_highwater) and is_behind
has_consumable_offsets = self._changelog_lowwater != self._changelog_highwater
state_potentially_behind = self._changelog_highwater - 1 > self.offset
return has_consumable_offsets and state_potentially_behind

@property
def needs_offset_update(self) -> bool:
Expand Down
30 changes: 22 additions & 8 deletions tests/test_quixstreams/test_checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,12 @@ def test_commit_with_state_and_changelog_no_updates_success(

@pytest.mark.parametrize("exactly_once", [False, True])
def test_commit_no_offsets_stored_noop(
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock, exactly_once
self,
checkpoint_factory,
state_manager_factory,
topic_factory,
rowproducer_mock,
exactly_once,
):
topic_name, _ = topic_factory()
consumer_mock = MagicMock(spec_set=Consumer)
Expand All @@ -232,20 +237,24 @@ def test_commit_no_offsets_stored_noop(
checkpoint.commit()

# The producer should not flush
assert not producer_mock.flush.call_count
assert not rowproducer_mock.flush.call_count

# Check nothing is committed
if exactly_once:
# transaction should also be aborted
assert rowproducer_mock.abort_transaction.call_count
assert not producer_mock.commit_transaction.call_count
assert not rowproducer_mock.commit_transaction.call_count
else:
assert not consumer_mock.commit.call_count
assert not rowproducer_mock.flush.call_count

@pytest.mark.parametrize("exactly_once", [False, True])
def test_commit_has_failed_transactions_fails(
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock, exactly_once
self,
checkpoint_factory,
state_manager_factory,
topic_factory,
rowproducer_mock,
exactly_once,
):
consumer_mock = MagicMock(spec_set=Consumer)
state_manager = state_manager_factory(producer=rowproducer_mock)
Expand Down Expand Up @@ -283,12 +292,17 @@ def test_commit_has_failed_transactions_fails(
assert not rowproducer_mock.flush.call_count

# Check nothing is committed
assert not producer_mock.commit_transaction.call_count
assert not rowproducer_mock.commit_transaction.call_count
assert not consumer_mock.commit.call_count

@pytest.mark.parametrize("exactly_once", [False, True])
def test_commit_producer_flush_fails(
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock, exactly_once
self,
checkpoint_factory,
state_manager_factory,
topic_factory,
rowproducer_mock,
exactly_once,
):
consumer_mock = MagicMock(spec_set=Consumer)
state_manager = state_manager_factory(producer=rowproducer_mock)
Expand All @@ -315,7 +329,7 @@ def test_commit_producer_flush_fails(
checkpoint.commit()

# Nothing should commit
assert not producer_mock.commit_transaction.call_count
assert not rowproducer_mock.commit_transaction.call_count
assert not consumer_mock.commit.call_count
# The transaction should remain prepared, but not completed
assert tx.prepared
Expand Down

0 comments on commit be43ce3

Please sign in to comment.