diff --git a/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala b/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala index ba536f228..f3f200093 100644 --- a/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala +++ b/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala @@ -63,7 +63,7 @@ object ReactiveKafkaTransactionFixtures extends PerfFixtureHelpers { Transactional.source(consumerSettings, Subscriptions.topics(c.filledTopic.topic)) val producerSettings = createProducerSettings(c.kafkaHost).withEosCommitInterval(commitInterval) - val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings, randomId()) + val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings) ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult](c.filledTopic.topic, sinkTopic, diff --git a/build.sbt b/build.sbt index cdc5c3be6..a65b52893 100644 --- a/build.sbt +++ b/build.sbt @@ -110,7 +110,7 @@ val commonSettings = Def.settings( "11", "-Wconf:cat=feature:w,cat=deprecation&msg=.*JavaConverters.*:s,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w" ) ++ { - if (insideCI.value && !Nightly && scalaVersion.value != Scala3) Seq("-Werror") + if (scalaVersion.value != Scala3) Seq("-Werror") else Seq.empty }, Compile / doc / scalacOptions := scalacOptions.value ++ Seq( diff --git a/core/src/main/mima-filters/5.0.0.backwards.excludes b/core/src/main/mima-filters/5.0.0.backwards.excludes new file mode 100644 index 000000000..8a1982280 --- /dev/null +++ b/core/src/main/mima-filters/5.0.0.backwards.excludes @@ -0,0 +1,5 @@ +# ApiMayChange +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.flowWithOffsetContext") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.sinkWithOffsetContext") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.flowWithOffsetContext") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext") \ No newline at end of file diff --git a/core/src/main/scala/akka/kafka/ConsumerMessage.scala b/core/src/main/scala/akka/kafka/ConsumerMessage.scala index 951f5971d..9c62ef008 100644 --- a/core/src/main/scala/akka/kafka/ConsumerMessage.scala +++ b/core/src/main/scala/akka/kafka/ConsumerMessage.scala @@ -7,11 +7,10 @@ package akka.kafka import java.util.Objects import java.util.concurrent.CompletionStage - import akka.Done import akka.annotation.{DoNotInherit, InternalApi} import akka.kafka.internal.{CommittableOffsetBatchImpl, CommittedMarker} -import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecord} import org.apache.kafka.common.TopicPartition import scala.concurrent.Future @@ -133,7 +132,8 @@ object ConsumerMessage { override val key: GroupTopicPartition, override val offset: Long, private[kafka] val committedMarker: CommittedMarker, - private[kafka] val fromPartitionedSource: Boolean + private[kafka] val fromPartitionedSource: Boolean, + requestConsumerGroupMetadata: () => Future[ConsumerGroupMetadata] ) extends PartitionOffset(key, offset) /** diff --git a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala index a661fc8bb..81a6f0acf 100644 --- a/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala @@ -71,11 +71,10 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: } override def preStart(): Unit = { - super.preStart() resolveProducer(stage.settings) } - private def checkForCompletion(): Unit = + protected def checkForCompletion(): Unit = if (isClosed(stage.in) && awaitingConfirmation == 0) { completionState match { case Some(Success(_)) => onCompletionSuccess() @@ -84,7 +83,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: } } - override def onCompletionSuccess(): Unit = completeStage() + override def onCompletionSuccess(): Unit = if (readyToShutdown()) completeStage() override def onCompletionFailure(ex: Throwable): Unit = failStage(ex) @@ -104,9 +103,16 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: protected def resumeDemand(tryToPull: Boolean = true): Unit = { log.debug("Resume demand") - setHandler(stage.out, new OutHandler { - override def onPull(): Unit = tryPull(stage.in) - }) + setHandler( + stage.out, + new OutHandler { + override def onPull(): Unit = tryPull(stage.in) + + override def onDownstreamFinish(cause: Throwable): Unit = { + super.onDownstreamFinish(cause) + } + } + ) // kick off demand for more messages if we're resuming demand if (tryToPull && isAvailable(stage.out) && !hasBeenPulled(stage.in)) { tryPull(stage.in) @@ -202,6 +208,9 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: override def postStop(): Unit = { log.debug("ProducerStage postStop") closeProducer() - super.postStop() } + + // Specifically for transactional producer that needs to defer shutdown to let an async task + // complete before actually shutting down + protected def readyToShutdown(): Boolean = true } diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 1aa5592b8..d0865c61c 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -73,6 +73,10 @@ import scala.util.control.NonFatal /** Special case commit for non-batched committing. */ final case class CommitSingle(tp: TopicPartition, offsetAndMetadata: OffsetAndMetadata) extends NoSerializationVerificationNeeded + + // Used for transactions/EOS returns the current ConsumerGroupMetadata + case object GetConsumerGroupMetadata extends NoSerializationVerificationNeeded + //responses final case class Assigned(partition: List[TopicPartition]) extends NoSerializationVerificationNeeded final case class Revoked(partition: List[TopicPartition]) extends NoSerializationVerificationNeeded @@ -294,6 +298,9 @@ import scala.util.control.NonFatal stageActorsMap = stageActorsMap.filterNot(_._2 == ref) requests -= ref + case GetConsumerGroupMetadata => + sender() ! consumer.groupMetadata() + case req: Metadata.Request => sender() ! handleMetadataRequest(req) } diff --git a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala index 9739a735b..4ff9edb50 100644 --- a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala +++ b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala @@ -5,7 +5,6 @@ package akka.kafka.internal import java.util.concurrent.CompletionStage - import akka.Done import akka.annotation.InternalApi import akka.kafka.ConsumerMessage @@ -16,7 +15,7 @@ import akka.kafka.ConsumerMessage.{ TransactionalMessage, _ } -import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.OffsetFetchResponse @@ -46,6 +45,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit def fromPartitionedSource: Boolean + + def requestConsumerGroupMetadata(): Future[ConsumerGroupMetadata] } /** Internal API */ @@ -62,7 +63,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V] ), offset = rec.offset, committedMarker, - fromPartitionedSource + fromPartitionedSource, + requestConsumerGroupMetadata _ ) ConsumerMessage.TransactionalMessage(rec, offset) } @@ -82,7 +84,8 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V] ), offset = rec.offset, committedMarker, - fromPartitionedSource + fromPartitionedSource, + requestConsumerGroupMetadata _ ) (rec, offset) } diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala index 77ba6fc14..2c952d1d5 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala @@ -7,6 +7,7 @@ package akka.kafka.internal import akka.Done import akka.annotation.InternalApi +import akka.dispatch.{Dispatchers, ExecutionContexts} import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker} import akka.kafka.ProducerMessage.{Envelope, Results} import akka.kafka.internal.DeferredProducer._ @@ -18,9 +19,10 @@ import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetada import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition -import scala.concurrent.Future +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} /** * INTERNAL API @@ -54,7 +56,7 @@ private object TransactionalProducerStage { override def committingFailed(): Unit = {} } - final class NonemptyTransactionBatch(head: PartitionOffsetCommittedMarker, + final class NonemptyTransactionBatch(val head: PartitionOffsetCommittedMarker, tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]()) extends TransactionBatch { // There is no guarantee that offsets adding callbacks will be called in any particular order. @@ -90,6 +92,10 @@ private object TransactionalProducerStage { } } + final case class CommitTransaction(batch: NonemptyTransactionBatch, + beginNewTransaction: Boolean, + consumerGroupMetadata: ConsumerGroupMetadata) + } /** @@ -109,15 +115,27 @@ private final class TransactionalProducerStageLogic[K, V, P]( private val commitSchedulerKey = "commit" private val messageDrainInterval = 10.milliseconds - private var batchOffsets = TransactionBatch.empty - private var demandSuspended = false + private var commitInProgress = false private var firstMessage: Option[Envelope[K, V, P]] = None + private val commitTransactionCB: Try[CommitTransaction] => Unit = + createAsyncCallback[Try[CommitTransaction]](commitTransaction _).invoke _ + + private val onInternalCommitAckCb: Try[Done] => Unit = + getAsyncCallback[Try[Done]] { maybeDone => + maybeDone match { + case Failure(ex) => log.debug("Internal commit failed: {}", ex) + case _ => + } + scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval) + }.invoke _ + override protected def logSource: Class[_] = classOf[TransactionalProducerStage[_, _, _]] + // FIXME no longer true // we need to peek at the first message to generate the producer transactional id for partitioned sources override def preStart(): Unit = resumeDemand() @@ -163,7 +181,17 @@ private final class TransactionalProducerStageLogic[K, V, P]( val awaitingConf = awaitingConfirmationValue batchOffsets match { case batch: NonemptyTransactionBatch if awaitingConf == 0 => - commitTransaction(batch, beginNewTransaction) + if (commitInProgress) { + // batch will end up in next commit + log.debug("Commit already in progress, ignoring request to commit") + } else { + commitInProgress = true + batch.head + .requestConsumerGroupMetadata() + .onComplete { consumerGroupMetadataTry => + commitTransactionCB(consumerGroupMetadataTry.map(CommitTransaction(batch, beginNewTransaction, _))) + }(ExecutionContexts.parasitic) + } case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete => abortTransaction("Transaction is empty and stage is completing") case _ if awaitingConf > 0 => @@ -177,6 +205,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( /** * When using partitioned sources we extract the transactional id, group id, and topic partition information from * the first message in order to define a `transacitonal.id` before constructing the [[org.apache.kafka.clients.producer.KafkaProducer]] + * FIXME this is probably no longer true */ private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean = producerAssignmentLifecycle match { @@ -189,7 +218,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( firstMessage = Some(msg) // initiate async async producer request _after_ first message is stashed in case future eagerly resolves // instead of asynccallback - resolveProducer(generatedTransactionalConfig(msg)) + resolveProducer(generatedTransactionalConfig) // suspend demand after we receive the first message until the producer is assigned suspendDemand() false @@ -199,20 +228,14 @@ private final class TransactionalProducerStageLogic[K, V, P]( ) } - private def generatedTransactionalConfig(msg: Envelope[K, V, P]): ProducerSettings[K, V] = { - val txId = msg.passThrough match { - case committedMarker: PartitionOffsetCommittedMarker if committedMarker.fromPartitionedSource => - val gtp = committedMarker.key - val txId = s"$transactionalId-${gtp.groupId}-${gtp.topic}-${gtp.partition}" - log.debug("Generated transactional id from partitioned source '{}'", txId) - txId - case _ => transactionalId - } - + private def generatedTransactionalConfig: ProducerSettings[K, V] = { stage.settings.withProperties( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString, - ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId, - ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString + ProducerConfig.TRANSACTIONAL_ID_CONFIG -> transactionalId, + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString, + // KIP-447: "We shall set `transaction.timout.ms` default to 10000 ms (10 seconds) on Kafka Streams. For non-stream users, + // we highly recommend you to do the same if you want to use the new semantics." + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG -> 10000.toString ) } @@ -223,46 +246,49 @@ private final class TransactionalProducerStageLogic[K, V, P]( override def onCompletionSuccess(): Unit = { log.debug("Committing final transaction before shutdown") + if (commitInProgress) + log.warning( + "Stage onCompleteSuccess with commit in flight, this is a bug, please report at https://github.com/akka/alpakka-kafka/issues" + ) cancelTimer(commitSchedulerKey) maybeCommitTransaction(beginNewTransaction = false, abortEmptyTransactionOnComplete = true) super.onCompletionSuccess() } override def onCompletionFailure(ex: Throwable): Unit = { - abortTransaction("Stage failure") + abortTransaction(s"Stage failure ($ex)") batchOffsets.committingFailed() super.onCompletionFailure(ex) } - private def commitTransaction(batch: NonemptyTransactionBatch, beginNewTransaction: Boolean): Unit = { - val group = batch.group - log.debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}", - transactionalId, - group, - batch.offsets) - val offsetMap = batch.offsetMap() - producer.sendOffsetsToTransaction(offsetMap.asJava, new ConsumerGroupMetadata(group)) - producer.commitTransaction() - log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", - transactionalId, - group, - batch.offsets) - batchOffsets = TransactionBatch.empty - batch - .internalCommit() - .onComplete { _ => - onInternalCommitAckCb.invoke(()) - }(materializer.executionContext) - if (beginNewTransaction) { - beginTransaction() - resumeDemand() + private def commitTransaction(t: Try[CommitTransaction]): Unit = { + commitInProgress = false + t match { + case Failure(ex) => + failStage(new RuntimeException("Failed to fetch consumer group metadata", ex)) + case Success(CommitTransaction(batch, beginNewTransaction, consumerGroupMetadata)) => + log.debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}", + transactionalId, + consumerGroupMetadata, + batch.offsets) + val offsetMap = batch.offsetMap() + producer.sendOffsetsToTransaction(offsetMap.asJava, consumerGroupMetadata) + producer.commitTransaction() + log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", + transactionalId, + consumerGroupMetadata, + batch.offsets) + batchOffsets = TransactionBatch.empty + batch + .internalCommit() + .onComplete(onInternalCommitAckCb)(ExecutionContexts.parasitic) + if (beginNewTransaction) { + beginTransaction() + resumeDemand() + } } - } - - private val onInternalCommitAckCb: AsyncCallback[Unit] = { - getAsyncCallback[Unit]( - _ => scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval) - ) + // in case stage wants to shut down but was waiting for commit to complete + checkForCompletion() } private def initTransactions(): Unit = { @@ -279,4 +305,14 @@ private final class TransactionalProducerStageLogic[K, V, P]( log.debug("Aborting transaction: {}", reason) if (producerAssignmentLifecycle == Assigned) producer.abortTransaction() } + + override protected def readyToShutdown(): Boolean = !commitInProgress + + override def postStop(): Unit = { + if (commitInProgress) + log.warning( + "Stage postStop with commit in flight, this is a bug, please report at https://github.com/akka/alpakka-kafka/issues" + ) + super.postStop() + } } diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala index fa7e4d372..c741cbc78 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala @@ -6,11 +6,11 @@ package akka.kafka.internal import java.util.Locale - import akka.{Done, NotUsed} import akka.actor.{ActorRef, Status, Terminated} import akka.actor.Status.Failure import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts import akka.kafka.ConsumerMessage.{PartitionOffset, TransactionalMessage} import akka.kafka.internal.KafkaConsumerActor.Internal.Revoked import akka.kafka.internal.SubSourceLogic._ @@ -22,10 +22,10 @@ import akka.stream.SourceShape import akka.stream.scaladsl.Source import akka.stream.stage.{AsyncCallback, GraphStageLogic} import akka.util.Timeout -import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.common.{IsolationLevel, TopicPartition} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{Await, ExecutionContext, Future} /** Internal API */ @@ -115,7 +115,7 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour case (sender, Committed(offsets)) => inFlightRecords.committed(offsets.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap) sender.tell(Done, sourceActor.ref) - case (sender, CommittingFailure) => { + case (_, CommittingFailure) => { log.info("Committing failed, resetting in flight offsets") inFlightRecords.reset() } @@ -127,20 +127,15 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour log.debug(s"Draining partitions {}", partitions) materializer.scheduleOnce( consumerSettings.drainingCheckInterval, - new Runnable { - override def run(): Unit = - sourceActor.ref.tell(Drain(partitions, ack.orElse(Some(sender)), msg), sourceActor.ref) - } + () => sourceActor.ref.tell(Drain(partitions, ack.orElse(Some(sender)), msg), sourceActor.ref) ) } } override val groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG) - override lazy val committedMarker: CommittedMarker = { - val ec = materializer.executionContext - CommittedMarkerRef(sourceActor.ref, consumerSettings.commitTimeout)(ec) - } + override lazy val committedMarker: CommittedMarker = + CommittedMarkerRef(sourceActor.ref, consumerSettings.commitTimeout) override def onMessage(rec: ConsumerRecord[K, V]): Unit = inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition()) -> rec.offset())) @@ -157,6 +152,8 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour override protected def addToPartitionAssignmentHandler( handler: PartitionAssignmentHandler ): PartitionAssignmentHandler = { + // FIXME this touches mutable internal stage fields (sourceActor, stageActor, consumerActor, subSources) from + // another thread (consumer actor) not thread safe val blockingRevokedCall = new PartitionAssignmentHandler { override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () @@ -173,20 +170,27 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour onRevoke(lostTps, consumer) override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () + + private def waitForDraining(partitions: Set[TopicPartition]): Boolean = { + import akka.pattern.ask + implicit val timeout = Timeout(consumerSettings.commitTimeout) + try { + Await.result(ask(stageActor.ref, Drain(partitions, None, Drained)), timeout.duration) + true + } catch { + case t: Throwable => + false + } + } } new PartitionAssignmentHelpers.Chain(handler, blockingRevokedCall) } - private def waitForDraining(partitions: Set[TopicPartition]): Boolean = { + override def requestConsumerGroupMetadata(): Future[ConsumerGroupMetadata] = { import akka.pattern.ask - implicit val timeout = Timeout(consumerSettings.commitTimeout) - try { - Await.result(ask(stageActor.ref, Drain(partitions, None, Drained)), timeout.duration) - true - } catch { - case t: Throwable => - false - } + implicit val timeout: Timeout = 5.seconds // FIXME specific timeout config for this? + ask(consumerActor, KafkaConsumerActor.Internal.GetConsumerGroupMetadata)(timeout) + .mapTo[ConsumerGroupMetadata] } } @@ -240,6 +244,8 @@ private[kafka] final class TransactionalSubSource[K, V]( override protected def addToPartitionAssignmentHandler( handler: PartitionAssignmentHandler ): PartitionAssignmentHandler = { + // FIXME this touches mutable internal stage fields (sourceActor, stageActor, consumerActor, subSources) from + // another thread (consumer actor) not thread safe val blockingRevokedCall = new PartitionAssignmentHandler { override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () @@ -293,14 +299,13 @@ private object TransactionalSourceLogic { final case class Committed(offsets: Map[TopicPartition, OffsetAndMetadata]) case object CommittingFailure - private[internal] final case class CommittedMarkerRef(sourceActor: ActorRef, commitTimeout: FiniteDuration)( - implicit ec: ExecutionContext - ) extends CommittedMarker { + private[internal] final case class CommittedMarkerRef(sourceActor: ActorRef, commitTimeout: FiniteDuration) + extends CommittedMarker { override def committed(offsets: Map[TopicPartition, OffsetAndMetadata]): Future[Done] = { import akka.pattern.ask sourceActor .ask(Committed(offsets))(Timeout(commitTimeout)) - .map(_ => Done) + .map(_ => Done)(ExecutionContexts.parasitic) } override def failed(): Unit = @@ -370,13 +375,15 @@ private final class TransactionalSubSourceStageLogic[K, V]( private val inFlightRecords = InFlightRecords.empty + override lazy val committedMarker: CommittedMarker = + CommittedMarkerRef(subSourceActor.ref, consumerSettings.commitTimeout) + + override val fromPartitionedSource: Boolean = true override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG) override def onMessage(rec: ConsumerRecord[K, V]): Unit = inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition()) -> rec.offset())) - override val fromPartitionedSource: Boolean = true - override protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = super.messageHandling.orElse(drainHandling).orElse { case (_, Revoked(tps)) => @@ -423,20 +430,20 @@ private final class TransactionalSubSourceStageLogic[K, V]( log.debug(s"Draining partitions {}", partitions) materializer.scheduleOnce( consumerSettings.drainingCheckInterval, - new Runnable { - override def run(): Unit = - subSourceActor.ref.tell(Drain(partitions, ack.orElse(Some(sender)), msg), stageActor.ref) - } + () => subSourceActor.ref.tell(Drain(partitions, ack.orElse(Some(sender)), msg), stageActor.ref) ) } case (sender, DrainingComplete) => completeStage() } - lazy val committedMarker: CommittedMarker = { - val ec = materializer.executionContext - CommittedMarkerRef(subSourceActor.ref, consumerSettings.commitTimeout)(ec) + override def requestConsumerGroupMetadata(): Future[ConsumerGroupMetadata] = { + implicit val timeout: Timeout = 5.seconds // FIXME specific timeout config for this? + akka.pattern + .ask(consumerActor, KafkaConsumerActor.Internal.GetConsumerGroupMetadata)(timeout) + .mapTo[ConsumerGroupMetadata] } + } private object TransactionalSubSourceStageLogic { diff --git a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala index 50a9f8915..6976a9d74 100644 --- a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala @@ -83,6 +83,19 @@ object Transactional { * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will * initialize, begin, produce, and commit the consumer offset as part of a transaction. */ + def sink[K, V, IN <: Envelope[K, V, ConsumerMessage.PartitionOffset]]( + settings: ProducerSettings[K, V] + ): Sink[IN, CompletionStage[Done]] = + scaladsl.Transactional + .sink(settings) + .mapMaterializedValue(_.toJava) + .asJava + + /** + * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will + * initialize, begin, produce, and commit the consumer offset as part of a transaction. + */ + @deprecated("Use the 'sink' factory method without a transactionalId parameter") def sink[K, V, IN <: Envelope[K, V, ConsumerMessage.PartitionOffset]]( settings: ProducerSettings[K, V], transactionalId: String @@ -100,13 +113,12 @@ object Transactional { */ @ApiMayChange def sinkWithOffsetContext[K, V]( - settings: ProducerSettings[K, V], - transactionalId: String + settings: ProducerSettings[K, V] ): Sink[Pair[Envelope[K, V, NotUsed], PartitionOffset], CompletionStage[Done]] = akka.stream.scaladsl .Flow[Pair[Envelope[K, V, NotUsed], PartitionOffset]] .map(_.toScala) - .toMat(scaladsl.Transactional.sinkWithOffsetContext(settings, transactionalId))(akka.stream.scaladsl.Keep.right) + .toMat(scaladsl.Transactional.sinkWithOffsetContext(settings))(akka.stream.scaladsl.Keep.right) .mapMaterializedValue(_.toJava) .asJava @@ -115,6 +127,17 @@ object Transactional { * emits a [[ConsumerMessage.TransactionalMessage]]. The flow requires a unique `transactional.id` across all app * instances. The flow will override producer properties to enable Kafka exactly-once transactional support. */ + def flow[K, V, IN <: Envelope[K, V, ConsumerMessage.PartitionOffset]]( + settings: ProducerSettings[K, V] + ): Flow[IN, Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = + scaladsl.Transactional.flow(settings).asJava + + /** + * Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that + * emits a [[ConsumerMessage.TransactionalMessage]]. The flow requires a unique `transactional.id` across all app + * instances. The flow will override producer properties to enable Kafka exactly-once transactional support. + */ + @deprecated("Use the 'flow' factory method without a transactionalId parameter") def flow[K, V, IN <: Envelope[K, V, ConsumerMessage.PartitionOffset]]( settings: ProducerSettings[K, V], transactionalId: String @@ -133,12 +156,11 @@ object Transactional { */ @ApiMayChange def flowWithOffsetContext[K, V]( - settings: ProducerSettings[K, V], - transactionalId: String + settings: ProducerSettings[K, V] ): FlowWithContext[Envelope[K, V, NotUsed], ConsumerMessage.PartitionOffset, Results[K, V, ConsumerMessage.PartitionOffset], ConsumerMessage.PartitionOffset, NotUsed] = - scaladsl.Transactional.flowWithOffsetContext(settings, transactionalId).asJava + scaladsl.Transactional.flowWithOffsetContext(settings).asJava } diff --git a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala index 4892f0512..7f1983839 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala @@ -22,6 +22,8 @@ import akka.{Done, NotUsed} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition +import java.util.UUID +import scala.annotation.nowarn import scala.concurrent.Future /** @@ -76,12 +78,23 @@ object Transactional { * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will * initialize, begin, produce, and commit the consumer offset as part of a transaction. */ + @nowarn("msg=deprecated") + @deprecated("Use the 'sink' factory method without a transactionalId parameter") def sink[K, V]( settings: ProducerSettings[K, V], transactionalId: String ): Sink[Envelope[K, V, ConsumerMessage.PartitionOffset], Future[Done]] = flow(settings, transactionalId).toMat(Sink.ignore)(Keep.right) + /** + * Sink that is aware of the [[ConsumerMessage.TransactionalMessage.partitionOffset]] from a [[Transactional.source]]. It will + * initialize, begin, produce, and commit the consumer offset as part of a transaction. + */ + def sink[K, V]( + settings: ProducerSettings[K, V] + ): Sink[Envelope[K, V, ConsumerMessage.PartitionOffset], Future[Done]] = + flow(settings).toMat(Sink.ignore)(Keep.right) + /** * API MAY CHANGE * @@ -90,20 +103,31 @@ object Transactional { */ @ApiMayChange def sinkWithOffsetContext[K, V]( - settings: ProducerSettings[K, V], - transactionalId: String + settings: ProducerSettings[K, V] ): Sink[(Envelope[K, V, NotUsed], PartitionOffset), Future[Done]] = - sink(settings, transactionalId) + sink(settings) .contramap { case (env, offset) => env.withPassThrough(offset) } + /** + * Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that + * emits a [[ConsumerMessage.TransactionalMessage]]. + * The flow will override producer properties to enable Kafka exactly-once transactional support. + */ + @nowarn("msg=deprecated") + def flow[K, V]( + settings: ProducerSettings[K, V] + ): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = + flow(settings, UUID.randomUUID().toString) + /** * Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that * emits a [[ConsumerMessage.TransactionalMessage]]. The flow requires a unique `transactional.id` across all app * instances. The flow will override producer properties to enable Kafka exactly-once transactional support. */ + @deprecated("Use the 'flow' factory without a transactionalId parameter") def flow[K, V]( settings: ProducerSettings[K, V], transactionalId: String @@ -132,15 +156,14 @@ object Transactional { */ @ApiMayChange def flowWithOffsetContext[K, V]( - settings: ProducerSettings[K, V], - transactionalId: String + settings: ProducerSettings[K, V] ): FlowWithContext[Envelope[K, V, NotUsed], ConsumerMessage.PartitionOffset, Results[K, V, ConsumerMessage.PartitionOffset], ConsumerMessage.PartitionOffset, NotUsed] = { val noContext: Flow[Envelope[K, V, PartitionOffset], Results[K, V, PartitionOffset], NotUsed] = - flow(settings, transactionalId) + flow(settings) noContext .asFlowWithContext[Envelope[K, V, NotUsed], PartitionOffset, PartitionOffset]({ case (env, c) => env.withPassThrough(c) diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md index acb016694..31121ef18 100644 --- a/docs/src/main/paradox/transactions.md +++ b/docs/src/main/paradox/transactions.md @@ -3,7 +3,7 @@ project.description: Alpakka has support for Kafka Transactions which provide gu --- # Transactions -Kafka Transactions provide guarantees that messages processed in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as "committed" only once the transformed messages are successfully produced to the sink. +Kafka Transactions provide guarantees that messages processed in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as "committed" only once the transformed messages are successfully produced to the sink. For full details on how transactions are achieved in Kafka you may wish to review the Kafka Improvement Proposal [KIP-98: Exactly Once Delivery and Transactional Messaging](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and its associated [design document](https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o). @@ -55,18 +55,17 @@ For more details see [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow. They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions. - -A `transactional.id` must be defined and unique for each instance of the application. +The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). ## Consume-Transform-Produce Workflow -Kafka transactions are handled transparently to the user. The @apidoc[Transactional.source](Transactional$) will enforce that a consumer group id is specified and the @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) will enforce that a `transactional.id` is specified. All other Kafka consumer and producer properties required to enable transactions are overridden. +Kafka transactions are handled transparently to the user. The @apidoc[Transactional.source](Transactional$) will enforce that a consumer group id is specified. All other Kafka consumer and producer properties required to enable transactions are overridden. Transactions are committed on an interval which can be controlled with the producer config `akka.kafka.producer.eos-commit-interval`, similar to how exactly once works with Kafka Streams. The default value is `100ms`. The larger commit interval is the more records will need to be reprocessed in the event of failure and the transaction is aborted. -When the stream is materialized the producer will initialize the transaction for the provided `transactional.id` and a transaction will begin. Every commit interval (`eos-commit-interval`) we check if there are any offsets available to commit. If offsets exist then we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any) and then commit the transaction. After the commit succeeds a new transaction is begun and we re-initialize demand for upstream messages. +When the stream is materialized and the producer sees the first message it will initialize a transaction. Every commit interval (`eos-commit-interval`) we check if there are any offsets available to commit. If offsets exist then we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any) and then commit the transaction. After the commit succeeds a new transaction is begun and we re-initialize demand for upstream messages. -Messages are also drained from the stream when the consumer gets a rebalance of partitions. In that case, the consumer will wait in the `onPartitionsRevoked` callback until all of the messages have been drained from the stream and the transaction is committed before allowing the rebalance to continue. The amount of total time the consumer will wait for draining is controlled by the `akka.kafka.consumer.commit-timeout`, and the interval between checks is controlled by the `akka.kafka.consuner.eos-draining-check-interval` configuration settings. +Messages are also drained from the stream when the consumer gets a rebalance of partitions. In that case, the consumer will wait in the `onPartitionsRevoked` callback until all of the messages have been drained from the stream and the transaction is committed before allowing the rebalance to continue. The amount of total time the consumer will wait for draining is controlled by the `akka.kafka.consumer.commit-timeout`, and the interval between checks is controlled by the `akka.kafka.consumer.eos-draining-check-interval` configuration settings. To gracefully shutdown the stream and commit the current transaction you must call `shutdown()` on the @apidoc[(javadsl|scaladsl).Consumer.Control] materialized value to await all produced message acknowledgements and commit the final transaction. @@ -107,15 +106,13 @@ There are several scenarios that this library's implementation of Kafka transact All of the scenarios covered in the @ref[At-Least-Once Delivery documentation](atleastonce.md) (Multiple Effects per Commit, Non-Sequential Processing, and Conditional Message Processing) are applicable to transactional scenarios as well. -Only one application instance per `transactional.id` is allowed. If two application instances with the same `transactional.id` are run at the same time then the instance that registers with Kafka's transaction coordinator second will throw a @javadoc[ProducerFencedException](org.apache.kafka.common.errors.ProducerFencedException) so it doesn't interfere with transactions in process by the first instance. To distribute multiple transactional workflows for the same subscription the user must manually subdivide the subscription across multiple instances of the application. This may be handled internally in future versions. - Any state in the transformation logic is not part of a transaction. It's left to the user to rebuild state when applying stateful operations with transaction. It's possible to encode state into messages produced to topics during a transaction. For example you could produce messages to a topic that represents an event log as part of a transaction. This event log can be replayed to reconstitute the correct state before the stateful stream resumes consuming again at startup. -Any side effects that occur in the transformation logic is not part of a transaction (i.e. writes to an database). +Any side effects that occur in the transformation logic is not part of a transaction (i.e. writes to a database). The exactly-once-semantics are guaranteed only when your flow consumes from and produces to the same Kafka cluster. Producing to partitions from a 3rd-party source or consuming partitions from one Kafka cluster and producing to another Kafka cluster are not supported. diff --git a/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala b/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala index ff11ce6ca..8b37b92f2 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/internal/KafkaTestKit.scala @@ -72,11 +72,13 @@ trait KafkaTestKit { /** * Return a unique transactional id with a default suffix. */ + @deprecated("Use flows and sinks that does not require an explicit transaction id") def createTransactionalId(): String = createTransactionalId(0) /** * Return a unique transactional id with a given suffix. */ + @deprecated("Use flows and sinks that does not require an explicit transaction id") def createTransactionalId(suffix: Int): String = s"transactionalId-$suffix-${nextNumber()}" def system: ActorSystem diff --git a/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala b/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala index a62788584..e039aa6cb 100644 --- a/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala +++ b/tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala @@ -59,7 +59,6 @@ class TransactionsPartitionedSourceSpec val sourceTopic = createTopic(1, sourcePartitions, replication) val sinkTopic = createTopic(2, destinationPartitions, replication) val group = createGroupId(1) - val transactionalId = createTransactionalId() val elements = 100 * 1000 // 100 * 1,000 = 100,000 val restartAfter = (10 * 1000) / sourcePartitions // (10 * 1,000) / 10 = 100 @@ -76,7 +75,7 @@ class TransactionsPartitionedSourceSpec val completedCopy = new AtomicInteger(0) val completedWithTimeout = new AtomicInteger(0) - def runStream(id: String): UniqueKillSwitch = + def runStream(id: String): UniqueKillSwitch = { RestartSource .onFailuresWithBackoff(RestartSettings(10.millis, 100.millis, 0.2))( () => { @@ -85,7 +84,6 @@ class TransactionsPartitionedSourceSpec txProducerDefaults, sourceTopic, sinkTopic, - transactionalId, idleTimeout = 10.seconds, maxPartitions = sourcePartitions, restartAfter = Some(restartAfter), @@ -106,6 +104,7 @@ class TransactionsPartitionedSourceSpec case Failure(_) => // restart })(Keep.left) .run() + } val controls: Seq[UniqueKillSwitch] = (0 until consumers) .map(_.toString) diff --git a/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala b/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala index 3b8260686..1f78753d2 100644 --- a/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala +++ b/tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala @@ -79,26 +79,22 @@ class TransactionsSourceSpec def runStream(id: String): UniqueKillSwitch = RestartSource - .onFailuresWithBackoff(RestartSettings(10.millis, 100.millis, 0.2))( - () => { - val transactionId = s"$group-$id" - transactionalCopyStream(consumerSettings, - txProducerDefaults, - sourceTopic, - sinkTopic, - transactionId, - 10.seconds, - Some(restartAfter), - Some(maxRestarts)) - .recover { - case e: TimeoutException => - if (completedWithTimeout.incrementAndGet() > 10) - "no more messages to copy" - else - throw new Error("Continue restarting copy stream") - } - } - ) + .onFailuresWithBackoff(RestartSettings(10.millis, 100.millis, 0.2)) { () => + transactionalCopyStream(consumerSettings, + txProducerDefaults, + sourceTopic, + sinkTopic, + 10.seconds, + Some(restartAfter), + Some(maxRestarts)) + .recover { + case e: TimeoutException => + if (completedWithTimeout.incrementAndGet() > 10) + "no more messages to copy" + else + throw new Error("Continue restarting copy stream") + } + } .viaMat(KillSwitches.single)(Keep.right) .toMat(Sink.onComplete { case Success(_) => @@ -158,7 +154,7 @@ class TransactionsSourceSpec val maxBufferSize = 16 // must be power of two Await.result(produce(sourceTopic, 1 to elements), remainingOrDefault) - val elementsWrote = new AtomicInteger(0) + val elementsWritten = new AtomicInteger(0) val consumerSettings = consumerDefaults.withGroupId(group).withStopTimeout(0.seconds) @@ -173,8 +169,8 @@ class TransactionsSourceSpec .take(batchSize.toLong) .delay(3.seconds, strategy = DelayOverflowStrategy.backpressure) .addAttributes(Attributes.inputBuffer(10, maxBufferSize)) - .via(Transactional.flow(producerDefaults, s"$group-$id")) - .map(_ => elementsWrote.incrementAndGet()) + .via(Transactional.flow(producerDefaults)) + .map(_ => elementsWritten.incrementAndGet()) .toMat(Sink.ignore)(Keep.left) .run() control @@ -187,8 +183,8 @@ class TransactionsSourceSpec val probeConsumerGroup = createGroupId(2) val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic) - periodicalCheck("Wait for elements written to Kafka", maxTries = 30, 1.second) { () => - elementsWrote.get() + periodicalCheck("Wait for elements written to Kafka", maxTries = 60, 1.second) { () => + elementsWritten.get() }(_ > 10) probeConsumer diff --git a/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java b/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java index cd1153db0..88b4d2887 100644 --- a/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java +++ b/tests/src/test/java/docs/javadsl/TransactionsExampleTest.java @@ -66,7 +66,6 @@ public void sourceSink() throws Exception { consumerDefaults().withGroupId(createGroupId()); String sourceTopic = createTopic(1); String targetTopic = createTopic(2); - String transactionalId = createTransactionalId(); // #transactionalSink Consumer.DrainingControl control = Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic)) @@ -76,9 +75,7 @@ public void sourceSink() throws Exception { ProducerMessage.single( new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()), msg.partitionOffset())) - .toMat( - Transactional.sink(producerSettings, transactionalId), - Consumer::createDrainingControl) + .toMat(Transactional.sink(producerSettings), Consumer::createDrainingControl) .run(system); // ... @@ -102,7 +99,6 @@ public void withOffsetContext() throws Exception { consumerDefaults().withGroupId(createGroupId()); String sourceTopic = createTopic(1); String targetTopic = createTopic(2); - String transactionalId = createTransactionalId(); Consumer.DrainingControl control = Transactional.sourceWithOffsetContext(consumerSettings, Subscriptions.topics(sourceTopic)) .via(business()) @@ -111,7 +107,7 @@ record -> ProducerMessage.single( new ProducerRecord<>(targetTopic, record.key(), record.value()))) .toMat( - Transactional.sinkWithOffsetContext(producerSettings, transactionalId), + Transactional.sinkWithOffsetContext(producerSettings), Consumer::createDrainingControl) .run(system); @@ -131,7 +127,6 @@ public void usingRestartSource() throws Exception { consumerDefaults().withGroupId(createGroupId()); String sourceTopic = createTopic(1); String targetTopic = createTopic(2); - String transactionalId = createTransactionalId(); // #transactionalFailureRetry AtomicReference innerControl = new AtomicReference<>(Consumer.createNoopControl()); @@ -159,7 +154,7 @@ public void usingRestartSource() throws Exception { innerControl.set(control); return control; }) - .via(Transactional.flow(producerSettings, transactionalId))); + .via(Transactional.flow(producerSettings))); CompletionStage streamCompletion = stream.runWith(Sink.ignore(), system); diff --git a/tests/src/test/scala/akka/kafka/TransactionsOps.scala b/tests/src/test/scala/akka/kafka/TransactionsOps.scala index e89cb7a38..e708b76a8 100644 --- a/tests/src/test/scala/akka/kafka/TransactionsOps.scala +++ b/tests/src/test/scala/akka/kafka/TransactionsOps.scala @@ -31,7 +31,6 @@ trait TransactionsOps extends TestSuite with Matchers { producerSettings: ProducerSettings[String, String], sourceTopic: String, sinkTopic: String, - transactionalId: String, idleTimeout: FiniteDuration, restartAfter: Option[Int] = None, maxRestarts: Option[AtomicInteger] = None @@ -49,7 +48,7 @@ trait TransactionsOps extends TestSuite with Matchers { .map { msg => ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset) } - .via(Transactional.flow(producerSettings, transactionalId)) + .via(Transactional.flow(producerSettings)) /** * Copy messages from a source to sink topic. Source and sink must have exactly the same number of partitions. @@ -59,7 +58,6 @@ trait TransactionsOps extends TestSuite with Matchers { producerSettings: ProducerSettings[String, String], sourceTopic: String, sinkTopic: String, - transactionalId: String, idleTimeout: FiniteDuration, maxPartitions: Int, restartAfter: Option[Int] = None, @@ -86,7 +84,7 @@ trait TransactionsOps extends TestSuite with Matchers { msg.record.value), msg.partitionOffset) } - .via(Transactional.flow(producerSettings, transactionalId)) + .via(Transactional.flow(producerSettings)) results } ) @@ -196,7 +194,7 @@ trait TransactionsOps extends TestSuite with Matchers { val expectedValues: immutable.Seq[String] = (1 to elements).map(_.toString) for (partition <- 0 until maxPartitions) { - println(s"Asserting values for partition: $partition") + // println(s"Asserting values for partition: $partition") val partitionMessages: immutable.Seq[String] = values.filter(_._1 == partition).map { case (_, _, value) => value } diff --git a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala index d955f83f1..82926d15b 100644 --- a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala @@ -33,11 +33,17 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers +import java.util.Optional import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} +object ProducerSpec { + val group = "group" + val consumerGroupMetadata = new ConsumerGroupMetadata(group, 1, "memberId", Optional.of("groupInstanceId")) +} + class ProducerSpec(_system: ActorSystem) extends TestKit(_system) with AnyFlatSpecLike @@ -57,8 +63,6 @@ class ProducerSpec(_system: ActorSystem) implicit val ec: ExecutionContext = _system.dispatcher - private val group = "group" - type K = String type V = String type Record = ProducerRecord[K, V] @@ -72,12 +76,13 @@ class ProducerSpec(_system: ActorSystem) def toMessage(tuple: (Record, RecordMetadata)) = Message(tuple._1, NotUsed) private[kafka] def toTxMessage(tuple: (Record, RecordMetadata), committer: CommittedMarker) = { val consumerMessage = ConsumerMessage - .PartitionOffset(GroupTopicPartition(group, tuple._1.topic(), 1), tuple._2.offset()) + .PartitionOffset(GroupTopicPartition(ProducerSpec.group, tuple._1.topic(), 1), tuple._2.offset()) val partitionOffsetCommittedMarker = PartitionOffsetCommittedMarker(consumerMessage.key, consumerMessage.offset, committer, - fromPartitionedSource = false) + fromPartitionedSource = false, + () => Future.successful(ProducerSpec.consumerGroupMetadata)) ProducerMessage.Message( tuple._1, partitionOffsetCommittedMarker @@ -632,7 +637,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu def verifyTxCommit(po: ConsumerMessage.PartitionOffset) = { val inOrder = Mockito.inOrder(mock) val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> new OffsetAndMetadata(po.offset + 1)).asJava - inOrder.verify(mock).sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(po.key.groupId)) + inOrder.verify(mock).sendOffsetsToTransaction(offsets, ProducerSpec.consumerGroupMetadata) inOrder.verify(mock).commitTransaction() inOrder.verify(mock).beginTransaction() } @@ -640,7 +645,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu def verifyTxCommitWhenShutdown(po: ConsumerMessage.PartitionOffset) = { val inOrder = Mockito.inOrder(mock) val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> new OffsetAndMetadata(po.offset + 1)).asJava - inOrder.verify(mock).sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(po.key.groupId)) + inOrder.verify(mock).sendOffsetsToTransaction(offsets, ProducerSpec.consumerGroupMetadata) inOrder.verify(mock).commitTransaction() } diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala index d162cd49d..83ad0f31f 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala @@ -6,15 +6,15 @@ package akka.kafka.scaladsl import java.util.concurrent.atomic.AtomicBoolean - import akka.Done import akka.kafka.ConsumerMessage.PartitionOffset import akka.kafka.scaladsl.Consumer.{Control, DrainingControl} import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike import akka.kafka.{ProducerMessage, _} -import akka.stream.{OverflowStrategy, RestartSettings} +import akka.stream.{OverflowStrategy, QueueOfferResult, RestartSettings} import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source} import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import akka.testkit.TestException import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.scalatest.RecoverMethods._ @@ -38,7 +38,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val consumerSettings = consumerDefaults.withGroupId(group) val control = - transactionalCopyStream(consumerSettings, txProducerDefaults, sourceTopic, sinkTopic, group, 10.seconds) + transactionalCopyStream(consumerSettings, txProducerDefaults, sourceTopic, sinkTopic, 10.seconds) .toMat(Sink.ignore)(Keep.left) .run() @@ -73,7 +73,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset) } } - .via(Transactional.flow(producerDefaults, group)) + .via(Transactional.flow(producerDefaults)) .toMat(Sink.ignore)(Keep.left) .run() @@ -99,8 +99,8 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val consumerSettings = consumerDefaults.withGroupId(group) - var restartCount = 0 - var innerControl = null.asInstanceOf[Control] + @volatile var restartCount = 0 + @volatile var innerControl = null.asInstanceOf[Control] val restartSource = RestartSource.onFailuresWithBackoff( RestartSettings(minBackoff = 0.1.seconds, maxBackoff = 1.seconds, randomFactor = 0.2) @@ -123,7 +123,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa } // side effect out the `Control` materialized value because it can't be propagated through the `RestartSource` .mapMaterializedValue(innerControl = _) - .via(Transactional.flow(producerDefaults, group)) + .via(Transactional.flow(producerDefaults)) } restartSource.runWith(Sink.ignore) @@ -150,8 +150,8 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val consumerSettings = consumerDefaults.withGroupId(group) - var restartCount = 0 - var innerControl = null.asInstanceOf[Control] + @volatile var restartCount = 0 + @volatile var innerControl = null.asInstanceOf[Control] val restartSource = RestartSource.onFailuresWithBackoff( RestartSettings(minBackoff = 0.1.seconds, maxBackoff = 1.seconds, randomFactor = 0.2) @@ -161,12 +161,12 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa .source(consumerSettings, Subscriptions.topics(sourceTopic)) .map { msg => if (msg.record.value().toInt == 50 && restartCount < 2) { - // add a delay that equals or exceeds EoS commit interval to trigger a commit for everything - // up until this record (0 -> 500) - Thread.sleep(producerDefaults.eosCommitInterval.toMillis + 10) + // add a delay that equals or exceeds EoS commit interval (10s as recommended in KIP-447) + // to trigger a commit for everything up until this record (0 -> 500) + Thread.sleep(10100) } if (msg.record.value().toInt == 51 && restartCount < 2) { - throw new RuntimeException("Uh oh..") + throw TestException("Uh oh") } else { ProducerMessage.Message(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset) @@ -179,7 +179,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa } // side effect out the `Control` materialized value because it can't be propagated through the `RestartSource` .mapMaterializedValue(innerControl = _) - .via(Transactional.flow(producerDefaults, group)) + .via(Transactional.flow(producerDefaults)) } restartSource.runWith(Sink.ignore) @@ -209,12 +209,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa def runStream(id: String): Consumer.Control = { val control: Control = - transactionalCopyStream(consumerSettings, - txProducerDefaults, - sourceTopic, - sinkTopic, - s"$group-$id", - 10.seconds) + transactionalCopyStream(consumerSettings, txProducerDefaults, sourceTopic, sinkTopic, 10.seconds) .toMat(Sink.ignore)(Keep.left) .run() control @@ -267,7 +262,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa msgs.map(_.partitionOffset).maxBy(_.offset) ) } - .via(Transactional.flow(producerDefaults, group)) + .via(Transactional.flow(producerDefaults)) .toMat(Sink.ignore)(Keep.left) .run() } @@ -296,7 +291,6 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val sourceTopic = createTopic(1, maxPartitions) val sinkTopic = createTopic(2, maxPartitions) val group = createGroupId(1) - val transactionalId = createTransactionalId() val consumerSettings = consumerDefaults.withGroupId(group) @@ -313,7 +307,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa msg.record.value), msg.partitionOffset) } - .runWith(Transactional.sink(producerDefaults, transactionalId)) + .runWith(Transactional.sink(producerDefaults)) } .toMat(Sink.ignore)(Keep.left) .run() @@ -354,7 +348,6 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val sourceTopic = createTopic(1, maxPartitions) val sinkTopic = createTopic(2, maxPartitions) val group = createGroupId(1) - val transactionalId = createTransactionalId() val testProducerSettings = producerDefaults.withProducer(testProducer) val consumerSettings = consumerDefaults @@ -381,12 +374,12 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa msg.record.value), msg.partitionOffset) } - .runWith(Transactional.sink(producerDefaults, transactionalId)) + .runWith(Transactional.sink(producerDefaults)) } .toMat(Sink.ignore)(Keep.both) .run() - log.info("Running 2 transactional workloads with prefix transactional id: {}", transactionalId) + log.info("Running 2 transactional workloads") val (control1, streamResult1) = runTransactional() val (control2, streamResult2) = runTransactional() @@ -436,7 +429,6 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val topic = createTopic(1, partitions) val outTopic = createTopic(2, partitions) val group = createGroupId(1) - val transactionalId = createTransactionalId() val sourceSettings = consumerDefaults .withGroupId(group) @@ -455,7 +447,7 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa msg.record.value() + "-out"), msg.partitionOffset) } - .to(Transactional.sink(producerDefaults, transactionalId)) + .to(Transactional.sink(producerDefaults)) .run() } .toMat(Sink.ignore)(DrainingControl.apply) diff --git a/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala b/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala index 4aa505fef..013f83791 100644 --- a/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala +++ b/tests/src/test/scala/docs/scaladsl/TransactionsExample.scala @@ -29,7 +29,6 @@ class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike with val producerSettings = txProducerDefaults val sourceTopic = createTopic(1) val sinkTopic = createTopic(2) - val transactionalId = createTransactionalId() // #transactionalSink val control = Transactional @@ -38,7 +37,7 @@ class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike with .map { msg => ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset) } - .toMat(Transactional.sink(producerSettings, transactionalId))(DrainingControl.apply) + .toMat(Transactional.sink(producerSettings))(DrainingControl.apply) .run() // ... @@ -71,7 +70,7 @@ class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike with .map { record => ProducerMessage.single(new ProducerRecord(sinkTopic, record.key, record.value)) } - .toMat(Transactional.sinkWithOffsetContext(producerSettings, createTransactionalId()))(DrainingControl.apply) + .toMat(Transactional.sinkWithOffsetContext(producerSettings))(DrainingControl.apply) .run() val testConsumerGroup = createGroupId(2) @@ -91,7 +90,6 @@ class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike with val producerSettings = txProducerDefaults val sourceTopic = createTopic(1) val sinkTopic = createTopic(2) - val transactionalId = createTransactionalId() // #transactionalFailureRetry val innerControl = new AtomicReference[Control](Consumer.NoopControl) @@ -110,7 +108,7 @@ class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike with } // side effect out the `Control` materialized value because it can't be propagated through the `RestartSource` .mapMaterializedValue(c => innerControl.set(c)) - .via(Transactional.flow(producerSettings, transactionalId)) + .via(Transactional.flow(producerSettings)) } stream.runWith(Sink.ignore)