Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix blocking initialization #1364

Open
wants to merge 1 commit into
base: series/3.x
Choose a base branch
from
Open

Conversation

eugkhp
Copy link

@eugkhp eugkhp commented Dec 10, 2024

I've got this warning by ce3 blocking detection

[WARNING] A Cats Effect worker thread was detected to be in a blocked state (BLOCKED)                                                    │
│   at org.apache.kafka.common.metrics.JmxReporter.reconfigure(JmxReporter.java:109)                                                       │
│   at org.apache.kafka.common.metrics.JmxReporter.configure(JmxReporter.java:93)                                                          │
│   at org.apache.kafka.clients.CommonClientConfigs.metricsReporters(CommonClientConfigs.java:243)                                         │
│   at org.apache.kafka.clients.CommonClientConfigs.metricsReporters(CommonClientConfigs.java:234)                                         │
│   at org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics(ConsumerUtils.java:124)                                     │
│   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:696)                                                      │
│   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:671)                                                      │
│   at fs2.kafka.consumer.MkConsumer$.fs2$kafka$consumer$MkConsumer$$anon$1$$_$apply$$anonfun$1(MkConsumer.scala:38)                       │
│   at cats.effect.IOFiber.runLoop(IOFiber.scala:343)                                                                                      │
│   at cats.effect.IOFiber.execR(IOFiber.scala:1362)                                                                                       │
│   at cats.effect.IOFiber.run(IOFiber.scala:112)                                                                                          │
│   at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:743)                                                                         │
│ This is very likely to be due to suspending a blocking call in IO via                                                                    │
│ `IO.delay` or `IO.apply`. If this is the case then you should use                                                                        │
│ `IO.blocking` or `IO.interruptible` instead.

Looks like something inside new org.apache.kafka.clients.consumer.KafkaConsumer does network call.

@@ -30,7 +30,7 @@ object MkConsumer {
implicit def mkConsumerForSync[F[_]](implicit F: Sync[F]): MkConsumer[F] =
new MkConsumer[F] {

def apply[G[_]](settings: ConsumerSettings[G, ?, ?]): F[KafkaByteConsumer] = F.delay {
def apply[G[_]](settings: ConsumerSettings[G, ?, ?]): F[KafkaByteConsumer] = F.blocking {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WithConsumer manages the allocation of a dedicated blocking context for the consumer instance provided by MkConsumer. We should probably update WithConsumer to ensure that the instance is constructed in that context, instead of using another blocking context here.

Copy link
Author

@eugkhp eugkhp Dec 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, not entirely sure. Doesn't WithConsumer create a new blocking context for consumer to use? (unless a custom pool is provided)

    val blocking: Resource[F, Blocking[F]] = settings.customBlockingContext match {
      case None     => Blocking.singleThreaded[F]("fs2-kafka-consumer")
      case Some(ec) => Resource.pure(Blocking.fromExecutionContext(ec))
    }

And here we would be using default ce3 context to initialize the consumer .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that a dedicated blocking context is created to invoke every other consumer method. I think it would make most sense to also invoke the constructor in that context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants