From 46fee86aa44355d9dbec0c4b7b7eace5ffff4397 Mon Sep 17 00:00:00 2001 From: wookievx Date: Sun, 4 Aug 2024 19:06:53 +0200 Subject: [PATCH 1/4] feat: Implemented RebalanceRevokeMode This is potential solution for https://github.com/fd4s/fs2-kafka/issues/1200. Allowing for opting-in for graceful revoke handling (waiting for all the streams to finish, which should imply all the commits as well) --- docs/src/main/mdoc/consumers.md | 43 +++++++ .../scala/fs2/kafka/ConsumerSettings.scala | 45 +++++++- .../main/scala/fs2/kafka/KafkaConsumer.scala | 107 +++++++++++++----- .../scala/fs2/kafka/RebalanceRevokeMode.scala | 42 +++++++ .../kafka/internal/KafkaConsumerActor.scala | 4 +- 5 files changed, 206 insertions(+), 35 deletions(-) create mode 100644 modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala diff --git a/docs/src/main/mdoc/consumers.md b/docs/src/main/mdoc/consumers.md index 2e04dcb9b..ff3859282 100644 --- a/docs/src/main/mdoc/consumers.md +++ b/docs/src/main/mdoc/consumers.md @@ -547,6 +547,49 @@ You may notice, that actual graceful shutdown implementation requires a decent a Also note, that even if you implement a graceful shutdown your application may fall with an error. And in this case, a graceful shutdown will not be invoked. It means that your application should be ready to an _at least once_ semantic even when a graceful shutdown is implemented. Or, if you need an _exactly once_ semantic, consider using [transactions](transactions.md). +### Graceful partition revoke + +In addition to graceful shutdown of hole consumer there is an option to configure your consumer to wait for the streams +to finish processing partition before "releasing" it. Behavior can be enabled via the following settings: +```scala mdoc:silent +object WithGracefulPartitionRevoke extends IOApp.Simple { + + val run: IO[Unit] = { + def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] = + IO(println(s"Processing record: $record")) + + def run(consumer: KafkaConsumer[IO, String, String]): IO[Unit] = { + consumer.subscribeTo("topic") >> consumer + .stream + .evalMap { msg => + processRecord(msg).as(msg.offset) + } + .through(commitBatchWithin(100, 15.seconds)) + .compile + .drain + } + + val consumerSettings = ConsumerSettings[IO, String, String] = + ConsumerSettings[IO, String, String] + .withRebalanceRevokeMode(RebalanceRevokeMode.Graceful) + .withSessionTimeout(2.seconds) + + KafkaConsumer + .resource(consumerSettings) + .use { consumer => + run(consumer) + } + } + +} +``` + +Please note that this setting does not guarantee that all the commits will be performed before partition is revoked and +that `session.timeout.ms` setting is set to lower value. Be aware that awaiting too long for partition processor +to finish will cause processing of the whole topic to be suspended. + +Awaiting for commits to complete might be implemented in the future. + [commitrecovery-default]: @API_BASE_URL@/CommitRecovery$.html#Default:fs2.kafka.CommitRecovery [committableconsumerrecord]: @API_BASE_URL@/CommittableConsumerRecord.html [committableoffset]: @API_BASE_URL@/CommittableOffset.html diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala index 5d3290797..92f847d18 100644 --- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala @@ -8,14 +8,14 @@ package fs2.kafka import scala.concurrent.duration.* import scala.concurrent.ExecutionContext - import cats.effect.Resource import cats.Show import fs2.kafka.security.KafkaCredentialStore - import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.requests.OffsetFetchResponse +import scala.util.Try + /** * [[ConsumerSettings]] contain settings necessary to create a [[KafkaConsumer]]. At the very * least, this includes key and value deserializers.

@@ -151,6 +151,18 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V] + + /** + * Returns value for property: + * + * {{{ + * ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + * }}} + * + * Returns a value as a [[FiniteDuration]] for convenience + */ + def sessionTimeout: FiniteDuration + /** * Returns a new [[ConsumerSettings]] instance with the specified session timeout. This is * equivalent to setting the following property using the [[withProperty]] function, except you @@ -373,6 +385,17 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V] + /** + * One of two possible modes of operation for [[KafkaConsumer.partitionsMapStream]]. See [[RebalanceRevokeMode]] + * for detailed explanation of differences between them. + */ + def rebalanceRevokeMode: RebalanceRevokeMode + + /** + * Creates a new [[ConsumerSettings]] with the specified [[rebalanceRevokeMode]]. + */ + def withRebalanceRevokeMode(rebalanceRevokeMode: RebalanceRevokeMode): ConsumerSettings[F, K, V] + } object ConsumerSettings { @@ -388,7 +411,8 @@ object ConsumerSettings { override val pollTimeout: FiniteDuration, override val commitRecovery: CommitRecovery, override val recordMetadata: ConsumerRecord[K, V] => String, - override val maxPrefetchBatches: Int + override val maxPrefetchBatches: Int, + override val rebalanceRevokeMode: RebalanceRevokeMode ) extends ConsumerSettings[F, K, V] { override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] = @@ -422,6 +446,13 @@ object ConsumerSettings { override def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V] = withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString) + //need to use Try, to avoid separate implementation for scala 2.12 + override def sessionTimeout: FiniteDuration = + properties.get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) + .flatMap(str => Try(str.toLong).toOption) + .map(_.millis) + .getOrElse(45000.millis) + override def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V] = withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString) @@ -509,6 +540,11 @@ object ConsumerSettings { ): ConsumerSettings[F, K, V] = withProperties(credentialsStore.properties) + override def withRebalanceRevokeMode( + rebalanceRevokeMode: RebalanceRevokeMode + ): ConsumerSettings[F, K, V] = + copy(rebalanceRevokeMode = rebalanceRevokeMode) + override def toString: String = s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)" @@ -542,7 +578,8 @@ object ConsumerSettings { pollTimeout = 50.millis, commitRecovery = CommitRecovery.Default, recordMetadata = _ => OffsetFetchResponse.NO_METADATA, - maxPrefetchBatches = 2 + maxPrefetchBatches = 2, + rebalanceRevokeMode = RebalanceRevokeMode.Eager ) def apply[F[_], K, V]( diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 57eb13ca9..c52fe843b 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -7,13 +7,11 @@ package fs2.kafka import java.util - import scala.annotation.nowarn import scala.collection.immutable.SortedSet import scala.concurrent.duration.FiniteDuration import scala.util.matching.Regex - -import cats.{Foldable, Functor, Reducible} +import cats.{Applicative, Foldable, Functor, Reducible} import cats.data.{NonEmptySet, OptionT} import cats.effect.* import cats.effect.implicits.* @@ -28,7 +26,6 @@ import fs2.kafka.internal.converters.collection.* import fs2.kafka.internal.syntax.* import fs2.kafka.internal.KafkaConsumerActor.* import fs2.kafka.internal.LogEntry.{RevokedPreviousFetch, StoredFetch} - import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp} import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartition} @@ -151,7 +148,8 @@ object KafkaConsumer { def partitionStream( streamId: StreamId, partition: TopicPartition, - assignmentRevoked: F[Unit] + assignmentRevoked: F[Unit], + signalCompletion: F[Unit] ): Stream[F, CommittableConsumerRecord[F, K, V]] = Stream.force { for { chunks <- chunkQueue @@ -238,7 +236,9 @@ object KafkaConsumer { .fromQueueNoneTerminated(chunks) .flatMap(Stream.chunk) .covary[F] - .onFinalize(dequeueDone.complete(()).void) + //Previously all the revoke logic was done on the polling stream, not handling stream + //with new `signalCompletion` there is possibility for more graceful consumer revoke logic + .onFinalize(dequeueDone.complete(()) *> signalCompletion) } } .flatten @@ -246,15 +246,15 @@ object KafkaConsumer { def enqueueAssignment( streamId: StreamId, - assigned: Map[TopicPartition, Deferred[F, Unit]], + assigned: Map[TopicPartition, AssignmentSignals[F]], partitionsMapQueue: PartitionsMapQueue ): F[Unit] = stopConsumingDeferred .tryGet .flatMap { case None => - val assignment: PartitionsMap = assigned.map { case (partition, finisher) => - partition -> partitionStream(streamId, partition, finisher.get) + val assignment = assigned.map { case (partition, assignmentSignals) => + partition -> partitionStream(streamId, partition, assignmentSignals.awaitStreamTerminationSignal, assignmentSignals.signalStreamFinished.void) } partitionsMapQueue.offer(Some(assignment)) case Some(()) => @@ -263,24 +263,26 @@ object KafkaConsumer { def onRebalance( streamId: StreamId, - assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]], + assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]], partitionsMapQueue: PartitionsMapQueue ): OnRebalance[F] = OnRebalance( onRevoked = revoked => { - for { + val finishSignals = for { finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1))) - _ <- finishers.toVector.traverse { case (_, finisher) => finisher.complete(()) } - } yield () + revokeFinishers <- finishers + .toVector + .traverse { + case (_, assignmentSignals) => + assignmentSignals.signalStreamToTerminate.as(assignmentSignals.awaitStreamFinishedSignal) + } + } yield revokeFinishers + + finishSignals.flatMap(revokes => revokes.sequence_) }, onAssigned = assignedPartitions => { for { - assignment <- assignedPartitions - .toVector - .traverse { partition => - Deferred[F, Unit].map(partition -> _) - } - .map(_.toMap) + assignment <- buildAssignment(assignedPartitions) _ <- assignmentRef.update(_ ++ assignment) _ <- enqueueAssignment( streamId = streamId, @@ -291,11 +293,29 @@ object KafkaConsumer { } ) + def buildAssignment( + assignedPartitions: SortedSet[TopicPartition] + ): F[Map[TopicPartition, AssignmentSignals[F]]] = { + assignedPartitions + .toVector + .traverse { partition => + settings.rebalanceRevokeMode match { + case RebalanceRevokeMode.EagerMode => + Deferred[F, Unit].map(streamFinisher => partition -> AssignmentSignals.eager(streamFinisher)) + case RebalanceRevokeMode.GracefulMode => + (Deferred[F, Unit], Deferred[F, Unit]).mapN { (streamFinisher, revokeFinisher) => + partition -> AssignmentSignals.graceful(streamFinisher, revokeFinisher) + } + } + } + .map(_.toMap) + } + def requestAssignment( streamId: StreamId, - assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]], + assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]], partitionsMapQueue: PartitionsMapQueue - ): F[Map[TopicPartition, Deferred[F, Unit]]] = { + ): F[Map[TopicPartition, AssignmentSignals[F]]] = { val assignment = this.assignment( Some( onRebalance( @@ -312,18 +332,13 @@ object KafkaConsumer { F.pure(Map.empty) case Right(assigned) => - assigned - .toVector - .traverse { partition => - Deferred[F, Unit].map(partition -> _) - } - .map(_.toMap) + buildAssignment(assigned) } } def initialEnqueue( streamId: StreamId, - assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]], + assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]], partitionsMapQueue: PartitionsMapQueue ): F[Unit] = for { @@ -343,7 +358,7 @@ object KafkaConsumer { partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]]) streamId <- Stream.eval(streamIdRef.modify(n => (n + 1, n))) assignmentRef <- Stream - .eval(Ref[F].of(Map.empty[TopicPartition, Deferred[F, Unit]])) + .eval(Ref[F].of(Map.empty[TopicPartition, AssignmentSignals[F]])) _ <- Stream.eval( initialEnqueue( streamId, @@ -825,6 +840,40 @@ object KafkaConsumer { } + /** + * Utility class to provide clarity for internals. + * Goal is to make [[RebalanceRevokeMode]] transparent to the rest of implementation internals. + * @tparam F effect used + */ + private sealed abstract class AssignmentSignals[F[_]] { + def signalStreamToTerminate: F[Boolean] + def awaitStreamTerminationSignal: F[Unit] + def signalStreamFinished: F[Boolean] + def awaitStreamFinishedSignal: F[Unit] + } + + private object AssignmentSignals { + + def eager[F[_]: Applicative](streamFinisher: Deferred[F, Unit]): AssignmentSignals[F] = EagerSignals(streamFinisher) + def graceful[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]): AssignmentSignals[F] = + GracefulSignals[F](streamFinisher, revokeFinisher) + + final case class EagerSignals[F[_]: Applicative](streamFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] { + override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(()) + override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get + override def signalStreamFinished: F[Boolean] = true.pure[F] + override def awaitStreamFinishedSignal: F[Unit] = ().pure[F] + } + + final case class GracefulSignals[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] { + override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(()) + override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get + override def signalStreamFinished: F[Boolean] = revokeFinisher.complete(()) + override def awaitStreamFinishedSignal: F[Unit] = revokeFinisher.get + } + + } + /* * Prevents the default `MkConsumer` instance from being implicitly available * to code defined in this object, ensuring factory methods require an instance diff --git a/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala b/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala new file mode 100644 index 000000000..e8e62a6d2 --- /dev/null +++ b/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala @@ -0,0 +1,42 @@ +package fs2.kafka + +/** + * The available options for [[ConsumerSettings#rebalanceRevokeMode]].

+ * + * Available options include:
+ * - [[RebalanceRevokeMode.Eager]] old behaviour, to release assigned partition as soon as possible,
+ * - [[RebalanceRevokeMode.Graceful]] modified behavior, waiting for configured amount of time: + * {{{org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG}}} It is guaranteed by kafka protocol + * that after that timeout old consumer will be marked as dead. + * + * Default mode is [[RebalanceRevokeMode.Eager]] which is exactly the same as old behavior and can be preferred + * if rebalance need to happen as quickly as possible and having multiple consumers working on a partition for a moment + * is not a problem. + * + * On the other hand if you want stricter guarantees about processing and attempt to wait for existing streams to finish + * processing messages before releasing partition choose [[RebalanceRevokeMode.Graceful]]. + * Because stream is signalled to be shutdown in-flight commits might be lost and some messages might be processed again + * after new assignment. + * + */ +sealed abstract class RebalanceRevokeMode + +object RebalanceRevokeMode { + + private[kafka] case object EagerMode extends RebalanceRevokeMode + + private[kafka] case object GracefulMode extends RebalanceRevokeMode + + /** + * Old behavior releasing partition as soon as all streams have messages dispatched and signalled termination + */ + val Eager: RebalanceRevokeMode = EagerMode + + /** + * Waiting for configured amount of time:
+ * [[ConsumerSettings#withMaxPollInterval]] * 5 or until all the partition streams finish processing (but not waiting + * for commits to conclude for that partition) + */ + val Graceful: RebalanceRevokeMode = GracefulMode + +} diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 6a1f4fe02..e2c049625 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -230,7 +230,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( res.completeWithRecords >> res.completeWithoutRecords >> res.removeRevokedRecords >> - onRevoked + onRevoked.timeout(settings.sessionTimeout) //just to be extra-safe timeout this revoke } } @@ -630,7 +630,7 @@ private[kafka] object KafkaConsumerActor { final case class OnRebalance[F[_]]( onAssigned: SortedSet[TopicPartition] => F[Unit], - onRevoked: SortedSet[TopicPartition] => F[Unit] + onRevoked: SortedSet[TopicPartition] => F[Unit], ) { override def toString: String = From 8b6753c308c254270566a17111997ff84f11b860 Mon Sep 17 00:00:00 2001 From: wookievx Date: Sun, 29 Sep 2024 20:01:01 +0200 Subject: [PATCH 2/4] feat: Adding unit-test for new revocation mode --- .../scala/fs2/kafka/KafkaConsumerSpec.scala | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala index 74b9355c7..1fc2a800e 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala @@ -10,10 +10,9 @@ import scala.collection.immutable.SortedSet import scala.concurrent.duration.* import cats.data.NonEmptySet -import cats.effect.{Fiber, IO} +import cats.effect.{Clock, Fiber, IO, Ref} import cats.effect.std.Queue import cats.effect.unsafe.implicits.global -import cats.effect.Ref import cats.syntax.all.* import fs2.concurrent.SignallingRef import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow @@ -1212,6 +1211,65 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { } } + describe("KafkaConsumer#stream") { + it("should wait for previous generation of streams to start consuming messages with RebalanceRevokeMode#Graceful") { + withTopic { topic => + createCustomTopic(topic, partitions = 2) //minimal amount of partitions for two consumers + def recordRange(from: Int, _until: Int) = (from until _until).map(n => s"key-$n" -> s"value-$n") + + def produceRange(from: Int, until: Int): IO[Unit] = IO { + val produced = recordRange(from, until) + publishToKafka(topic, produced) + } + + // tracking consumption for being unique by explicitly commiting after each message + val consumed = for { + ref <- Ref.of[IO, Vector[(String, String)]](Vector.empty) + _ <- produceRange(0, 10) + _ <- + KafkaConsumer + .stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)) + .evalTap(_.subscribeTo(topic)) + .flatMap( + _.stream + .evalMap { record => + ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset) + } + .evalTap(_.commit) + ) + .interruptAfter(3.seconds) + .compile + .drain + .race { + Clock[IO].sleep(1.second) *> + produceRange(10, 20) *> + KafkaConsumer + .stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)) + .evalTap(_.subscribeTo(topic)) + .flatMap( c => + fs2.Stream.exec(produceRange(20, 30)) ++ + c.stream + .evalMap { record => + ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset) + } + .evalTap(_.commit) + ) + .interruptAfter(3.seconds) + .compile + .drain + } + res <- ref.get + } yield res + + val res = consumed.unsafeRunSync() + + //expected behavior is that no duplicate consumption is performed + res.toSet should have size res.length.toLong + (res should contain).theSameElementsAs(recordRange(0, 10) ++ recordRange(10, 20) ++ recordRange(20, 30)) + } + } + } + private def commitTest( commit: (KafkaConsumer[IO, String, String], CommittableOffsetBatch[IO]) => IO[Unit] ): Assertion = From 4807bcbd70a4328c55c094e807a3a7647173306c Mon Sep 17 00:00:00 2001 From: wookievx Date: Sun, 29 Sep 2024 20:09:36 +0200 Subject: [PATCH 3/4] chore: Updating code comments to be more accurate --- modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala b/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala index e8e62a6d2..3edca1a5f 100644 --- a/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala +++ b/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala @@ -34,7 +34,7 @@ object RebalanceRevokeMode { /** * Waiting for configured amount of time:
- * [[ConsumerSettings#withMaxPollInterval]] * 5 or until all the partition streams finish processing (but not waiting + * {{{org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG}}} or until all the partition streams finish processing (but not waiting * for commits to conclude for that partition) */ val Graceful: RebalanceRevokeMode = GracefulMode From 32f5a535a4be5b39b2bca2afe4da22218202d743 Mon Sep 17 00:00:00 2001 From: wookievx Date: Wed, 13 Nov 2024 21:10:20 +0100 Subject: [PATCH 4/4] fix: Fixing implementation of revoke listener to not wait for streams termination while signaling streams to finish --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 14 ++-- .../kafka/internal/KafkaConsumerActor.scala | 18 ++++- .../scala/fs2/kafka/internal/LogEntry.scala | 10 +++ .../scala/fs2/kafka/KafkaConsumerSpec.scala | 75 +++++++++++++------ 4 files changed, 84 insertions(+), 33 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index c52fe843b..5564f8a11 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -12,7 +12,7 @@ import scala.collection.immutable.SortedSet import scala.concurrent.duration.FiniteDuration import scala.util.matching.Regex import cats.{Applicative, Foldable, Functor, Reducible} -import cats.data.{NonEmptySet, OptionT} +import cats.data.{Chain, NonEmptySet, OptionT} import cats.effect.* import cats.effect.implicits.* import cats.effect.std.* @@ -268,17 +268,15 @@ object KafkaConsumer { ): OnRebalance[F] = OnRebalance( onRevoked = revoked => { - val finishSignals = for { + for { finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1))) - revokeFinishers <- finishers - .toVector + revokeFinishers <- Chain + .fromIterableOnce(finishers) .traverse { case (_, assignmentSignals) => assignmentSignals.signalStreamToTerminate.as(assignmentSignals.awaitStreamFinishedSignal) } } yield revokeFinishers - - finishSignals.flatMap(revokes => revokes.sequence_) }, onAssigned = assignedPartitions => { for { @@ -447,7 +445,9 @@ object KafkaConsumer { assignmentRef.updateAndGet(_ ++ assigned).flatMap(updateQueue.offer), onRevoked = revoked => initialAssignmentDone >> - assignmentRef.updateAndGet(_ -- revoked).flatMap(updateQueue.offer) + assignmentRef.updateAndGet(_ -- revoked) + .flatMap(updateQueue.offer) + .as(Chain.empty) ) Stream diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index e2c049625..a468dc786 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -223,14 +223,24 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( )).run(withRebalancing) } .flatMap { res => - val onRevoked = - res.onRebalances.foldLeft(F.unit)(_ >> _.onRevoked(revoked)) + val onRevokedStarted = + res.onRebalances.foldLeft(F.pure(Chain.empty[F[Unit]])) { (acc, next) => + for { + acc <- acc + next <- next.onRevoked(revoked) + } yield acc ++ next + } res.logRevoked >> res.completeWithRecords >> res.completeWithoutRecords >> res.removeRevokedRecords >> - onRevoked.timeout(settings.sessionTimeout) //just to be extra-safe timeout this revoke + onRevokedStarted //first we need to trigger interruption for all the consuming streams + .flatMap(_.sequence_) //second we await for all the streams to finish processing (Eager mode returns immediately) + .timeoutTo( + settings.sessionTimeout, + ref.get.flatMap(state => log(LogEntry.RevokeTimeoutOccurred(revoked, state))) + ) } } @@ -630,7 +640,7 @@ private[kafka] object KafkaConsumerActor { final case class OnRebalance[F[_]]( onAssigned: SortedSet[TopicPartition] => F[Unit], - onRevoked: SortedSet[TopicPartition] => F[Unit], + onRevoked: SortedSet[TopicPartition] => F[Chain[F[Unit]]], ) { override def toString: String = diff --git a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala index 782945a1d..3e8a396b7 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala @@ -223,6 +223,16 @@ private[kafka] object LogEntry { } + final case class RevokeTimeoutOccurred[F[_]]( + revoked: Set[TopicPartition], + state: State[F, ?, ?] + ) extends LogEntry { + override def level: LogLevel = Info + + override def message: String = + s"Consuming streams did not signal processing completion of [$revoked]. Current state [$state]." + } + def recordsString[F[_]]( records: Records[F] ): String = diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala index 1fc2a800e..1dc3f26d6 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala @@ -8,22 +8,16 @@ package fs2.kafka import scala.collection.immutable.SortedSet import scala.concurrent.duration.* - import cats.data.NonEmptySet import cats.effect.{Clock, Fiber, IO, Ref} -import cats.effect.std.Queue +import cats.effect.std.{Queue, Semaphore} import cats.effect.unsafe.implicits.global import cats.syntax.all.* import fs2.concurrent.SignallingRef import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow import fs2.kafka.internal.converters.collection.* import fs2.Stream - -import org.apache.kafka.clients.consumer.{ - ConsumerConfig, - CooperativeStickyAssignor, - NoOffsetForPartitionException -} +import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException} import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.TopicPartition import org.scalatest.Assertion @@ -71,7 +65,9 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { } } - it("should consume all records at least once with subscribing for several consumers") { + def testMultipleConsumersCorrectConsumption( + customizeSettings: ConsumerSettings[IO, String, String] => ConsumerSettings[IO, String, String] + ) = { withTopic { topic => createCustomTopic(topic, partitions = 3) val produced = (0 until 5).map(n => s"key-$n" -> s"value->$n") @@ -79,7 +75,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { val consumed = KafkaConsumer - .stream(consumerSettings[IO].withGroupId("test")) + .stream(customizeSettings(consumerSettings[IO].withGroupId("test"))) .subscribeTo(topic) .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream .records @@ -101,6 +97,14 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { } } + it("should consume all records at least once with subscribing for several consumers") { + testMultipleConsumersCorrectConsumption(identity) + } + + it("should consume all records at least once with subscribing for several consumers in graceful mode") { + testMultipleConsumersCorrectConsumption(_.withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)) + } + it("should consume records with assign by partitions") { withTopic { topic => createCustomTopic(topic, partitions = 3) @@ -1212,7 +1216,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { } describe("KafkaConsumer#stream") { - it("should wait for previous generation of streams to start consuming messages with RebalanceRevokeMode#Graceful") { + it("should wait for previous generation of streams to finish before starting consuming messages with RebalanceRevokeMode#Graceful") { withTopic { topic => createCustomTopic(topic, partitions = 2) //minimal amount of partitions for two consumers def recordRange(from: Int, _until: Int) = (from until _until).map(n => s"key-$n" -> s"value-$n") @@ -1222,50 +1226,77 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { publishToKafka(topic, produced) } + val settings = consumerSettings[IO].withGroupId("rebalance-test-group").withRebalanceRevokeMode(RebalanceRevokeMode.Graceful).withAutoOffsetReset(AutoOffsetReset.EarliestOffsetReset) + // tracking consumption for being unique by explicitly commiting after each message + // the expected timeline looks like this: + // ----stream1---|locked auquired|--|producing second batch|--|rebalance-triggered|--|lock-released|---... + //idea is that we check if all the messages are consumed by one processor (exactly once processing implies that) val consumed = for { + lock <- Semaphore[IO](1) ref <- Ref.of[IO, Vector[(String, String)]](Vector.empty) _ <- produceRange(0, 10) _ <- KafkaConsumer - .stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)) + .stream(settings) .evalTap(_.subscribeTo(topic)) .flatMap( _.stream .evalMap { record => - ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset) + lock.permit.use { _ => + ref.update(_ :+ (record.record.key -> record.record.value)).as(record) + } + } + .evalMap { r => + //if key is last return none and terminate stream + if (r.record.key == "key-29") { + r.offset.commit.as(None) + } else { + r.offset.commit.as(Some(r)) + } } - .evalTap(_.commit) + .unNoneTerminate ) - .interruptAfter(3.seconds) .compile .drain .race { - Clock[IO].sleep(1.second) *> + Clock[IO].sleep(100.millis) *> + lock.acquire *> produceRange(10, 20) *> KafkaConsumer - .stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)) + .stream(settings) .evalTap(_.subscribeTo(topic)) + .evalTap(_ => lock.release) .flatMap( c => fs2.Stream.exec(produceRange(20, 30)) ++ c.stream .evalMap { record => - ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset) + ref.update(_ :+ (record.record.key -> record.record.value)).as(record) + } + .evalMap { r => + //if key is last return none and terminate stream + if (r.record.key == "key-29") { + r.offset.commit.as(None) + } else { + r.offset.commit.as(Some(r)) + } } - .evalTap(_.commit) + .unNoneTerminate ) - .interruptAfter(3.seconds) .compile .drain } + .timeout(5.seconds) res <- ref.get } yield res val res = consumed.unsafeRunSync() //expected behavior is that no duplicate consumption is performed - res.toSet should have size res.length.toLong - (res should contain).theSameElementsAs(recordRange(0, 10) ++ recordRange(10, 20) ++ recordRange(20, 30)) + val resultSet = res.toSet + resultSet should have size res.length.toLong + val expectedSet = (recordRange(0, 10) ++ recordRange(10, 20) ++ recordRange(20, 30)).toSet + resultSet shouldEqual expectedSet } } }