Propagate timeout to _retrieve_offsets
The default timeout of _retrieve_offsets is infinite, which makes the
Consumer block indefinitely even if poll was called with a timeout.

Propagate the timeout from the Consumer down to the Fetcher operations,
shrinking the remaining timeout as successive sub-operations consume
the total allowed time.
kmichel-aiven committed Oct 5, 2024 · 1 parent e0ab864 · commit 3a9434b
Showing 3 changed files with 26 additions and 18 deletions.
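
The pattern behind the change is plain deadline propagation: the outermost call converts its relative timeout into an absolute deadline once, and every sub-operation receives only the time still remaining, so a slow step shrinks the budget of later steps instead of stretching the overall wait. A minimal standalone sketch of the idea (the names below are illustrative, not kafka-python's API):

    import time

    def _sub_operation(timeout_ms):
        # Stand-in for a blocking step such as an offset lookup: it would
        # wait up to a second, but never longer than the budget it is handed.
        time.sleep(max(0.0, min(timeout_ms, 1000.0)) / 1000)

    def poll(timeout_ms=float("inf")):
        # Convert the relative timeout into one absolute deadline up front.
        end_time = time.time() + timeout_ms / 1000
        # Each call recomputes what is left; a slow first step leaves the
        # second step with a smaller (possibly zero) budget.
        _sub_operation(timeout_ms=1000 * (end_time - time.time()))
        _sub_operation(timeout_ms=1000 * (end_time - time.time()))

    poll(timeout_ms=100)  # returns after ~100 ms instead of ~2 s

With propagation in place, poll(timeout_ms=100) above returns after roughly 100 ms even though each sub-operation would happily wait a full second on its own; this is the same arithmetic the commit applies at each call site below.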
16 changes: 9 additions & 7 deletions kafka/consumer/fetcher.py
@@ -131,17 +131,18 @@ def send_fetches(self):
self._clean_done_fetch_futures()
return futures

def reset_offsets_if_needed(self, partitions):
def reset_offsets_if_needed(self, partitions, timeout_ms=float("inf")):
"""Lookup and set offsets for any partitions which are awaiting an
explicit reset.
Arguments:
partitions (set of TopicPartitions): the partitions to reset
"""
end_time = time.time() + timeout_ms / 1000
for tp in partitions:
# TODO: If there are several offsets to reset, we could submit offset requests in parallel
if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=1000 * (end_time - time.time()))

def _clean_done_fetch_futures(self):
while True:
@@ -156,7 +157,7 @@ def in_flight_fetches(self):
self._clean_done_fetch_futures()
return bool(self._fetch_futures)

def update_fetch_positions(self, partitions):
def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
"""Update the fetch positions for the provided partitions.
Arguments:
@@ -167,6 +168,7 @@ def update_fetch_positions(self, partitions):
partition and no reset policy is available
"""
# reset the fetch position to the committed position
end_time = time.time() + timeout_ms / 1000
for tp in partitions:
if not self._subscriptions.is_assigned(tp):
log.warning("partition %s is not assigned - skipping offset"
@@ -178,12 +180,12 @@ def update_fetch_positions(self, partitions):
continue

if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=1000 * (end_time - time.time()))
elif self._subscriptions.assignment[tp].committed is None:
# there's no committed position, so we need to reset with the
# default strategy
self._subscriptions.need_offset_reset(tp)
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=1000 * (end_time - time.time()))
else:
committed = self._subscriptions.assignment[tp].committed.offset
log.debug("Resetting offset for partition %s to the committed"
@@ -215,7 +217,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
offsets[tp] = offsets[tp][0]
return offsets

def _reset_offset(self, partition):
def _reset_offset(self, partition, timeout_ms):
"""Reset offsets for the given partition using the offset reset strategy.
Arguments:
@@ -234,7 +236,7 @@ def _reset_offset(self, partition):

log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp})
offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms)

if partition in offsets:
offset = offsets[partition][0]
19 changes: 12 additions & 7 deletions kafka/consumer/group.py
@@ -678,12 +678,14 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
Returns:
dict: Map of topic to list of records (may be empty).
"""
end_time = time.time() + timeout_ms / 1000
self._coordinator.poll()

# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
update_timeout_ms = 1000 * (end_time - time.time())
self._update_fetch_positions(self._subscription.missing_fetch_positions(), update_timeout_ms)

# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
@@ -714,7 +716,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records

def position(self, partition):
def position(self, partition, timeout_ms=float("inf")):
"""Get the offset of the next record that will be fetched
Arguments:
@@ -728,7 +730,7 @@ def position(self, partition):
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
offset = self._subscription.assignment[partition].position
if offset is None:
self._update_fetch_positions([partition])
self._update_fetch_positions([partition], timeout_ms)
offset = self._subscription.assignment[partition].position
return offset

@@ -1087,7 +1089,7 @@ def _use_consumer_group(self):
return False
return True

def _update_fetch_positions(self, partitions):
def _update_fetch_positions(self, partitions, timeout_ms):
"""Set the fetch position to the committed position (if there is one)
or reset it using the offset reset policy the user has configured.
@@ -1099,12 +1101,13 @@ def _update_fetch_positions(self, partitions):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
end_time = time.time() + timeout_ms / 1000
# Lookup any positions for partitions which are awaiting reset (which may be the
# case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
# this check first to avoid an unnecessary lookup of committed offsets (which
# typically occurs when the user is manually assigning partitions and managing
# their own offsets).
self._fetcher.reset_offsets_if_needed(partitions)
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms)

if not self._subscription.has_all_fetch_positions():
# if we still don't have offsets for all partitions, then we should either seek
@@ -1115,7 +1118,8 @@
self._coordinator.refresh_committed_offsets_if_needed()

# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
update_timeout_ms = 1000 * (end_time - time.time())
self._fetcher.update_fetch_positions(partitions, update_timeout_ms)

def _message_generator_v2(self):
timeout_ms = 1000 * (self._consumer_timeout - time.time())
@@ -1145,7 +1149,8 @@ def _message_generator(self):
# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
update_timeout_ms = 1000 * (self._consumer_timeout - time.time())
self._update_fetch_positions(partitions, update_timeout_ms)

poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
self._client.poll(timeout_ms=poll_ms)
9 changes: 5 additions & 4 deletions test/test_fetcher.py
@@ -4,6 +4,7 @@
import pytest

from collections import OrderedDict
from unittest.mock import ANY
import itertools
import time

@@ -114,11 +115,11 @@ def test_update_fetch_positions(fetcher, topic, mocker):
# partition needs reset, no committed offset
fetcher._subscriptions.need_offset_reset(partition)
fetcher._subscriptions.assignment[partition].awaiting_reset = False
fetcher.update_fetch_positions([partition])
fetcher._reset_offset.assert_called_with(partition)
fetcher.update_fetch_positions([partition], timeout_ms=1234)
fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
fetcher.update_fetch_positions([partition])
fetcher._reset_offset.assert_called_with(partition)
fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)

# partition needs reset, has committed offset
fetcher._reset_offset.reset_mock()
@@ -139,7 +140,7 @@ def test__reset_offset(fetcher, mocker):
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

mocked.return_value = {tp: (1001, None)}
fetcher._reset_offset(tp)
fetcher._reset_offset(tp, timeout_ms=1234)
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
assert fetcher._subscriptions.assignment[tp].position == 1001

