Skip to content

Commit

Permalink
feat: Adding unit-test for new revocation mode
Browse files Browse the repository at this point in the history
  • Loading branch information
wookievx committed Dec 21, 2024
1 parent 46fee86 commit 8b6753c
Showing 1 changed file with 60 additions and 2 deletions.
62 changes: 60 additions & 2 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import scala.collection.immutable.SortedSet
import scala.concurrent.duration.*

import cats.data.NonEmptySet
import cats.effect.{Fiber, IO}
import cats.effect.{Clock, Fiber, IO, Ref}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.effect.Ref
import cats.syntax.all.*
import fs2.concurrent.SignallingRef
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
Expand Down Expand Up @@ -1212,6 +1211,65 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

describe("KafkaConsumer#stream") {
it("should wait for previous generation of streams to start consuming messages with RebalanceRevokeMode#Graceful") {
withTopic { topic =>
createCustomTopic(topic, partitions = 2) //minimal amount of partitions for two consumers
def recordRange(from: Int, _until: Int) = (from until _until).map(n => s"key-$n" -> s"value-$n")

def produceRange(from: Int, until: Int): IO[Unit] = IO {
val produced = recordRange(from, until)
publishToKafka(topic, produced)
}

// tracking consumption for being unique by explicitly commiting after each message
val consumed = for {
ref <- Ref.of[IO, Vector[(String, String)]](Vector.empty)
_ <- produceRange(0, 10)
_ <-
KafkaConsumer
.stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful))
.evalTap(_.subscribeTo(topic))
.flatMap(
_.stream
.evalMap { record =>
ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset)
}
.evalTap(_.commit)
)
.interruptAfter(3.seconds)
.compile
.drain
.race {
Clock[IO].sleep(1.second) *>
produceRange(10, 20) *>
KafkaConsumer
.stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful))
.evalTap(_.subscribeTo(topic))
.flatMap( c =>
fs2.Stream.exec(produceRange(20, 30)) ++
c.stream
.evalMap { record =>
ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset)
}
.evalTap(_.commit)
)
.interruptAfter(3.seconds)
.compile
.drain
}
res <- ref.get
} yield res

val res = consumed.unsafeRunSync()

//expected behavior is that no duplicate consumption is performed
res.toSet should have size res.length.toLong
(res should contain).theSameElementsAs(recordRange(0, 10) ++ recordRange(10, 20) ++ recordRange(20, 30))
}
}
}

private def commitTest(
commit: (KafkaConsumer[IO, String, String], CommittableOffsetBatch[IO]) => IO[Unit]
): Assertion =
Expand Down

0 comments on commit 8b6753c

Please sign in to comment.