diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 313d618c3..57eb13ca9 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -170,7 +170,7 @@ object KafkaConsumer { stopReqs <- Deferred[F, Unit] } yield Stream .eval { - def fetchPartition: F[Unit] = F + val fetchPartition: F[Unit] = F .deferred[PartitionResult] .flatMap { deferred => val callback: PartitionResult => F[Unit] = @@ -202,19 +202,15 @@ object KafkaConsumer { assigned.ifM(storeFetch, completeRevoked) } >> deferred.get - F.race(shutdown, fetch) - .flatMap { - case Left(()) => - stopReqs.complete(()).void - case Right((chunk, reason)) => - val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty) + fetch.flatMap { case (chunk, reason) => + val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty) - val completeRevoked = - stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked) + val completeRevoked = + stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked) - enqueueChunk >> completeRevoked - } + enqueueChunk >> completeRevoked + } } Stream