Skip to content

Commit

Permalink
feat: kip447 transactions (#1728)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Enno Runne <[email protected]>
  • Loading branch information
johanandren and ennru authored Apr 8, 2024
1 parent 344d4e1 commit e745e58
Show file tree
Hide file tree
Showing 20 changed files with 292 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object ReactiveKafkaTransactionFixtures extends PerfFixtureHelpers {
Transactional.source(consumerSettings, Subscriptions.topics(c.filledTopic.topic))

val producerSettings = createProducerSettings(c.kafkaHost).withEosCommitInterval(commitInterval)
val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings, randomId())
val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings)

ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult](c.filledTopic.topic,
sinkTopic,
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ val commonSettings = Def.settings(
"11",
"-Wconf:cat=feature:w,cat=deprecation&msg=.*JavaConverters.*:s,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
) ++ {
if (insideCI.value && !Nightly && scalaVersion.value != Scala3) Seq("-Werror")
if (scalaVersion.value != Scala3) Seq("-Werror")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/mima-filters/5.0.0.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# ApiMayChange
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.flowWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.sinkWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.flowWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext")
6 changes: 3 additions & 3 deletions core/src/main/scala/akka/kafka/ConsumerMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ package akka.kafka

import java.util.Objects
import java.util.concurrent.CompletionStage

import akka.Done
import akka.annotation.{DoNotInherit, InternalApi}
import akka.kafka.internal.{CommittableOffsetBatchImpl, CommittedMarker}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecord}
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
Expand Down Expand Up @@ -133,7 +132,8 @@ object ConsumerMessage {
override val key: GroupTopicPartition,
override val offset: Long,
private[kafka] val committedMarker: CommittedMarker,
private[kafka] val fromPartitionedSource: Boolean
private[kafka] val fromPartitionedSource: Boolean,
requestConsumerGroupMetadata: () => Future[ConsumerGroupMetadata]
) extends PartitionOffset(key, offset)

/**
Expand Down
23 changes: 16 additions & 7 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
}

override def preStart(): Unit = {
super.preStart()
resolveProducer(stage.settings)
}

private def checkForCompletion(): Unit =
protected def checkForCompletion(): Unit =
if (isClosed(stage.in) && awaitingConfirmation == 0) {
completionState match {
case Some(Success(_)) => onCompletionSuccess()
Expand All @@ -84,7 +83,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
}
}

override def onCompletionSuccess(): Unit = completeStage()
override def onCompletionSuccess(): Unit = if (readyToShutdown()) completeStage()

override def onCompletionFailure(ex: Throwable): Unit = failStage(ex)

Expand All @@ -104,9 +103,16 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:

protected def resumeDemand(tryToPull: Boolean = true): Unit = {
log.debug("Resume demand")
setHandler(stage.out, new OutHandler {
override def onPull(): Unit = tryPull(stage.in)
})
setHandler(
stage.out,
new OutHandler {
override def onPull(): Unit = tryPull(stage.in)

override def onDownstreamFinish(cause: Throwable): Unit = {
super.onDownstreamFinish(cause)
}
}
)
// kick off demand for more messages if we're resuming demand
if (tryToPull && isAvailable(stage.out) && !hasBeenPulled(stage.in)) {
tryPull(stage.in)
Expand Down Expand Up @@ -202,6 +208,9 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
override def postStop(): Unit = {
log.debug("ProducerStage postStop")
closeProducer()
super.postStop()
}

// Specifically for transactional producer that needs to defer shutdown to let an async task
// complete before actually shutting down
protected def readyToShutdown(): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ import scala.util.control.NonFatal
/** Special case commit for non-batched committing. */
final case class CommitSingle(tp: TopicPartition, offsetAndMetadata: OffsetAndMetadata)
extends NoSerializationVerificationNeeded

// Used for transactions/EOS returns the current ConsumerGroupMetadata
case object GetConsumerGroupMetadata extends NoSerializationVerificationNeeded

//responses
final case class Assigned(partition: List[TopicPartition]) extends NoSerializationVerificationNeeded
final case class Revoked(partition: List[TopicPartition]) extends NoSerializationVerificationNeeded
Expand Down Expand Up @@ -294,6 +298,9 @@ import scala.util.control.NonFatal
stageActorsMap = stageActorsMap.filterNot(_._2 == ref)
requests -= ref

case GetConsumerGroupMetadata =>
sender() ! consumer.groupMetadata()

case req: Metadata.Request =>
sender() ! handleMetadataRequest(req)
}
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package akka.kafka.internal
import java.util.concurrent.CompletionStage

import akka.Done
import akka.annotation.InternalApi
import akka.kafka.ConsumerMessage
Expand All @@ -16,7 +15,7 @@ import akka.kafka.ConsumerMessage.{
TransactionalMessage,
_
}
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetFetchResponse

Expand Down Expand Up @@ -46,6 +45,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB
def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit

def fromPartitionedSource: Boolean

def requestConsumerGroupMetadata(): Future[ConsumerGroupMetadata]
}

/** Internal API */
Expand All @@ -62,7 +63,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
),
offset = rec.offset,
committedMarker,
fromPartitionedSource
fromPartitionedSource,
requestConsumerGroupMetadata _
)
ConsumerMessage.TransactionalMessage(rec, offset)
}
Expand All @@ -82,7 +84,8 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V]
),
offset = rec.offset,
committedMarker,
fromPartitionedSource
fromPartitionedSource,
requestConsumerGroupMetadata _
)
(rec, offset)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.kafka.internal

import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.{Dispatchers, ExecutionContexts}
import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker}
import akka.kafka.ProducerMessage.{Envelope, Results}
import akka.kafka.internal.DeferredProducer._
Expand All @@ -18,9 +19,10 @@ import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetada
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

/**
* INTERNAL API
Expand Down Expand Up @@ -54,7 +56,7 @@ private object TransactionalProducerStage {
override def committingFailed(): Unit = {}
}

final class NonemptyTransactionBatch(head: PartitionOffsetCommittedMarker,
final class NonemptyTransactionBatch(val head: PartitionOffsetCommittedMarker,
tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
extends TransactionBatch {
// There is no guarantee that offsets adding callbacks will be called in any particular order.
Expand Down Expand Up @@ -90,6 +92,10 @@ private object TransactionalProducerStage {
}
}

final case class CommitTransaction(batch: NonemptyTransactionBatch,
beginNewTransaction: Boolean,
consumerGroupMetadata: ConsumerGroupMetadata)

}

/**
Expand All @@ -109,15 +115,27 @@ private final class TransactionalProducerStageLogic[K, V, P](

private val commitSchedulerKey = "commit"
private val messageDrainInterval = 10.milliseconds

private var batchOffsets = TransactionBatch.empty

private var demandSuspended = false
private var commitInProgress = false

private var firstMessage: Option[Envelope[K, V, P]] = None

private val commitTransactionCB: Try[CommitTransaction] => Unit =
createAsyncCallback[Try[CommitTransaction]](commitTransaction _).invoke _

private val onInternalCommitAckCb: Try[Done] => Unit =
getAsyncCallback[Try[Done]] { maybeDone =>
maybeDone match {
case Failure(ex) => log.debug("Internal commit failed: {}", ex)
case _ =>
}
scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
}.invoke _

override protected def logSource: Class[_] = classOf[TransactionalProducerStage[_, _, _]]

// FIXME no longer true
// we need to peek at the first message to generate the producer transactional id for partitioned sources
override def preStart(): Unit = resumeDemand()

Expand Down Expand Up @@ -163,7 +181,17 @@ private final class TransactionalProducerStageLogic[K, V, P](
val awaitingConf = awaitingConfirmationValue
batchOffsets match {
case batch: NonemptyTransactionBatch if awaitingConf == 0 =>
commitTransaction(batch, beginNewTransaction)
if (commitInProgress) {
// batch will end up in next commit
log.debug("Commit already in progress, ignoring request to commit")
} else {
commitInProgress = true
batch.head
.requestConsumerGroupMetadata()
.onComplete { consumerGroupMetadataTry =>
commitTransactionCB(consumerGroupMetadataTry.map(CommitTransaction(batch, beginNewTransaction, _)))
}(ExecutionContexts.parasitic)
}
case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete =>
abortTransaction("Transaction is empty and stage is completing")
case _ if awaitingConf > 0 =>
Expand All @@ -177,6 +205,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
/**
* When using partitioned sources we extract the transactional id, group id, and topic partition information from
* the first message in order to define a `transacitonal.id` before constructing the [[org.apache.kafka.clients.producer.KafkaProducer]]
* FIXME this is probably no longer true
*/
private def parseFirstMessage(msg: Envelope[K, V, P]): Boolean =
producerAssignmentLifecycle match {
Expand All @@ -189,7 +218,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
firstMessage = Some(msg)
// initiate async async producer request _after_ first message is stashed in case future eagerly resolves
// instead of asynccallback
resolveProducer(generatedTransactionalConfig(msg))
resolveProducer(generatedTransactionalConfig)
// suspend demand after we receive the first message until the producer is assigned
suspendDemand()
false
Expand All @@ -199,20 +228,14 @@ private final class TransactionalProducerStageLogic[K, V, P](
)
}

private def generatedTransactionalConfig(msg: Envelope[K, V, P]): ProducerSettings[K, V] = {
val txId = msg.passThrough match {
case committedMarker: PartitionOffsetCommittedMarker if committedMarker.fromPartitionedSource =>
val gtp = committedMarker.key
val txId = s"$transactionalId-${gtp.groupId}-${gtp.topic}-${gtp.partition}"
log.debug("Generated transactional id from partitioned source '{}'", txId)
txId
case _ => transactionalId
}

private def generatedTransactionalConfig: ProducerSettings[K, V] = {
stage.settings.withProperties(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString,
ProducerConfig.TRANSACTIONAL_ID_CONFIG -> txId,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString
ProducerConfig.TRANSACTIONAL_ID_CONFIG -> transactionalId,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
// KIP-447: "We shall set `transaction.timout.ms` default to 10000 ms (10 seconds) on Kafka Streams. For non-stream users,
// we highly recommend you to do the same if you want to use the new semantics."
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG -> 10000.toString
)
}

Expand All @@ -223,46 +246,49 @@ private final class TransactionalProducerStageLogic[K, V, P](

override def onCompletionSuccess(): Unit = {
log.debug("Committing final transaction before shutdown")
if (commitInProgress)
log.warning(
"Stage onCompleteSuccess with commit in flight, this is a bug, please report at https://github.com/akka/alpakka-kafka/issues"
)
cancelTimer(commitSchedulerKey)
maybeCommitTransaction(beginNewTransaction = false, abortEmptyTransactionOnComplete = true)
super.onCompletionSuccess()
}

override def onCompletionFailure(ex: Throwable): Unit = {
abortTransaction("Stage failure")
abortTransaction(s"Stage failure ($ex)")
batchOffsets.committingFailed()
super.onCompletionFailure(ex)
}

private def commitTransaction(batch: NonemptyTransactionBatch, beginNewTransaction: Boolean): Unit = {
val group = batch.group
log.debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}",
transactionalId,
group,
batch.offsets)
val offsetMap = batch.offsetMap()
producer.sendOffsetsToTransaction(offsetMap.asJava, new ConsumerGroupMetadata(group))
producer.commitTransaction()
log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}",
transactionalId,
group,
batch.offsets)
batchOffsets = TransactionBatch.empty
batch
.internalCommit()
.onComplete { _ =>
onInternalCommitAckCb.invoke(())
}(materializer.executionContext)
if (beginNewTransaction) {
beginTransaction()
resumeDemand()
private def commitTransaction(t: Try[CommitTransaction]): Unit = {
commitInProgress = false
t match {
case Failure(ex) =>
failStage(new RuntimeException("Failed to fetch consumer group metadata", ex))
case Success(CommitTransaction(batch, beginNewTransaction, consumerGroupMetadata)) =>
log.debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}",
transactionalId,
consumerGroupMetadata,
batch.offsets)
val offsetMap = batch.offsetMap()
producer.sendOffsetsToTransaction(offsetMap.asJava, consumerGroupMetadata)
producer.commitTransaction()
log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}",
transactionalId,
consumerGroupMetadata,
batch.offsets)
batchOffsets = TransactionBatch.empty
batch
.internalCommit()
.onComplete(onInternalCommitAckCb)(ExecutionContexts.parasitic)
if (beginNewTransaction) {
beginTransaction()
resumeDemand()
}
}
}

private val onInternalCommitAckCb: AsyncCallback[Unit] = {
getAsyncCallback[Unit](
_ => scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
)
// in case stage wants to shut down but was waiting for commit to complete
checkForCompletion()
}

private def initTransactions(): Unit = {
Expand All @@ -279,4 +305,14 @@ private final class TransactionalProducerStageLogic[K, V, P](
log.debug("Aborting transaction: {}", reason)
if (producerAssignmentLifecycle == Assigned) producer.abortTransaction()
}

override protected def readyToShutdown(): Boolean = !commitInProgress

override def postStop(): Unit = {
if (commitInProgress)
log.warning(
"Stage postStop with commit in flight, this is a bug, please report at https://github.com/akka/alpakka-kafka/issues"
)
super.postStop()
}
}
Loading

0 comments on commit e745e58

Please sign in to comment.