From 3391ec86e285f8c93e6564554a25dbae11267db4 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Tue, 25 Jun 2024 16:59:36 -0400 Subject: [PATCH] further clean up of recovery partition revoking --- quixstreams/state/recovery.py | 40 ++++++++++++++++------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/quixstreams/state/recovery.py b/quixstreams/state/recovery.py index e7ccbe450..911cb7d2d 100644 --- a/quixstreams/state/recovery.py +++ b/quixstreams/state/recovery.py @@ -298,7 +298,7 @@ def do_recovery(self): After, will resume normal `Application` processing. """ - logger.info("Beginning the recovery process...") + logger.info("Beginning recovery check...") self._running = True self._consumer.resume( [ @@ -312,7 +312,7 @@ def do_recovery(self): self._running = False self._consumer.resume(self._consumer.assignment()) else: - logger.debug("Recovery interrupted; stopping.") + logger.debug("Recovery process interrupted; stopping.") def _generate_recovery_partitions( self, @@ -366,7 +366,7 @@ def assign_partition( for rp in recovery_partitions: changelog_name, partition = rp.changelog_name, rp.partition_num if rp.needs_recovery_check: - logger.debug(f"Performing recovery check for {rp}") + logger.debug(f"Adding a recovery check for {rp}") self._recovery_partitions.setdefault(partition, {})[changelog_name] = rp # note: technically it should be rp.offset + 1, but to remain backwards # compatible with