Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Mar 15, 2022
1 parent 31e370b commit c21aacc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
43 changes: 20 additions & 23 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -275,32 +275,29 @@ object KafkaConsumer {
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): F[Map[TopicPartition, Deferred[F, Unit]]] =
Deferred[F, Either[Throwable, SortedSet[TopicPartition]]].flatMap { deferred =>
val request =
Request.Assignment[F](
deferred.complete(_).void,
Some(
onRebalance(
streamId,
assignmentRef,
partitionsMapQueue
)
)
): F[Map[TopicPartition, Deferred[F, Unit]]] = {
val assignment = this.assignment(
Some(
onRebalance(
streamId,
assignmentRef,
partitionsMapQueue
)
val assignment = requests.offer(request) >> deferred.get.rethrow
F.race(awaitTermination.attempt, assignment).flatMap {
case Left(_) =>
F.pure(Map.empty)
)
)

case Right(assigned) =>
assigned.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
}
F.race(awaitTermination.attempt, assignment).flatMap {
case Left(_) =>
F.pure(Map.empty)

case Right(assigned) =>
assigned.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
}
}

def initialEnqueue(
streamId: StreamId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](

def handle(request: Request[F, K, V]): F[Unit] =
request match {
case Request.Poll() => poll
case Request.Poll() => poll
case Request.Fetch(partition, streamId, callback) =>
fetch(partition, streamId, callback)
case request @ Request.Commit(_, _) => commit(request)
Expand Down Expand Up @@ -648,7 +648,7 @@ private[kafka] object KafkaConsumerActor {
object Request {
final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit])
extends Request[F, Any, Any]

final case class Poll[F[_]]() extends Request[F, Any, Any]

private[this] val pollInstance: Poll[Nothing] =
Expand Down

0 comments on commit c21aacc

Please sign in to comment.