Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use semaphore-like permits for some actor operations #906

Merged
merged 7 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 71 additions & 49 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ object KafkaConsumer {
id: Int,
withConsumer: WithConsumer[F],
stopConsumingDeferred: Deferred[F, Unit]
)(implicit F: Async[F]): KafkaConsumer[F, K, V] =
)(implicit F: Async[F], logging: Logging[F]): KafkaConsumer[F, K, V] =
new KafkaConsumer[F, K, V] {

override def partitionsMapStream
Expand Down Expand Up @@ -299,32 +299,29 @@ object KafkaConsumer {
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): F[Map[TopicPartition, Deferred[F, Unit]]] =
Deferred[F, Either[Throwable, SortedSet[TopicPartition]]].flatMap { deferred =>
val request =
Request.Assignment[F](
deferred.complete(_).void,
Some(
onRebalance(
streamId,
assignmentRef,
partitionsMapQueue
)
)
): F[Map[TopicPartition, Deferred[F, Unit]]] = {
val assignment = this.assignment(
Some(
onRebalance(
streamId,
assignmentRef,
partitionsMapQueue
)
val assignment = requests.offer(request) >> deferred.get.rethrow
F.race(awaitTermination.attempt, assignment).flatMap {
case Left(_) =>
F.pure(Map.empty)
)
)

case Right(assigned) =>
assigned.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
}
F.race(awaitTermination.attempt, assignment).flatMap {
case Left(_) =>
F.pure(Map.empty)

case Right(assigned) =>
assigned.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
}
}

def initialEnqueue(
streamId: StreamId,
Expand Down Expand Up @@ -408,11 +405,16 @@ object KafkaConsumer {
private def assignment(
onRebalance: Option[OnRebalance[F]]
): F[SortedSet[TopicPartition]] =
request { callback =>
Request.Assignment(
callback = callback,
onRebalance = onRebalance
)
permit.surround {
onRebalance
.fold(actor.ref.updateAndGet(_.asStreaming)) { on =>
actor.ref.updateAndGet(_.withOnRebalance(on).asStreaming).flatTap { newState =>
logging.log(LogEntry.StoredOnRebalance(on, newState))
}

}
.ensure(NotSubscribedException())(_.subscribed) >>
withConsumer.blocking(_.assignment.toSortedSet)
}

override def assignmentStream: Stream[F, SortedSet[TopicPartition]] = {
Expand Down Expand Up @@ -503,37 +505,57 @@ object KafkaConsumer {
subscribe(NonEmptyList.of(firstTopic, remainingTopics: _*))

override def subscribe[G[_]](topics: G[String])(implicit G: Reducible[G]): F[Unit] =
request { callback =>
Request.SubscribeTopics(
topics = topics.toNonEmptyList,
callback = callback
)
permit.surround {
withConsumer.blocking {
_.subscribe(
topics.toList.asJava,
actor.consumerRebalanceListener
)
} >> actor.ref
.updateAndGet(_.asSubscribed)
.log(LogEntry.SubscribedTopics(topics.toNonEmptyList, _))
}

private def permit: Resource[F, Unit] =
Resource.eval {
Deferred[F, Resource[F, Unit]].flatMap { permitDef =>
requests.offer(Request.Permit(permitDef.complete(_).void)) >> permitDef.get
}
}.flatten

override def subscribe(regex: Regex): F[Unit] =
request { callback =>
Request.SubscribePattern(
pattern = regex.pattern,
callback = callback
)
permit.surround {
withConsumer.blocking {
_.subscribe(
regex.pattern,
actor.consumerRebalanceListener
)
} >>
actor.ref
.updateAndGet(_.asSubscribed)
.log(LogEntry.SubscribedPattern(regex.pattern, _))
}

override def unsubscribe: F[Unit] =
request { callback =>
Request.Unsubscribe(
callback = callback
)
permit.surround {
withConsumer.blocking { _.unsubscribe() } >> actor.ref
.updateAndGet(_.asUnsubscribed)
.log(LogEntry.Unsubscribed(_))
}

override def stopConsuming: F[Unit] =
stopConsumingDeferred.complete(()).attempt.void

override def assign(partitions: NonEmptySet[TopicPartition]): F[Unit] =
request { callback =>
Request.Assign(
topicPartitions = partitions,
callback = callback
)
permit.surround {
withConsumer.blocking {
_.assign(
partitions.toList.asJava
)
} >> actor.ref
.updateAndGet(_.asSubscribed)
.log(LogEntry.ManuallyAssignedPartitions(partitions, _))

}

override def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit] =
Expand Down Expand Up @@ -654,7 +676,7 @@ object KafkaConsumer {
id,
withConsumer,
stopConsumingDeferred
)
)(F, logging)

/**
* Creates a new [[KafkaConsumer]] in the `Stream` context,
Expand Down
156 changes: 12 additions & 144 deletions modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

package fs2.kafka.internal

import cats.data.{Chain, NonEmptyList, NonEmptySet, NonEmptyVector, StateT}
import cats.data.{Chain, NonEmptyVector, StateT}
import cats.effect._
import cats.effect.std._
import cats.effect.syntax.all._
Expand All @@ -20,7 +20,6 @@ import fs2.kafka.internal.LogEntry._
import fs2.kafka.internal.syntax._
import java.time.Duration
import java.util
import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
Expand All @@ -45,7 +44,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
*/
private[kafka] final class KafkaConsumerActor[F[_]](
settings: ConsumerSettings[F, _, _],
ref: Ref[F, State[F]],
val ref: Ref[F, State[F]],
requests: Queue[F, Request[F]],
withConsumer: WithConsumer[F]
)(
Expand All @@ -62,7 +61,7 @@ private[kafka] final class KafkaConsumerActor[F[_]](
private[kafka] val consumerGroupId: Option[String] =
settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG)

private[this] val consumerRebalanceListener: ConsumerRebalanceListener =
val consumerRebalanceListener: ConsumerRebalanceListener =
new ConsumerRebalanceListener {
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit =
dispatcher.unsafeRunSync(revoked(partitions.toSortedSet))
Expand All @@ -71,91 +70,6 @@ private[kafka] final class KafkaConsumerActor[F[_]](
dispatcher.unsafeRunSync(assigned(partitions.toSortedSet))
}

private[this] def subscribe(
topics: NonEmptyList[String],
callback: Either[Throwable, Unit] => F[Unit]
): F[Unit] = {
val subscribe =
withConsumer.blocking {
_.subscribe(
topics.toList.asJava,
consumerRebalanceListener
)
}.attempt

subscribe
.flatTap {
case Left(_) => F.unit
case Right(_) =>
ref
.updateAndGet(_.asSubscribed)
.log(SubscribedTopics(topics, _))
}
.flatMap(callback)
}

private[this] def subscribe(
pattern: Pattern,
callback: Either[Throwable, Unit] => F[Unit]
): F[Unit] = {
val subscribe =
withConsumer.blocking {
_.subscribe(
pattern,
consumerRebalanceListener
)
}.attempt

subscribe
.flatTap {
case Left(_) => F.unit
case Right(_) =>
ref
.updateAndGet(_.asSubscribed)
.log(SubscribedPattern(pattern, _))
}
.flatMap(callback)
}

private[this] def unsubscribe(
callback: Either[Throwable, Unit] => F[Unit]
): F[Unit] = {
val unsubscribe =
withConsumer.blocking { _.unsubscribe() }.attempt

unsubscribe
.flatTap {
case Left(_) => F.unit
case Right(_) =>
ref
.updateAndGet(_.asUnsubscribed)
.log(Unsubscribed(_))
}
.flatMap(callback)
}

private[this] def assign(
partitions: NonEmptySet[TopicPartition],
callback: Either[Throwable, Unit] => F[Unit]
): F[Unit] = {
val assign =
withConsumer.blocking {
_.assign(
partitions.toList.asJava
)
}.attempt

assign
.flatTap {
case Left(_) => F.unit
case Right(_) =>
ref
.updateAndGet(_.asSubscribed)
.log(ManuallyAssignedPartitions(partitions, _))
}
.flatMap(callback)
}

private[this] def fetch(
partition: TopicPartition,
streamId: StreamId,
Expand Down Expand Up @@ -336,32 +250,7 @@ private[kafka] final class KafkaConsumerActor[F[_]](
}
}

private[this] def assignment(
callback: Either[Throwable, SortedSet[TopicPartition]] => F[Unit],
onRebalance: Option[OnRebalance[F]]
): F[Unit] = {
def resolveDeferred(subscribed: Boolean): F[Unit] = {
val result =
if (subscribed) withConsumer.blocking(_.assignment.toSortedSet.asRight)
else F.pure(Left(NotSubscribedException()))

result.flatMap(callback)
}

onRebalance match {
case Some(on) =>
ref.updateAndGet(_.withOnRebalance(on).asStreaming).flatMap { newState =>
resolveDeferred(newState.subscribed) >> logging.log(StoredOnRebalance(on, newState))
}

case None =>
ref.updateAndGet(_.asStreaming).flatMap { newState =>
resolveDeferred(newState.subscribed)
}
}
}

private[kafka] val offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] =
val offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] =
offsets => {
val commit = runCommitAsync(offsets) { cb =>
requests.offer(Request.Commit(offsets, cb))
Expand Down Expand Up @@ -525,17 +414,18 @@ private[kafka] final class KafkaConsumerActor[F[_]](

def handle(request: Request[F]): F[Unit] =
request match {
case Request.Assignment(callback, onRebalance) => assignment(callback, onRebalance)
case Request.Poll() => poll
case Request.SubscribeTopics(topics, callback) => subscribe(topics, callback)
case Request.Assign(partitions, callback) => assign(partitions, callback)
case Request.SubscribePattern(pattern, callback) => subscribe(pattern, callback)
case Request.Unsubscribe(callback) => unsubscribe(callback)
case Request.Poll() => poll
case Request.Fetch(partition, streamId, callback) =>
fetch(partition, streamId, callback)
case request @ Request.Commit(_, _) => commit(request)
case request @ Request.ManualCommitAsync(_, _) => manualCommitAsync(request)
case request @ Request.ManualCommitSync(_, _) => manualCommitSync(request)
case Request.Permit(cb) => permit(cb)
}

def permit(callback: Resource[F, Unit] => F[Unit]): F[Unit] =
Deferred[F, Unit].flatMap { gate =>
callback(Resource.pure(()).onFinalize(gate.complete(()).void)) >> gate.get
}

private[this] case class RevokedResult(
Expand Down Expand Up @@ -730,10 +620,7 @@ private[kafka] object KafkaConsumerActor {
sealed abstract class Request[F[_]]

object Request {
final case class Assignment[F[_]](
callback: Either[Throwable, SortedSet[TopicPartition]] => F[Unit],
onRebalance: Option[OnRebalance[F]]
) extends Request[F]
final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit]) extends Request[F]

final case class Poll[F[_]]() extends Request[F]

Expand All @@ -743,25 +630,6 @@ private[kafka] object KafkaConsumerActor {
def poll[F[_]]: Poll[F] =
pollInstance.asInstanceOf[Poll[F]]

final case class SubscribeTopics[F[_]](
topics: NonEmptyList[String],
callback: Either[Throwable, Unit] => F[Unit]
) extends Request[F]

final case class Assign[F[_]](
topicPartitions: NonEmptySet[TopicPartition],
callback: Either[Throwable, Unit] => F[Unit]
) extends Request[F]

final case class SubscribePattern[F[_]](
pattern: Pattern,
callback: Either[Throwable, Unit] => F[Unit]
) extends Request[F]

final case class Unsubscribe[F[_]](
callback: Either[Throwable, Unit] => F[Unit]
) extends Request[F]

final case class Fetch[F[_]](
partition: TopicPartition,
streamId: StreamId,
Expand Down