Propagate timeout to _retrieve_offsets
The default timeout of _retrieve_offsets is infinite, which makes the
Consumer block indefinitely even if poll was called with a timeout.

Propagate the timeout from the Consumer down to the Fetcher operations,
shrinking the remaining timeout as successive sub-operations consume
their share of the total allowed time.

During a `poll` operation, the timeout for positions and offsets is kept
separate from the poll timeout because poll always keeps running and
accumulating messages until its timeout expires. We want to be able to
poll messages for a short time (or for zero time, the default) while
still allowing some delay when retrieving the positions.
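
The change relies on a simple deadline pattern: the caller's relative timeout is converted into an absolute end time once, and each sub-operation then receives only the time that is still left, clamped at zero. A minimal sketch of that pattern (the sub-operations here are hypothetical placeholders, not functions from the patch):

    import time

    def do_work(timeout_ms=float("inf"), sub_operations=()):
        # Turn the relative timeout into an absolute deadline, computed once.
        end_time = time.time() + timeout_ms / 1000
        for operation in sub_operations:
            # Each step only gets whatever time remains, never a negative value.
            remaining_ms = max(0.0, 1000 * (end_time - time.time()))
            operation(timeout_ms=remaining_ms)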
kmichel-aiven committed Oct 5, 2024
1 parent e0ab864 commit d8dbea2
Showing 3 changed files with 27 additions and 21 deletions.
16 changes: 9 additions & 7 deletions kafka/consumer/fetcher.py
@@ -131,17 +131,18 @@ def send_fetches(self):
self._clean_done_fetch_futures()
return futures

def reset_offsets_if_needed(self, partitions):
def reset_offsets_if_needed(self, partitions, timeout_ms=float("inf")):
"""Lookup and set offsets for any partitions which are awaiting an
explicit reset.
Arguments:
partitions (set of TopicPartitions): the partitions to reset
"""
end_time = time.time() + timeout_ms / 1000
for tp in partitions:
# TODO: If there are several offsets to reset, we could submit offset requests in parallel
if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))

def _clean_done_fetch_futures(self):
while True:
@@ -156,7 +157,7 @@ def in_flight_fetches(self):
self._clean_done_fetch_futures()
return bool(self._fetch_futures)

def update_fetch_positions(self, partitions):
def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
"""Update the fetch positions for the provided partitions.
Arguments:
@@ -167,6 +168,7 @@ def update_fetch_positions(self, partitions):
partition and no reset policy is available
"""
# reset the fetch position to the committed position
end_time = time.time() + timeout_ms / 1000
for tp in partitions:
if not self._subscriptions.is_assigned(tp):
log.warning("partition %s is not assigned - skipping offset"
@@ -178,12 +180,12 @@ def update_fetch_positions(self, partitions):
continue

if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
elif self._subscriptions.assignment[tp].committed is None:
# there's no committed position, so we need to reset with the
# default strategy
self._subscriptions.need_offset_reset(tp)
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
else:
committed = self._subscriptions.assignment[tp].committed.offset
log.debug("Resetting offset for partition %s to the committed"
@@ -215,7 +217,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
offsets[tp] = offsets[tp][0]
return offsets

def _reset_offset(self, partition):
def _reset_offset(self, partition, timeout_ms):
"""Reset offsets for the given partition using the offset reset strategy.
Arguments:
@@ -234,7 +236,7 @@ def _reset_offset(self, partition):

log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp})
offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms)

if partition in offsets:
offset = offsets[partition][0]
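
Note that the new `timeout_ms=float("inf")` defaults keep the old behaviour for callers that never pass a timeout: with an infinite timeout the deadline itself is infinite, so every sub-call still receives an infinite timeout. A quick illustration of that arithmetic (plain Python, not part of the patch):

    import time

    end_time = time.time() + float("inf") / 1000   # inf: the deadline never arrives
    remaining_ms = max(0.0, 1000 * (end_time - time.time()))
    assert remaining_ms == float("inf")            # sub-calls may still block indefinitely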
23 changes: 13 additions & 10 deletions kafka/consumer/group.py
@@ -615,7 +615,7 @@ def partitions_for_topic(self, topic):
partitions = cluster.partitions_for_topic(topic)
return partitions

def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
def poll(self, timeout_ms=0, max_records=None, update_offsets=True, *, positions_timeout_ms=float("inf")):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -656,7 +656,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
start = time.time()
remaining = timeout_ms
while not self._closed:
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
records = self._poll_once(remaining, positions_timeout_ms, max_records, update_offsets=update_offsets)
if records:
return records

@@ -668,7 +668,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):

return {}

def _poll_once(self, timeout_ms, max_records, update_offsets=True):
def _poll_once(self, timeout_ms, positions_timeout_ms, max_records, update_offsets=True):
"""Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -683,7 +683,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
self._update_fetch_positions(self._subscription.missing_fetch_positions(), positions_timeout_ms)

# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
@@ -714,7 +714,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records

def position(self, partition):
def position(self, partition, timeout_ms=float("inf")):
"""Get the offset of the next record that will be fetched
Arguments:
@@ -728,7 +728,7 @@ def position(self, partition):
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
offset = self._subscription.assignment[partition].position
if offset is None:
self._update_fetch_positions([partition])
self._update_fetch_positions([partition], timeout_ms)
offset = self._subscription.assignment[partition].position
return offset

@@ -1087,7 +1087,7 @@ def _use_consumer_group(self):
return False
return True

def _update_fetch_positions(self, partitions):
def _update_fetch_positions(self, partitions, timeout_ms):
"""Set the fetch position to the committed position (if there is one)
or reset it using the offset reset policy the user has configured.
@@ -1099,12 +1099,13 @@ def _update_fetch_positions(self, partitions):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
end_time = time.time() + timeout_ms / 1000
# Lookup any positions for partitions which are awaiting reset (which may be the
# case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
# this check first to avoid an unnecessary lookup of committed offsets (which
# typically occurs when the user is manually assigning partitions and managing
# their own offsets).
self._fetcher.reset_offsets_if_needed(partitions)
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms)

if not self._subscription.has_all_fetch_positions():
# if we still don't have offsets for all partitions, then we should either seek
@@ -1115,7 +1116,8 @@ def _update_fetch_positions(self, partitions):
self._coordinator.refresh_committed_offsets_if_needed()

# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
update_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
self._fetcher.update_fetch_positions(partitions, update_timeout_ms)

def _message_generator_v2(self):
timeout_ms = 1000 * (self._consumer_timeout - time.time())
@@ -1145,7 +1147,8 @@ def _message_generator(self):
# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
update_timeout_ms = max(0.0, 1000 * (self._consumer_timeout - time.time()))
self._update_fetch_positions(partitions, update_timeout_ms)

poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
self._client.poll(timeout_ms=poll_ms)
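
Since `positions_timeout_ms` is keyword-only (note the bare `*` in the new `poll` signature), existing positional callers are unaffected. A hedged usage sketch, assuming `consumer` is an existing KafkaConsumer and `tp` one of its assigned TopicPartitions:

    # Poll without blocking for new records, but allow up to 5 seconds
    # for any pending position/offset lookups.
    records = consumer.poll(timeout_ms=0, positions_timeout_ms=5000)

    # position() accepts a similar bound for its offset lookup.
    offset = consumer.position(tp, timeout_ms=5000)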
9 changes: 5 additions & 4 deletions test/test_fetcher.py
@@ -4,6 +4,7 @@
import pytest

from collections import OrderedDict
from unittest.mock import ANY
import itertools
import time

@@ -114,11 +115,11 @@ def test_update_fetch_positions(fetcher, topic, mocker):
# partition needs reset, no committed offset
fetcher._subscriptions.need_offset_reset(partition)
fetcher._subscriptions.assignment[partition].awaiting_reset = False
fetcher.update_fetch_positions([partition])
fetcher._reset_offset.assert_called_with(partition)
fetcher.update_fetch_positions([partition], timeout_ms=1234)
fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
fetcher.update_fetch_positions([partition])
fetcher._reset_offset.assert_called_with(partition)
fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)

# partition needs reset, has committed offset
fetcher._reset_offset.reset_mock()
@@ -139,7 +140,7 @@ def test__reset_offset(fetcher, mocker):
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

mocked.return_value = {tp: (1001, None)}
fetcher._reset_offset(tp)
fetcher._reset_offset(tp, timeout_ms=1234)
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
assert fetcher._subscriptions.assignment[tp].position == 1001

