Skip to content

Commit

Permalink
further clean up of recovery partition revoking
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Jun 25, 2024
1 parent 026d034 commit 3391ec8
Showing 1 changed file with 18 additions and 22 deletions.
40 changes: 18 additions & 22 deletions quixstreams/state/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def do_recovery(self):
After, will resume normal `Application` processing.
"""
logger.info("Beginning the recovery process...")
logger.info("Beginning recovery check...")
self._running = True
self._consumer.resume(
[
Expand All @@ -312,7 +312,7 @@ def do_recovery(self):
self._running = False
self._consumer.resume(self._consumer.assignment())
else:
logger.debug("Recovery interrupted; stopping.")
logger.debug("Recovery process interrupted; stopping.")

def _generate_recovery_partitions(
self,
Expand Down Expand Up @@ -366,7 +366,7 @@ def assign_partition(
for rp in recovery_partitions:
changelog_name, partition = rp.changelog_name, rp.partition_num
if rp.needs_recovery_check:
logger.debug(f"Performing recovery check for {rp}")
logger.debug(f"Adding a recovery check for {rp}")
self._recovery_partitions.setdefault(partition, {})[changelog_name] = rp
# note: technically it should be rp.offset + 1, but to remain backwards
# compatible with <v2.7 +1 ALOS offsetting, it remains rp.offset.
Expand Down Expand Up @@ -396,28 +396,25 @@ def assign_partition(
# and wait for Application to start recovery
self._consumer.pause(self._consumer.assignment())

def _revoke_recovery_partitions(
self,
recovery_partitions: List[RecoveryPartition],
partition_num: int,
):
def _revoke_recovery_partitions(self, recovery_partitions: List[RecoveryPartition]):
"""
For revoking a specific set of RecoveryPartitions.
Also cleans up any remnant empty dictionary references for its partition number.
Revokes all provided RecoveryPartition.
Also cleans up any remnant empty dictionary references.
:param recovery_partitions: a list of `RecoveryPartition`
:param partition_num: partition number
"""
partition_nums = {rp.partition_num for rp in recovery_partitions}
self._consumer.incremental_unassign(
[
ConfluentPartition(p.changelog_name, p.partition_num)
for p in recovery_partitions
ConfluentPartition(rp.changelog_name, rp.partition_num)
for rp in recovery_partitions
]
)
for rp in recovery_partitions:
self._recovery_partitions[partition_num].pop(rp.changelog_name)
if not self._recovery_partitions[partition_num]:
del self._recovery_partitions[partition_num]
del self._recovery_partitions[rp.partition_num][rp.changelog_name]
for partition_num in partition_nums:
if not self._recovery_partitions[partition_num]:
del self._recovery_partitions[partition_num]
if self.recovering:
logger.debug("Resuming recovery process...")

Expand All @@ -429,23 +426,22 @@ def revoke_partition(self, partition_num: int):
"""
if changelogs := self._recovery_partitions.get(partition_num, {}):
logger.debug(f"Stopping recovery for {changelogs}")
self._revoke_recovery_partitions(list(changelogs.values()), partition_num)
self._revoke_recovery_partitions(list(changelogs.values()))

def _update_recovery_status(self):
revokes = {}
rp_revokes = []
for rp in dict_values(self._recovery_partitions):
position = self._consumer.position(
[ConfluentPartition(rp.changelog_name, rp.partition_num)]
)[0].offset
rp.set_recovery_consume_position(position)
if rp.finished_recovery_check:
revokes.setdefault(rp.partition_num, []).append(rp)
rp_revokes.append(rp)
if rp.had_recovery_changes:
logger.info(f"Finished recovering {rp}")
logger.info(f"Recovery successful for {rp}")
else:
logger.debug(f"No recovery was required for {rp}")
for partition_num, rp_list in revokes.items():
self._revoke_recovery_partitions(rp_list, partition_num)
self._revoke_recovery_partitions(rp_revokes)

def _recovery_loop(self):
"""
Expand Down

0 comments on commit 3391ec8

Please sign in to comment.