diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7ff9daf7b..f5ed6f8bf 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -131,17 +131,18 @@ def send_fetches(self):
         self._clean_done_fetch_futures()
         return futures
 
-    def reset_offsets_if_needed(self, partitions):
+    def reset_offsets_if_needed(self, partitions, timeout_ms=float("inf")):
         """Lookup and set offsets for any partitions which are awaiting an
         explicit reset.
 
         Arguments:
             partitions (set of TopicPartitions): the partitions to reset
         """
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             # TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
 
     def _clean_done_fetch_futures(self):
         while True:
@@ -156,7 +157,7 @@ def in_flight_fetches(self):
         self._clean_done_fetch_futures()
         return bool(self._fetch_futures)
 
-    def update_fetch_positions(self, partitions):
+    def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
         """Update the fetch positions for the provided partitions.
 
         Arguments:
@@ -167,6 +168,7 @@ def update_fetch_positions(self, partitions):
                 partition and no reset policy is available
         """
         # reset the fetch position to the committed position
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             if not self._subscriptions.is_assigned(tp):
                 log.warning("partition %s is not assigned - skipping offset"
@@ -178,12 +180,12 @@ def update_fetch_positions(self, partitions):
                 continue
 
             if self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             elif self._subscriptions.assignment[tp].committed is None:
                 # there's no committed position, so we need to reset with the
                 # default strategy
                 self._subscriptions.need_offset_reset(tp)
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             else:
                 committed = self._subscriptions.assignment[tp].committed.offset
                 log.debug("Resetting offset for partition %s to the committed"
@@ -215,7 +217,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
             offsets[tp] = offsets[tp][0]
         return offsets
 
-    def _reset_offset(self, partition):
+    def _reset_offset(self, partition, timeout_ms):
         """Reset offsets for the given partition using the offset reset strategy.
 
         Arguments:
@@ -234,7 +236,7 @@ def _reset_offset(self, partition):
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offsets = self._retrieve_offsets({partition: timestamp})
+        offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms)
 
         if partition in offsets:
             offset = offsets[partition][0]
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 969969932..72546788a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -615,7 +615,7 @@ def partitions_for_topic(self, topic):
             partitions = cluster.partitions_for_topic(topic)
         return partitions
 
-    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
+    def poll(self, timeout_ms=0, max_records=None, update_offsets=True, *, positions_timeout_ms=float("inf")):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -656,7 +656,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         start = time.time()
         remaining = timeout_ms
         while not self._closed:
-            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
+            records = self._poll_once(remaining, positions_timeout_ms, max_records, update_offsets=update_offsets)
             if records:
                 return records
 
@@ -668,7 +668,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
 
         return {}
 
-    def _poll_once(self, timeout_ms, max_records, update_offsets=True):
+    def _poll_once(self, timeout_ms, positions_timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
@@ -683,7 +683,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
-            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+            self._update_fetch_positions(self._subscription.missing_fetch_positions(), positions_timeout_ms)
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
@@ -714,7 +714,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
 
-    def position(self, partition):
+    def position(self, partition, timeout_ms=float("inf")):
         """Get the offset of the next record that will be fetched
 
         Arguments:
@@ -728,7 +728,7 @@ def position(self, partition):
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
         offset = self._subscription.assignment[partition].position
         if offset is None:
-            self._update_fetch_positions([partition])
+            self._update_fetch_positions([partition], timeout_ms)
             offset = self._subscription.assignment[partition].position
         return offset
 
@@ -1087,7 +1087,7 @@ def _use_consumer_group(self):
             return False
         return True
 
-    def _update_fetch_positions(self, partitions):
+    def _update_fetch_positions(self, partitions, timeout_ms):
         """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
@@ -1099,12 +1099,13 @@ def _update_fetch_positions(self, partitions):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
+        end_time = time.time() + timeout_ms / 1000
         # Lookup any positions for partitions which are awaiting reset (which may be the
         # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
         # this check first to avoid an unnecessary lookup of committed offsets (which
         # typically occurs when the user is manually assigning partitions and managing
         # their own offsets).
-        self._fetcher.reset_offsets_if_needed(partitions)
+        self._fetcher.reset_offsets_if_needed(partitions, timeout_ms)
 
         if not self._subscription.has_all_fetch_positions():
             # if we still don't have offsets for all partitions, then we should either seek
@@ -1115,7 +1116,8 @@ def _update_fetch_positions(self, partitions):
                 self._coordinator.refresh_committed_offsets_if_needed()
 
             # Then, do any offset lookups in case some positions are not known
-            self._fetcher.update_fetch_positions(partitions)
+            update_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
+            self._fetcher.update_fetch_positions(partitions, update_timeout_ms)
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * (self._consumer_timeout - time.time())
@@ -1145,7 +1147,8 @@ def _message_generator(self):
             # Fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
-                self._update_fetch_positions(partitions)
+                update_timeout_ms = max(0.0, 1000 * (self._consumer_timeout - time.time()))
+                self._update_fetch_positions(partitions, update_timeout_ms)
 
             poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
             self._client.poll(timeout_ms=poll_ms)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 697f8be1f..9b584f860 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -4,6 +4,7 @@
 import pytest
 
 from collections import OrderedDict
+from unittest.mock import ANY
 import itertools
 import time
 
@@ -114,11 +115,11 @@ def test_update_fetch_positions(fetcher, topic, mocker):
     # partition needs reset, no committed offset
     fetcher._subscriptions.need_offset_reset(partition)
     fetcher._subscriptions.assignment[partition].awaiting_reset = False
-    fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition)
+    fetcher.update_fetch_positions([partition], timeout_ms=1234)
+    fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
     assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
     fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition)
+    fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
 
     # partition needs reset, has committed offset
     fetcher._reset_offset.reset_mock()
@@ -139,7 +140,7 @@ def test__reset_offset(fetcher, mocker):
     mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
     mocked.return_value = {tp: (1001, None)}
 
-    fetcher._reset_offset(tp)
+    fetcher._reset_offset(tp, timeout_ms=1234)
     assert not fetcher._subscriptions.assignment[tp].awaiting_reset
     assert fetcher._subscriptions.assignment[tp].position == 1001
 
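Usage sketch (not part of the patch): one way a caller might exercise the new arguments introduced above, assuming this diff is applied. The broker address, topic, and group id below are illustrative placeholders; `positions_timeout_ms` must be passed by keyword because it is keyword-only.

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'example-topic',                      # placeholder topic
        bootstrap_servers='localhost:9092',   # placeholder broker
        group_id='example-group',
        auto_offset_reset='earliest',
    )

    # Bound the time spent resolving missing fetch positions (offset reset and
    # committed-offset lookup) independently of the poll timeout itself.
    records = consumer.poll(timeout_ms=500, positions_timeout_ms=5000)

    # position() accepts a similar bound; its timeout_ms defaults to
    # float("inf"), which preserves the previous potentially-blocking behavior.
    for tp in consumer.assignment():
        print(tp, consumer.position(tp, timeout_ms=5000))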