diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ee456d1abb..89dd0a906d 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -445,46 +445,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata))) }, - test("handle rebalancing by completing topic-partition streams") { - val nrMessages = 50 - val nrPartitions = 6 // Must be even and strictly positive - - for { - // Produce messages on several partitions - topic <- randomTopic - group <- randomGroup - client1 <- randomClient - client2 <- randomClient - - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) - _ <- ZIO.foreachDiscard(1 to nrMessages) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - - // Consume messages - subscription = Subscription.topics(topic) - consumer1 <- Consumer - .partitionedStream(subscription, Serde.string, Serde.string) - .flatMapPar(nrPartitions) { case (tp, partition) => - ZStream - .fromZIO(partition.runDrain) - .as(tp) - } - .take(nrPartitions.toLong / 2) - .runDrain - .provideSomeLayer[Kafka](consumer(client1, Some(group))) - .fork - _ <- Live.live(ZIO.sleep(5.seconds)) - consumer2 <- Consumer - .partitionedStream(subscription, Serde.string, Serde.string) - .take(nrPartitions.toLong / 2) - .runDrain - .provideSomeLayer[Kafka](consumer(client2, Some(group))) - .fork - _ <- consumer1.join - _ <- consumer2.join - } yield assertCompletes - }, test("produce diagnostic events when rebalancing") { val nrMessages = 50 val nrPartitions = 6 @@ -626,6 +586,117 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { consumedMessages <- messagesReceived.get } yield assert(consumedMessages)(contains(newMessage).negate) }, + suite("rebalanceSafeCommits prevents processing messages twice when rebalancing")({ + + /** + * Outline of this test: + * - A producer generates some messages on every partition of a topic (2 partitions), + * - A consumer starts reading from the topic. It is the only consumer so it handles all partitions. + * - After a few messages a second consumer is started. One partition will be re-assigned. + * + * Since the first consumer is slow, we expect it to not have committed the offsets yet when the rebalance + * happens. As a consequence, the second consumer would see some messages the first consumer already consumed. + * + * '''However,''' since we enable `rebalanceSafeCommits` on the first consumer, no messages should be consumed + * by both consumers. + */ + def testForPartitionAssignmentStrategy[T <: ConsumerPartitionAssignor: ClassTag] = + test(implicitly[ClassTag[T]].runtimeClass.getName) { + val partitionCount = 2 + + def makeConsumer( + clientId: String, + groupId: String, + rebalanceSafeCommits: Boolean + ): ZLayer[Kafka, Throwable, Consumer] = + ZLayer( + consumerSettings( + clientId = clientId, + groupId = Some(groupId), + properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "1") + ).map(_.withRebalanceSafeCommits(rebalanceSafeCommits)) + ) >>> minimalConsumer() + + for { + topic <- randomTopic + subscription = Subscription.topics(topic) + clientId1 <- randomClient + clientId2 <- randomClient + groupId <- randomGroup + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount)) + // Produce one message to each partition every 500 ms + pFib <- ZStream + .fromSchedule(Schedule.fixed(500.millis)) + .mapZIO { i => + ZIO.foreachDiscard(0 until partitionCount) { p => + produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i"))) + } + } + .runDrain + .fork + _ <- ZIO.logDebug("Starting consumer 1") + c1Started <- Promise.make[Nothing, Unit] + c1Keys <- Ref.make(Chunk.empty[String]) + fib1 <- ZIO + .logAnnotate("consumer", "1") { + // When the stream ends, the topic subscription ends as well. Because of that the consumer + // shuts down and commits are no longer possible. Therefore, we signal the second consumer in + // such a way that it doesn't close the stream. + Consumer + .plainStream(subscription, Serde.string, Serde.string) + .tap(record => ZIO.logDebug(s"Received ${record.key}")) + .tap { record => + // Signal consumer 2 can start when a record is seen for every partition. + for { + keys <- c1Keys.updateAndGet(_.appended(record.key)) + _ <- c1Started.succeed(()).when(keys.map(_.split('-')(1)).toSet.size == partitionCount) + } yield () + } + // Buffer so that the above can run ahead of the below, this is important; + // we want consumer 2 to start before consumer 1 commits. + .buffer(partitionCount) + .mapZIO { msg => + for { + _ <- ZIO.sleep(5.seconds) + _ <- ZIO.logDebug(s"Committing offset for key ${msg.key}") + _ <- msg.offset.commit + } yield msg.key + } + .take(partitionCount.toLong) + .runCollect + .map(_.toSet) + .provideSome[Kafka](makeConsumer(clientId1, groupId, true)) + } + .fork + _ <- c1Started.await + _ <- ZIO.logDebug("Starting consumer 2") + fib2 <- ZIO + .logAnnotate("consumer", "2") { + Consumer + .plainStream(subscription, Serde.string, Serde.string) + .tap(msg => ZIO.logDebug(s"Received ${msg.key}")) + .mapZIO(msg => msg.offset.commit.as(msg.key)) + .take(5) + .runCollect + .map(_.toSet) + .provideSome[Kafka](makeConsumer(clientId2, groupId, false)) + } + .fork + _ <- ZIO.logDebug("Waiting for consumers to end") + c2Keys: Set[String] <- fib2.join + _ <- ZIO.logDebug("Consumer 2 ready") + c1Keys: Set[String] <- fib1.join + _ <- ZIO.logDebug("Consumer 1 ready") + _ <- pFib.interrupt + } yield assertTrue((c1Keys intersect c2Keys).isEmpty) + } + + // Test for both default partition assignment strategies + Seq( + testForPartitionAssignmentStrategy[RangeAssignor], + testForPartitionAssignmentStrategy[CooperativeStickyAssignor] + ) + }: _*), test("partitions for topic doesn't fail if doesn't exist") { for { topic <- randomTopic diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala index 083d78ec01..69c8ff2abf 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala @@ -57,9 +57,9 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { } ) - private def makeCommit(offsets: Map[TopicPartition, Long]): RunloopCommand.Commit = { + private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - RunloopCommand.Commit(o, p) + Runloop.Commit(o, p) } } diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 972e14577f..ead6dfe6fc 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -116,6 +116,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, maxPollInterval: Duration = 5.minutes, `max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, @@ -138,6 +139,7 @@ object KafkaTestUtils { ) .withOffsetRetrieval(offsetRetrieval) .withRestartStreamOnRebalancing(restartStreamOnRebalancing) + .withRebalanceSafeCommits(rebalanceSafeCommits) .withProperties(properties) val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId) @@ -154,6 +156,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty ): URIO[Kafka, ConsumerSettings] = consumerSettings( @@ -163,6 +166,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties ) .map( @@ -202,6 +206,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty ): ZLayer[Kafka, Throwable, Consumer] = @@ -213,6 +218,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties, commitTimeout = commitTimeout ) @@ -229,6 +235,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty, rebalanceListener: RebalanceListener = RebalanceListener.noop ): ZLayer[Kafka, Throwable, Consumer] = @@ -240,8 +247,9 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties - ).map(_.withRebalanceListener(rebalanceListener)) + ).map(_.withRebalanceListener(rebalanceListener).withRebalanceSafeCommits(rebalanceSafeCommits)) ) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live /** diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 741990f393..3f21f9f8d7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -28,6 +28,7 @@ final case class ConsumerSettings( offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), rebalanceListener: RebalanceListener = RebalanceListener.noop, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() ) { private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { @@ -154,6 +155,32 @@ final case class ConsumerSettings( def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings = copy(restartStreamOnRebalancing = value) + /** + * @param value + * Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is + * `false`, but the recommended value is `true` as it prevents duplicate messages. + * + * Use `false` _only_ when your streams don't commit, or when it is okay to have messages processed twice (possibly + * concurrently). + * + * When `true`, messages consumed from revoked partitions must be committed before we allow the rebalance to continue. + * + * When a partition is revoked, consuming the messages will be taken over by another consumer. The other consumer will + * continue from the committed offset. It is therefore important that this consumer commits offsets of all consumed + * messages. Therefore, by holding up the rebalance until these commits are done, we ensure that the new consumer will + * start from the correct offset. + * + * During a rebalance no new messages can be received _for any stream_. Therefore, _all_ streams are deprived of new + * messages until the revoked streams are ready committing. + * + * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any + * offset commits from these streams have a high chance of being delayed (commits are not possible during some phases + * of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will + * start from an earlier offset. The result is that some messages are processed twice and concurrently. + */ + def withRebalanceSafeCommits(value: Boolean): ConsumerSettings = + copy(rebalanceSafeCommits = value) + def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings = withProperties(credentialsStore.properties) @@ -200,6 +227,6 @@ final case class ConsumerSettings( object ConsumerSettings { val defaultCommitTimeout: Duration = 15.seconds - def apply(bootstrapServers: List[String]) = + def apply(bootstrapServers: List[String]): ConsumerSettings = new ConsumerSettings().withBootstrapServers(bootstrapServers) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala index d302fb076f..3a00c04424 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala @@ -7,6 +7,9 @@ import scala.jdk.CollectionConverters._ /** * ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects. + * + * Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor + * when this is not desired. */ final case class RebalanceListener( onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 3d2f2e3ddc..79f65a2c97 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -19,7 +19,7 @@ private[consumer] final class ConsumerAccess( def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = access.withPermit(withConsumerNoPermit(f)) - private[consumer] def withConsumerNoPermit[R, A]( + private def withConsumerNoPermit[R, A]( f: ByteArrayKafkaConsumer => RIO[R, A] ): RIO[R, A] = ZIO @@ -31,10 +31,17 @@ private[consumer] final class ConsumerAccess( .flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt)) /** - * Do not use this method outside of the Runloop + * Use this method only from Runloop. */ private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] = access.withPermit(f(consumer)) + + /** + * Use this method ONLY from the rebalance listener. + */ + private[internal] def rebalanceListenerAccess[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = + withConsumerNoPermit(f) + } private[consumer] object ConsumerAccess { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index d982a3c01a..7b791b3889 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -22,12 +22,13 @@ import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection private[consumer] final class Runloop private ( - runtime: Runtime[Any], + sameThreadRuntime: Runtime[Any], hasGroupId: Boolean, consumer: ConsumerAccess, pollTimeout: Duration, maxPollInterval: Duration, commitTimeout: Duration, + commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], @@ -35,11 +36,21 @@ private[consumer] final class Runloop private ( offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + rebalanceSafeCommits: Boolean, currentStateRef: Ref[State], committedOffsetsRef: Ref[CommitOffsets], fetchStrategy: FetchStrategy ) { + /** + * Maximum time spent in the rebalance callback. + * + * In this time zio-kafka awaits processing of records and the completion of commits. + * + * We use 3/5 of `maxPollInterval` which by default calculates to 3 minutes. + */ + private val maxEndingStreamsInterval = (maxPollInterval.toNanos / 5L) * 3L + private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval) @@ -96,8 +107,8 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams else Chunk.empty - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) + newCommits <- endStreams(state, streamsToEnd, awaitStreamCommits = rebalanceSafeCommits) + _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd, newCommits)) _ <- ZIO.logTrace("onAssigned done") } yield (), onRevoked = (revokedTps, _) => @@ -108,8 +119,8 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) + newCommits <- endStreams(state, streamsToEnd, awaitStreamCommits = rebalanceSafeCommits) + _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd, newCommits)) _ <- ZIO.logTrace("onRevoked done") } yield (), onLost = (lostTps, _) => @@ -128,19 +139,124 @@ private[consumer] final class Runloop private ( recordRebalanceRebalancingListener ++ userRebalanceListener } + /** + * End streams from the rebalance listener, optionally waiting for consumed offsets to be committed. + * + * @return + * all commits that were created while waiting, this can include commits from streams that were not ended + */ + private def endStreams( + state: State, + streamsToEnd: Chunk[PartitionStreamControl], + awaitStreamCommits: Boolean + ): Task[Chunk[Runloop.Commit]] = + for { + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + newCommits <- if (awaitStreamCommits) + consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) + else ZIO.succeed(Chunk.empty) + } yield newCommits + + private def doAwaitStreamCommits( + consumer: ByteArrayKafkaConsumer, + state: State, + streamsToEnd: Chunk[PartitionStreamControl] + ): Task[Chunk[Runloop.Commit]] = { + // This method is called from the rebalance listener and therefore runs on the same-thread-runtime. This is because + // the Java kafka client requires us to invoke the consumer from the same thread that invoked the rebalance + // listener. + // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, + // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work + // to another thread cannot be used. + + // Outline of this method: + // - Every 100ms until time is up: + // - Get all commits from the commit queue. + // - Start an async commit for these commits. + // - Collect all these new (pending) commits. + // - repeat the above until: + // - All streams that were ended have completed their work. + // - We have seen a completed or pending commit for all endOffsets. + // The end-offsets are those offsets for which commits need to be completed for a stream to be done. + // - Do a single sync commit without any offsets, this has the side-effect of blocking until all + // preceding async commits are complete (this requires kafka-client 3.6.0 or later). + // - Return the new commits so that they can be added to the runloop state. + + val deadline = java.lang.System.nanoTime() + maxEndingStreamsInterval - commitTimeout.toNanos + + val endingTps = streamsToEnd.map(_.tp).toSet + def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = + commits.filter(commit => (commit.offsets.keySet intersect endingTps).nonEmpty) + val previousPendingCommits = commitsOfEndingStreams(state.pendingCommits) + + def doAsyncCommits(commits: Chunk[Commit]): UIO[Unit] = + if (commits.nonEmpty) { + val (offsets, callback, onFailure) = asyncCommitParameters(commits) + ZIO.logDebug(s"Async commit of ${offsets.size} offsets for ${commits.size} commits") *> + ZIO.attempt(consumer.commitAsync(offsets, callback)).catchAll(onFailure) + } else { + ZIO.unit + } + + // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. + // Instead, we poll the queue in a loop. + ZStream + .fromZIO(blockingSleep(100.millis) *> commitQueue.takeAll) + .tap(doAsyncCommits) + .forever + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .scan(Chunk.empty[Runloop.Commit])(_ ++ _) + .takeUntilZIO { newCommits => + for { + streamResults: Chunk[(Boolean, Option[Offset])] <- + ZIO + .foreach(streamsToEnd) { stream => + for { + isDone <- stream.completedPromise.isDone + endOffset <- if (isDone) stream.completedPromise.await else ZIO.none + } yield (isDone, endOffset) + } + committedOffsets <- committedOffsetsRef.get + } yield { + val streamsCompleted = streamResults.forall(_._1) + def endOffsets: Chunk[Offset] = streamResults.flatMap(_._2) + val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits) + streamsCompleted && endOffsets.forall { endOffset => + val tp = endOffset.topicPartition + val offset = endOffset.offset + committedOffsets.contains(tp, offset) || + allPendingCommits.exists { pendingCommit => + pendingCommit.offsets.get(tp).exists { pendingOffset => + pendingOffset.offset() >= offset + } + } + } + } + } + .tap { _ => + ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) + } + .runLast + .map(_.getOrElse(Chunk.empty)) + .ensuring { + ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } + } + /** This is the implementation behind the user facing api `Offset.commit`. */ private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = offsets => for { p <- Promise.make[Throwable, Unit] - _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit + _ <- commitQueue.offer(Runloop.Commit(offsets, p)) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable) _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) } yield () /** Merge commits and prepare parameters for calling `consumer.commitAsync`. */ private def asyncCommitParameters( - commits: Chunk[RunloopCommand.Commit] + commits: Chunk[Runloop.Commit] ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { val offsets = commits .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => @@ -165,7 +281,8 @@ private[consumer] final class Runloop private ( case _: RebalanceInProgressException => for { _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commandQueue.offerAll(commits) + _ <- commitQueue.offerAll(commits) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable) } yield () case err: Throwable => cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) @@ -174,7 +291,7 @@ private[consumer] final class Runloop private ( new OffsetCommitCallback { override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = Unsafe.unsafe { implicit u => - runtime.unsafe.run { + sameThreadRuntime.unsafe.run { if (exception eq null) onSuccess else onFailure(exception) } .getOrThrowFiberFailure() @@ -183,7 +300,7 @@ private[consumer] final class Runloop private ( (offsetsWithMetaData.asJava, callback, onFailure) } - private def handleCommits(state: State, commits: Chunk[RunloopCommand.Commit]): UIO[State] = + private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] = if (commits.isEmpty) { ZIO.succeed(state) } else { @@ -333,19 +450,20 @@ private[consumer] final class Runloop private ( ) } *> lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { - case RebalanceEvent(false, _, _, _, _) => + case RebalanceEvent(false, _, _, _, _, _) => // The fast track, rebalance listener was not invoked: - // no assignment changes, only new records. + // no assignment changes, no new commits, only new records. ZIO.succeed( PollResult( records = polledRecords, ignoreRecordsForTps = Set.empty, pendingRequests = state.pendingRequests, + pendingCommits = state.pendingCommits, assignedStreams = state.assignedStreams ) ) - case RebalanceEvent(true, assignedTps, revokedTps, lostTps, endedStreams) => + case RebalanceEvent(true, assignedTps, revokedTps, lostTps, endedStreams, newCommits) => // The slow track, the rebalance listener was invoked: // some partitions were assigned, revoked or lost, // some streams have ended. @@ -382,6 +500,8 @@ private[consumer] final class Runloop private ( !(lostTps.contains(tp) || revokedTps.contains(tp) || endedStreams.exists(_.tp == tp)) } + updatedPendingCommits = state.pendingCommits ++ newCommits + // Remove committed offsets for partitions that are no longer assigned: // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. _ <- @@ -391,6 +511,7 @@ private[consumer] final class Runloop private ( records = polledRecords, ignoreRecordsForTps = ignoreRecordsForTps, pendingRequests = updatedPendingRequests, + pendingCommits = updatedPendingCommits, assignedStreams = updatedAssignedStreams ) } @@ -402,7 +523,7 @@ private[consumer] final class Runloop private ( pollResult.ignoreRecordsForTps, pollResult.records ) - updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) + updatedPendingCommits <- ZIO.filter(pollResult.pendingCommits)(_.isPending) _ <- checkStreamPollInterval(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, @@ -517,12 +638,12 @@ private[consumer] final class Runloop private ( case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) => val rc = RebalanceConsumer.Live(c) ZIO - .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(runtime, rc))) + .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(sameThreadRuntime, rc))) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) => val rc = RebalanceConsumer.Live(c) ZIO - .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(runtime, rc))) + .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(sameThreadRuntime, rc))) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) => // For manual subscriptions we have to do some manual work before starting the run loop @@ -555,8 +676,11 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") - commitCommands = commands.collect { case cmd: RunloopCommand.Commit => cmd } + commitCommands <- commitQueue.takeAll + _ <- ZIO.logDebug( + s"Processing ${commitCommands.size} commits," + + s" ${commands.size} commands: ${commands.mkString(",")}" + ) stateAfterCommits <- handleCommits(state, commitCommands) streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand) @@ -599,6 +723,7 @@ object Runloop { records: ConsumerRecords[Array[Byte], Array[Byte]], ignoreRecordsForTps: Set[TopicPartition], pendingRequests: Chunk[RunloopCommand.Request], + pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl] ) private final case class RevokeResult( @@ -614,20 +739,31 @@ object Runloop { assignedTps: Set[TopicPartition], revokedTps: Set[TopicPartition], lostTps: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] + endedStreams: Chunk[PartitionStreamControl], + newCommits: Chunk[Runloop.Commit] ) { - def onAssigned(assigned: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + def onAssigned( + assigned: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl], + newCommits: Chunk[Runloop.Commit] + ): RebalanceEvent = copy( wasInvoked = true, assignedTps = assignedTps ++ assigned, - endedStreams = this.endedStreams ++ endedStreams + endedStreams = this.endedStreams ++ endedStreams, + newCommits = this.newCommits ++ newCommits ) - def onRevoked(revoked: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + def onRevoked( + revoked: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl], + newCommits: Chunk[Runloop.Commit] + ): RebalanceEvent = copy( wasInvoked = true, revokedTps = revokedTps ++ revoked, - endedStreams = this.endedStreams ++ endedStreams + endedStreams = this.endedStreams ++ endedStreams, + newCommits = this.newCommits ++ newCommits ) def onLost(lost: Set[TopicPartition]): RebalanceEvent = @@ -635,7 +771,16 @@ object Runloop { } private object RebalanceEvent { - val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + val None: RebalanceEvent = + RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty, Chunk.empty) + } + + private[internal] final case class Commit( + offsets: Map[TopicPartition, OffsetAndMetadata], + cont: Promise[Throwable, Unit] + ) { + @inline def isDone: UIO[Boolean] = cont.isDone + @inline def isPending: UIO[Boolean] = isDone.negate } private[consumer] def make( @@ -648,24 +793,27 @@ object Runloop { offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + rebalanceSafeCommits: Boolean, partitionsHub: Hub[Take[Throwable, PartitionAssignment]], fetchStrategy: FetchStrategy ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) committedOffsetsRef <- Ref.make(CommitOffsets.empty) - runtime <- ZIO.runtime[Any] + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) runloop = new Runloop( - runtime = runtime, + sameThreadRuntime = sameThreadRuntime, hasGroupId = hasGroupId, consumer = consumer, pollTimeout = pollTimeout, maxPollInterval = maxPollInterval, commitTimeout = commitTimeout, + commitQueue = commitQueue, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, partitionsHub = partitionsHub, @@ -673,6 +821,7 @@ object Runloop { offsetRetrieval = offsetRetrieval, userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, currentStateRef = currentStateRef, committedOffsetsRef = committedOffsetsRef, fetchStrategy = fetchStrategy @@ -694,12 +843,12 @@ object Runloop { private final case class State( pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[RunloopCommand.Commit], + pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { - def addPendingCommits(c: Chunk[RunloopCommand.Commit]): State = copy(pendingCommits = pendingCommits ++ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) + def addPendingCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) def shouldPoll: Boolean = subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) @@ -716,7 +865,7 @@ object Runloop { // package private for unit testing private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - def addCommits(c: Chunk[RunloopCommand.Commit]): CommitOffsets = { + def addCommits(c: Chunk[Runloop.Commit]): CommitOffsets = { val updatedOffsets = mutable.Map.empty[TopicPartition, Long] updatedOffsets.sizeHint(offsets.size) updatedOffsets ++= offsets @@ -731,6 +880,9 @@ object Runloop { def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) + + def contains(tp: TopicPartition, offset: Long): Boolean = + offsets.get(tp).exists(_ <= offset) } private[internal] object CommitOffsets { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 3cacf91642..de5e2b2e56 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -98,6 +98,7 @@ private[consumer] object RunloopAccess { offsetRetrieval = settings.offsetRetrieval, userRebalanceListener = settings.rebalanceListener, restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, + rebalanceSafeCommits = settings.rebalanceSafeCommits, partitionsHub = partitionsHub, fetchStrategy = settings.fetchStrategy ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index a5259b2c27..be43f585cf 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -1,6 +1,5 @@ package zio.kafka.consumer.internal -import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio._ import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription } @@ -17,15 +16,12 @@ object RunloopCommand { /** Used as a signal that another poll is needed. */ case object Poll extends Control + /** Used as a signal to the poll-loop that commits are available in the commit-queue. */ + case object CommitAvailable extends Control + case object StopRunloop extends Control case object StopAllStreams extends StreamCommand - final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit]) - extends RunloopCommand { - @inline def isDone: UIO[Boolean] = cont.isDone - @inline def isPending: UIO[Boolean] = isDone.negate - } - /** Used by a stream to request more records. */ final case class Request(tp: TopicPartition) extends StreamCommand diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala new file mode 100644 index 0000000000..b18fe95e17 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala @@ -0,0 +1,37 @@ +package zio.kafka.consumer + +import zio._ +import zio.internal.ExecutionMetrics + +package object internal { + + /** + * A runtime layer that can be used to run everything on the thread of the caller. + * + * Provided by Adam Fraser in Discord: + * https://discord.com/channels/629491597070827530/630498701860929559/1094279123880386590 but with cooperative + * yielding enabled. + * + * WARNING! Unfortunately some ZIO operations, like `ZIO.timeout`, inherently need to work multi-threaded and will + * therefore shift the fiber to another thread, even when this runtime is used. + */ + private[internal] val SameThreadRuntimeLayer: ZLayer[Any, Nothing, Unit] = { + val sameThreadExecutor = new Executor() { + override def metrics(implicit unsafe: Unsafe): Option[ExecutionMetrics] = None + + override def submit(runnable: Runnable)(implicit unsafe: Unsafe): Boolean = { + runnable.run() + true + } + } + + Runtime.setExecutor(sameThreadExecutor) ++ Runtime.setBlockingExecutor(sameThreadExecutor) + } + + /** + * A sleep that is safe to use from the same-thread-runtime. + */ + private[internal] def blockingSleep(sleepTime: Duration): Task[Unit] = + ZIO.attempt(Thread.sleep(sleepTime.toMillis)) + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala index 4285758180..b08b3f36e6 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala @@ -40,7 +40,7 @@ trait Transaction { def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] } -final private[producer] class TransactionImpl( +private[producer] final class TransactionImpl( producer: Producer, private[producer] val offsetBatchRef: Ref[OffsetBatch], closed: Ref[Boolean]