Skip to content

Commit

Permalink
Log warning if topic offset is behind the stored offset
Browse files Browse the repository at this point in the history
  • Loading branch information
daniil-quix committed Oct 26, 2023
1 parent 31eb2f2 commit 95a3983
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/StreamingDataFrames/streamingdataframes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

__all__ = ("Application",)


logger = logging.getLogger(__name__)
MessageProcessedCallback = Callable[[str, int, int], None]

Expand Down Expand Up @@ -426,7 +425,30 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
if self._state_manager.stores:
logger.info(f"Rebalancing: assigning state store partitions")
for tp in topic_partitions:
self._state_manager.on_partition_assign(tp)
# Assign store partitions
store_partitions = self._state_manager.on_partition_assign(tp)

# Check if the latest committed offset >= stored offset
# Otherwise, the re-processed messages might use already updated
# state, which can lead to inconsistent outputs
stored_offsets = [
offset
for offset in (
store_tp.get_processed_offset() for store_tp in store_partitions
)
if offset is not None
]
min_stored_offset = min(stored_offsets) + 1 if stored_offsets else None
if min_stored_offset is not None:
tp_committed = self._consumer.committed([tp], timeout=30)[0]
if min_stored_offset > tp_committed.offset:
logger.warning(
f'Warning: offset "{tp_committed.offset}" '
f"for topic partition "
f'"{tp_committed.topic}[{tp_committed.partition}]" '
f'is behind the stored offset "{min_stored_offset}". '
f"It may lead to distortions in produced data."
)

def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
"""
Expand Down
61 changes: 61 additions & 0 deletions src/StreamingDataFrames/tests/test_dataframes/test_app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import time
import uuid
from concurrent.futures import Future
Expand Down Expand Up @@ -620,3 +621,63 @@ def fail(_):
with store.start_partition_transaction(partition=0) as tx:
with tx.with_prefix(message_key):
assert tx.get("total") == total_consumed.result()

def test_on_assign_topic_offset_behind_warning(
    self,
    app_factory,
    producer,
    topic_factory,
    executor,
    state_manager_factory,
    tmp_path,
    caplog,
):
    """
    A warning must be logged on partition assignment when the committed
    topic offset is behind the offset already recorded in the state store.

    The test pre-seeds a state store partition with offset 9999 (far ahead
    of the topic's committed offset, which is at the beginning because no
    message has been consumed yet), then runs the app and asserts that the
    "is behind the stored offset" warning was emitted during rebalancing.
    """
    # Fresh consumer group and state dir so this test is isolated.
    consumer_group = str(uuid.uuid4())
    state_dir = (tmp_path / "state").absolute()
    app = app_factory(
        consumer_group=consumer_group,
        auto_offset_reset="earliest",
        state_dir=state_dir,
    )

    topic_in_name, _ = topic_factory()
    topic_in = app.topic(topic_in_name, value_deserializer=JSONDeserializer())

    # Set the store partition offset to 9999 ahead of the app run,
    # using a standalone state manager pointed at the same group/state dir
    # the app will later open.
    state_manager = state_manager_factory(
        group_id=consumer_group, state_dir=state_dir
    )
    with state_manager:
        state_manager.register_store(topic_in.name, "default")
        state_partitions = state_manager.on_partition_assign(
            TopicPartitionStub(topic=topic_in.name, partition=0)
        )
        # Write any value in a transaction stamped with offset=9999 so the
        # store records 9999 as its last processed offset.
        with state_manager.start_store_transaction(
            topic=topic_in.name, partition=0, offset=9999
        ):
            tx = state_manager.get_store_transaction()
            tx.set("key", "value")
        # Sanity check: the store partition now reports the seeded offset.
        assert state_partitions[0].get_processed_offset() == 9999

    # Define some stateful function so the App assigns store partitions
    done = Future()

    def count(_, state: State):
        # Resolve the future on the first processed message so the app
        # can be stopped (value is unused; only the callback matters).
        done.set_result(True)

    df = app.dataframe(topics_in=[topic_in])
    df.apply(count, stateful=True)

    # Produce a message to the topic and flush
    data = {"key": b"key", "value": dumps({"key": "value"})}
    with producer:
        producer.produce(topic_in_name, **data)

    # Stop app when the future is resolved
    executor.submit(_stop_app_on_future, app, done, 10.0)
    # Run the application with logger.warning patched so the warning call
    # can be asserted on. NOTE(review): patching the logger class method
    # intercepts warnings from all loggers, not just the app's — assumed
    # acceptable here since only the message text is checked.
    with patch.object(logging.getLoggerClass(), "warning") as mock:
        app.run(df)

    # The offset-behind warning must have been logged during assignment.
    assert mock.called
    assert "is behind the stored offset" in mock.call_args[0][0]

0 comments on commit 95a3983

Please sign in to comment.