feat: Implemented RebalanceRevokeMode #1

Open
wants to merge 4 commits into
base: series/3.x
43 changes: 43 additions & 0 deletions docs/src/main/mdoc/consumers.md
@@ -547,6 +547,49 @@ You may notice, that actual graceful shutdown implementation requires a decent a

Also note, that even if you implement a graceful shutdown your application may fall with an error. And in this case, a graceful shutdown will not be invoked. It means that your application should be ready to an _at least once_ semantic even when a graceful shutdown is implemented. Or, if you need an _exactly once_ semantic, consider using [transactions](transactions.md).

### Graceful partition revoke

In addition to graceful shutdown of the whole consumer, you can configure the consumer to wait for the streams
to finish processing a partition before "releasing" it. This behavior can be enabled via the following settings:
```scala mdoc:silent
object WithGracefulPartitionRevoke extends IOApp.Simple {

  val run: IO[Unit] = {
    def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
      IO(println(s"Processing record: $record"))

    def run(consumer: KafkaConsumer[IO, String, String]): IO[Unit] = {
      consumer.subscribeTo("topic") >> consumer
        .stream
        .evalMap { msg =>
          processRecord(msg).as(msg.offset)
        }
        .through(commitBatchWithin(100, 15.seconds))
        .compile
        .drain
    }

    // Graceful mode waits for partition streams to finish before a partition is revoked
    val consumerSettings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
        .withSessionTimeout(2.seconds)

    KafkaConsumer
      .resource(consumerSettings)
      .use { consumer =>
        run(consumer)
      }
  }

}
```

Please note that this setting does not guarantee that all commits are performed before a partition is revoked.
Also note that the example above sets `session.timeout.ms` to a lower value. Be aware that waiting too long for
a partition processor to finish will suspend processing of the whole topic.

Waiting for commits to complete might be implemented in the future.

[commitrecovery-default]: @API_BASE_URL@/CommitRecovery$.html#Default:fs2.kafka.CommitRecovery
[committableconsumerrecord]: @API_BASE_URL@/CommittableConsumerRecord.html
[committableoffset]: @API_BASE_URL@/CommittableOffset.html
45 changes: 41 additions & 4 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -8,14 +8,14 @@ package fs2.kafka

import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext

import cats.effect.Resource
import cats.Show
import fs2.kafka.security.KafkaCredentialStore

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.util.Try

/**
* [[ConsumerSettings]] contain settings necessary to create a [[KafkaConsumer]]. At the very
* least, this includes key and value deserializers.<br><br>
@@ -151,6 +151,18 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V]


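/**
* Returns the session timeout, read from the following property:
*
* {{{
* ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
* }}}
*
* The value is returned as a [[FiniteDuration]] for convenience. If the property is unset or
* cannot be parsed as a number, 45000 milliseconds is returned.
*/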
def sessionTimeout: FiniteDuration

/**
* Returns a new [[ConsumerSettings]] instance with the specified session timeout. This is
* equivalent to setting the following property using the [[withProperty]] function, except you
@@ -373,6 +385,17 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]

/**
* One of two possible modes of operation for [[KafkaConsumer.partitionsMapStream]]. See [[RebalanceRevokeMode]]
* for a detailed explanation of the differences between them.
*/
def rebalanceRevokeMode: RebalanceRevokeMode

/**
* Creates a new [[ConsumerSettings]] with the specified [[rebalanceRevokeMode]].
*/
def withRebalanceRevokeMode(rebalanceRevokeMode: RebalanceRevokeMode): ConsumerSettings[F, K, V]

}

object ConsumerSettings {
@@ -388,7 +411,8 @@ object ConsumerSettings {
override val pollTimeout: FiniteDuration,
override val commitRecovery: CommitRecovery,
override val recordMetadata: ConsumerRecord[K, V] => String,
override val maxPrefetchBatches: Int
override val maxPrefetchBatches: Int,
override val rebalanceRevokeMode: RebalanceRevokeMode
) extends ConsumerSettings[F, K, V] {

override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
@@ -422,6 +446,13 @@ object ConsumerSettings {
override def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)

// Use Try rather than `toLongOption`, which would require a separate implementation for Scala 2.12
override def sessionTimeout: FiniteDuration =
properties.get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)
.flatMap(str => Try(str.toLong).toOption)
.map(_.millis)
.getOrElse(45000.millis)

override def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)
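
The `sessionTimeout` accessor in this hunk falls back silently when the property is missing or is not a number. The following is a minimal, standard-library-only sketch of that lookup, not the library's code: the `SessionTimeoutFallback` object and its `Map[String, String]` parameter are hypothetical stand-ins for the settings' `properties`, and the 45000 ms fallback is copied from the literal in the diff.

```scala
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the accessor added in this PR.
object SessionTimeoutFallback {

  // Fallback value copied from the literal used in the diff.
  val DefaultSessionTimeout: FiniteDuration = 45000.millis

  // Look up the raw property, try to parse it as a Long, and fall back
  // when the key is absent or the value is not a valid number.
  def sessionTimeout(properties: Map[String, String]): FiniteDuration =
    properties
      .get("session.timeout.ms") // value of ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
      .flatMap(str => Try(str.toLong).toOption)
      .map(_.millis)
      .getOrElse(DefaultSessionTimeout)

  def main(args: Array[String]): Unit = {
    // A parsable value is used as-is.
    println(sessionTimeout(Map("session.timeout.ms" -> "2000")) == 2000.millis)
    // An unparsable value falls back to the default instead of failing.
    println(sessionTimeout(Map("session.timeout.ms" -> "soon")) == DefaultSessionTimeout)
    // A missing key also falls back to the default.
    println(sessionTimeout(Map.empty) == DefaultSessionTimeout)
  }
}
```

One consequence worth weighing in review: a malformed `session.timeout.ms` is reported by this accessor as the default, so the accessor alone would not surface the misconfiguration.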

@@ -509,6 +540,11 @@ object ConsumerSettings {
): ConsumerSettings[F, K, V] =
withProperties(credentialsStore.properties)

override def withRebalanceRevokeMode(
rebalanceRevokeMode: RebalanceRevokeMode
): ConsumerSettings[F, K, V] =
copy(rebalanceRevokeMode = rebalanceRevokeMode)

override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"

@@ -542,7 +578,8 @@ object ConsumerSettings {
pollTimeout = 50.millis,
commitRecovery = CommitRecovery.Default,
recordMetadata = _ => OffsetFetchResponse.NO_METADATA,
maxPrefetchBatches = 2
maxPrefetchBatches = 2,
rebalanceRevokeMode = RebalanceRevokeMode.Eager
)

def apply[F[_], K, V](
109 changes: 79 additions & 30 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -7,14 +7,12 @@
package fs2.kafka

import java.util

import scala.annotation.nowarn
import scala.collection.immutable.SortedSet
import scala.concurrent.duration.FiniteDuration
import scala.util.matching.Regex

import cats.{Foldable, Functor, Reducible}
import cats.data.{NonEmptySet, OptionT}
import cats.{Applicative, Foldable, Functor, Reducible}
import cats.data.{Chain, NonEmptySet, OptionT}
import cats.effect.*
import cats.effect.implicits.*
import cats.effect.std.*
@@ -28,7 +26,6 @@ import fs2.kafka.internal.converters.collection.*
import fs2.kafka.internal.syntax.*
import fs2.kafka.internal.KafkaConsumerActor.*
import fs2.kafka.internal.LogEntry.{RevokedPreviousFetch, StoredFetch}

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartition}

@@ -151,7 +148,8 @@ object KafkaConsumer {
def partitionStream(
streamId: StreamId,
partition: TopicPartition,
assignmentRevoked: F[Unit]
assignmentRevoked: F[Unit],
signalCompletion: F[Unit]
): Stream[F, CommittableConsumerRecord[F, K, V]] = Stream.force {
for {
chunks <- chunkQueue
@@ -238,23 +236,25 @@ object KafkaConsumer {
.fromQueueNoneTerminated(chunks)
.flatMap(Stream.chunk)
.covary[F]
.onFinalize(dequeueDone.complete(()).void)
// Previously, all revoke logic ran on the polling stream rather than on the handling stream.
// The new `signalCompletion` effect lets the handling stream report that it has finished, which allows a more graceful revoke.
.onFinalize(dequeueDone.complete(()) *> signalCompletion)
}
}
.flatten
}

def enqueueAssignment(
streamId: StreamId,
assigned: Map[TopicPartition, Deferred[F, Unit]],
assigned: Map[TopicPartition, AssignmentSignals[F]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
stopConsumingDeferred
.tryGet
.flatMap {
case None =>
val assignment: PartitionsMap = assigned.map { case (partition, finisher) =>
partition -> partitionStream(streamId, partition, finisher.get)
val assignment = assigned.map { case (partition, assignmentSignals) =>
partition -> partitionStream(streamId, partition, assignmentSignals.awaitStreamTerminationSignal, assignmentSignals.signalStreamFinished.void)
}
partitionsMapQueue.offer(Some(assignment))
case Some(()) =>
@@ -263,24 +263,24 @@ object KafkaConsumer {

def onRebalance(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): OnRebalance[F] =
OnRebalance(
onRevoked = revoked => {
for {
finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1)))
_ <- finishers.toVector.traverse { case (_, finisher) => finisher.complete(()) }
} yield ()
revokeFinishers <- Chain
.fromIterableOnce(finishers)
.traverse {
case (_, assignmentSignals) =>
assignmentSignals.signalStreamToTerminate.as(assignmentSignals.awaitStreamFinishedSignal)
}
} yield revokeFinishers
},
onAssigned = assignedPartitions => {
for {
assignment <- assignedPartitions
.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
assignment <- buildAssignment(assignedPartitions)
_ <- assignmentRef.update(_ ++ assignment)
_ <- enqueueAssignment(
streamId = streamId,
@@ -291,11 +291,29 @@ object KafkaConsumer {
}
)

def buildAssignment(
assignedPartitions: SortedSet[TopicPartition]
): F[Map[TopicPartition, AssignmentSignals[F]]] = {
assignedPartitions
.toVector
.traverse { partition =>
settings.rebalanceRevokeMode match {
case RebalanceRevokeMode.EagerMode =>
Deferred[F, Unit].map(streamFinisher => partition -> AssignmentSignals.eager(streamFinisher))
case RebalanceRevokeMode.GracefulMode =>
(Deferred[F, Unit], Deferred[F, Unit]).mapN { (streamFinisher, revokeFinisher) =>
partition -> AssignmentSignals.graceful(streamFinisher, revokeFinisher)
}
}
}
.map(_.toMap)
}

def requestAssignment(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Map[TopicPartition, Deferred[F, Unit]]] = {
): F[Map[TopicPartition, AssignmentSignals[F]]] = {
val assignment = this.assignment(
Some(
onRebalance(
@@ -312,18 +330,13 @@ object KafkaConsumer {
F.pure(Map.empty)

case Right(assigned) =>
assigned
.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
buildAssignment(assigned)
}
}

def initialEnqueue(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
for {
@@ -343,7 +356,7 @@ object KafkaConsumer {
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
streamId <- Stream.eval(streamIdRef.modify(n => (n + 1, n)))
assignmentRef <- Stream
.eval(Ref[F].of(Map.empty[TopicPartition, Deferred[F, Unit]]))
.eval(Ref[F].of(Map.empty[TopicPartition, AssignmentSignals[F]]))
_ <- Stream.eval(
initialEnqueue(
streamId,
@@ -432,7 +445,9 @@ object KafkaConsumer {
assignmentRef.updateAndGet(_ ++ assigned).flatMap(updateQueue.offer),
onRevoked = revoked =>
initialAssignmentDone >>
assignmentRef.updateAndGet(_ -- revoked).flatMap(updateQueue.offer)
assignmentRef.updateAndGet(_ -- revoked)
.flatMap(updateQueue.offer)
.as(Chain.empty)
)

Stream
@@ -825,6 +840,40 @@ object KafkaConsumer {

}

/**
* Utility class that clarifies the internals.
* Its goal is to make [[RebalanceRevokeMode]] transparent to the rest of the implementation.
* @tparam F the effect type used
*/
private sealed abstract class AssignmentSignals[F[_]] {
def signalStreamToTerminate: F[Boolean]
def awaitStreamTerminationSignal: F[Unit]
def signalStreamFinished: F[Boolean]
def awaitStreamFinishedSignal: F[Unit]
}

private object AssignmentSignals {

def eager[F[_]: Applicative](streamFinisher: Deferred[F, Unit]): AssignmentSignals[F] = EagerSignals(streamFinisher)
def graceful[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]): AssignmentSignals[F] =
GracefulSignals[F](streamFinisher, revokeFinisher)

final case class EagerSignals[F[_]: Applicative](streamFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] {
override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(())
override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get
override def signalStreamFinished: F[Boolean] = true.pure[F]
override def awaitStreamFinishedSignal: F[Unit] = ().pure[F]
}

final case class GracefulSignals[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] {
override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(())
override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get
override def signalStreamFinished: F[Boolean] = revokeFinisher.complete(())
override def awaitStreamFinishedSignal: F[Unit] = revokeFinisher.get
}

}
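
The two-signal handshake behind `GracefulSignals` may be easier to review in isolation. The sketch below is a simplified illustration using only cats-effect, not the PR's code: `Signals`, `partitionWorker`, and the standalone `onRevoked` are hypothetical stand-ins, and the step that awaits the returned effect is an assumption, since the code that consumes the result of `onRevoked` is not part of the hunks shown here.

```scala
import cats.effect.{Deferred, IO, IOApp}
import scala.concurrent.duration._

object GracefulRevokeSketch extends IOApp.Simple {

  // Hypothetical, simplified stand-in for the PR's GracefulSignals.
  final case class Signals(
    streamFinisher: Deferred[IO, Unit], // completed by the revoke callback
    revokeFinisher: Deferred[IO, Unit]  // completed by the partition stream
  )

  // Stand-in for a partition stream: wait for the termination signal,
  // finish the in-flight work, then report completion.
  def partitionWorker(signals: Signals): IO[Unit] =
    signals.streamFinisher.get *>
      IO.sleep(100.millis) *> // pretend to finish the in-flight record
      signals.revokeFinisher.complete(()).void

  // Stand-in for onRevoked: signal termination immediately and return the
  // effect that waits until the stream reports that it has finished.
  def onRevoked(signals: Signals): IO[IO[Unit]] =
    signals.streamFinisher.complete(()).as(signals.revokeFinisher.get)

  val run: IO[Unit] =
    for {
      streamFinisher <- Deferred[IO, Unit]
      revokeFinisher <- Deferred[IO, Unit]
      signals         = Signals(streamFinisher, revokeFinisher)
      worker         <- partitionWorker(signals).start
      awaitFinished  <- onRevoked(signals)
      // Assumption: in graceful mode the caller awaits this effect before
      // releasing the partition. In eager mode it would already be complete.
      _              <- awaitFinished
      _              <- worker.join
      _              <- IO.println("partition released after the stream finished")
    } yield ()
}
```

Returning the await effect instead of running it inside `onRevoked` keeps the signalling step short and leaves the decision of when, and for how long, to wait with the caller.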

/*
* Prevents the default `MkConsumer` instance from being implicitly available
* to code defined in this object, ensuring factory methods require an instance