KAFKA-17507; WriteTxnMarkers API must not return until markers are written and materialized in group coordinator's cache (apache#18168)

We have observed the error below in some clusters:

Uncaught exception in scheduled task 'handleTxnCompletion-902667' exception.message:Trying to complete a transactional offset commit for producerId *** and groupId *** even though the offset commit record itself hasn't been appended to the log.

When a transaction is completed, the transaction coordinator sends a WriteTxnMarkers request to all the partitions involved in the transaction in order to write the markers to them. When a broker receives it, it writes the markers and, if any markers were written to __consumer_offsets partitions, it informs the group coordinator that it can materialize the pending transactional offsets into its main cache. The group coordinator has done this asynchronously since Apache Kafka 2.0; see this patch.
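
For illustration only, here is a minimal, self-contained Scala sketch of that fire-and-forget shape, using a plain ScheduledExecutorService as a stand-in for the group coordinator's scheduler thread (the object and names here are hypothetical, not Kafka's actual classes):

import java.util.concurrent.{Executors, TimeUnit}

object FireAndForgetSketch {
  // Hypothetical stand-in for the group coordinator's single scheduler thread.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Pre-fix shape: the caller has no way to observe when the materialization ran.
  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
    scheduler.schedule(new Runnable {
      override def run(): Unit = {
        // Materialize (commit) or drop (abort) the pending transactional offsets here.
        println(s"handleTxnCompletion: producerId=$producerId partitions=$completedPartitions commit=$isCommit")
      }
    }, 0, TimeUnit.MILLISECONDS)
    // Returns immediately: the WriteTxnMarkers response can go out before the task above runs.
  }

  def main(args: Array[String]): Unit = {
    scheduleHandleTxnCompletion(42L, Set(0, 1), isCommit = true)
    println("response sent (possibly before the scheduled task executed)")
    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
  }
}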

The above error happens when the asynchronous operation is executed by the scheduler and finds that there are pending transactional offsets that have not been written yet. How come?

There is actually an issue in the steps described above. The group coordinator does not wait for the asynchronous operation to complete before returning to the API layer. Hence the WriteTxnMarkers response may be sent back to the transaction coordinator before the async operation has actually completed, and the next transactional produce may also start before the operation completes. This explains why the group coordinator can see pending transactional offsets that have not been written yet.

There is a similar issue when the transaction is aborted. However, on this path there are no checks verifying whether all the pending transactional offsets have been written, so no errors show up in the logs. Due to the same race condition, it is possible to remove the wrong pending transactional offsets.

PS: The new group coordinator is not impacted by this bug.
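
The fix, as the diffs below show, is to make that completion observable: the scheduled task completes a CompletableFuture, and the API layer only sends the WriteTxnMarkers response once that future completes (mapping failures to UNKNOWN_SERVER_ERROR). A minimal, self-contained sketch of the pattern follows, again with a plain executor and hypothetical names standing in for Kafka's real scheduler and API layer:

import java.util.concurrent.{CompletableFuture, Executors, TimeUnit}

object FutureBackedSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Post-fix shape: the scheduled task completes a future the caller can chain on.
  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): CompletableFuture[Void] = {
    val future = new CompletableFuture[Void]()
    scheduler.schedule(new Runnable {
      override def run(): Unit = {
        try {
          // Materialize (commit) or drop (abort) the pending transactional offsets here.
          println(s"handleTxnCompletion: producerId=$producerId partitions=$completedPartitions commit=$isCommit")
          future.complete(null)
        } catch {
          case e: Throwable => future.completeExceptionally(e)
        }
      }
    }, 0, TimeUnit.MILLISECONDS)
    future
  }

  def main(args: Array[String]): Unit = {
    // The response goes out only after the materialization has completed,
    // which is what closes the race described above.
    scheduleHandleTxnCompletion(42L, Set(0), isCommit = true).whenComplete { (_, exception) =>
      if (exception != null) println(s"marking offsets partitions with UNKNOWN_SERVER_ERROR: $exception")
      println("sending WriteTxnMarkers response")
    }
    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
  }
}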

Reviewers: Justine Olshan <[email protected]>
dajac authored Dec 13, 2024
1 parent b73e31e commit 450c10d
Showing 8 changed files with 215 additions and 33 deletions.
@@ -38,6 +38,7 @@ import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKe
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard

import java.util.concurrent.CompletableFuture
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.math.max

@@ -982,7 +983,7 @@ private[group] class GroupCoordinator(

def scheduleHandleTxnCompletion(producerId: Long,
offsetsPartitions: Iterable[TopicPartition],
transactionResult: TransactionResult): Unit = {
transactionResult: TransactionResult): CompletableFuture[Void] = {
require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
val isCommit = transactionResult == TransactionResult.COMMIT
groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
@@ -587,12 +587,16 @@ private[group] class GroupCoordinatorAdapter(
producerId: Long,
partitions: java.lang.Iterable[TopicPartition],
transactionResult: TransactionResult
): Unit = {
coordinator.scheduleHandleTxnCompletion(
producerId,
partitions.asScala,
transactionResult
)
): CompletableFuture[Void] = {
try {
coordinator.scheduleHandleTxnCompletion(
producerId,
partitions.asScala,
transactionResult
)
} catch {
case e: Throwable => FutureUtils.failedFuture(e)
}
}

override def onPartitionsDeleted(
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.util.{Optional, OptionalInt, OptionalLong}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.function.Supplier
import com.yammer.metrics.core.Gauge
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
@@ -931,9 +931,17 @@ class GroupMetadataManager(brokerId: Int,
* more group metadata locks to handle transaction completion, this operation is scheduled on
* the scheduler thread to avoid deadlocks.
*/
def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () =>
handleTxnCompletion(producerId, completedPartitions, isCommit))
def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): CompletableFuture[Void] = {
val future = new CompletableFuture[Void]()
scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () => {
try {
handleTxnCompletion(producerId, completedPartitions, isCommit)
future.complete(null)
} catch {
case e: Throwable => future.completeExceptionally(e)
}
})
future
}

private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
51 changes: 32 additions & 19 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2376,28 +2376,41 @@ class KafkaApis(val requestChannel: RequestChannel,
trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
updateErrors(producerId, currentErrors)

if (!groupCoordinator.isNewGroupCoordinator) {
val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
}.keys

if (successfulOffsetsPartitions.nonEmpty) {
// as soon as the end transaction marker has been written for a transactional offset commit,
// call to the group coordinator to materialize the offsets into the cache
try {
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
} catch {
case e: Exception =>
error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
updateErrors(producerId, updatedErrors)
}
def maybeSendResponse(): Unit = {
if (numAppends.decrementAndGet() == 0) {
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
}
}

if (numAppends.decrementAndGet() == 0)
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
// The new group coordinator uses GroupCoordinator#completeTransaction so we do
// not need to call GroupCoordinator#onTransactionCompleted here.
if (config.isNewGroupCoordinatorEnabled) {
maybeSendResponse()
return
}

val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
}.keys

// If no end transaction marker has been written to a __consumer_offsets partition, we do not
// need to call GroupCoordinator#onTransactionCompleted.
if (successfulOffsetsPartitions.isEmpty) {
maybeSendResponse()
return
}

// Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
// into the cache and we wait until the materialization is completed.
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
if (exception != null) {
error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
updateErrors(producerId, updatedErrors)
}
maybeSendResponse()
}
}

// TODO: The current append API makes doing separate writes per producerId a little easier, but it would
@@ -26,7 +26,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader}
import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader, TransactionResult}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
@@ -37,6 +37,7 @@ import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}

Expand Down Expand Up @@ -930,4 +931,26 @@ class GroupCoordinatorAdapterTest {
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[UnsupportedVersionException])
}

@Test
def testOnTransactionCompletedWithUnexpectedException(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)

when(groupCoordinator.scheduleHandleTxnCompletion(
any(),
any(),
any()
)).thenThrow(new IllegalStateException("Oh no!"))

val future = adapter.onTransactionCompleted(
10,
Seq.empty[TopicPartition].asJava,
TransactionResult.COMMIT
)

assertTrue(future.isDone)
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[Exception])
}
}
126 changes: 126 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3337,6 +3337,132 @@ class KafkaApisTest extends Logging {
any())
}

@Test
def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)

val allPartitions = List(
offset0,
offset1,
foo0,
foo1
)

val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
List(
new TxnMarkerEntry(
1L,
1.toShort,
0,
TransactionResult.COMMIT,
List(offset0, foo0).asJava
),
new TxnMarkerEntry(
2L,
1.toShort,
0,
TransactionResult.ABORT,
List(offset1, foo1).asJava
)
).asJava
).build()

val requestChannelRequest = buildRequest(writeTxnMarkersRequest)

allPartitions.foreach { tp =>
when(replicaManager.getMagic(tp))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
}

when(groupCoordinator.onTransactionCompleted(
ArgumentMatchers.eq(1L),
ArgumentMatchers.any(),
ArgumentMatchers.eq(TransactionResult.COMMIT)
)).thenReturn(CompletableFuture.completedFuture[Void](null))

when(groupCoordinator.onTransactionCompleted(
ArgumentMatchers.eq(2L),
ArgumentMatchers.any(),
ArgumentMatchers.eq(TransactionResult.ABORT)
)).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))

val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])

when(replicaManager.appendRecords(
ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
ArgumentMatchers.eq(-1),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
entriesPerPartition.capture(),
responseCallback.capture(),
any(),
any(),
ArgumentMatchers.eq(RequestLocal.noCaching()),
any(),
any()
)).thenAnswer { _ =>
responseCallback.getValue.apply(
entriesPerPartition.getValue.keySet.map { tp =>
tp -> new PartitionResponse(Errors.NONE)
}.toMap
)
}
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
))
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching())

val expectedResponse = new WriteTxnMarkersResponseData()
.setMarkers(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
.setProducerId(1L)
.setTopics(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code)
).asJava),
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName("foo")
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code)
).asJava)
).asJava),
new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
.setProducerId(2L)
.setTopics(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
).asJava),
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName("foo")
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava)
).asJava)
).asJava)

val response = verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
assertEquals(normalize(expectedResponse), normalize(response.data))
}

@Test
def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
@@ -339,11 +339,18 @@ CompletableFuture<Void> completeTransaction(
/**
* Commit or abort the pending transactional offsets for the given partitions.
*
* This method is only used by the old group coordinator. Internally, the old
* group coordinator completes the transaction asynchronously in order to
* avoid deadlocks. Hence, this method returns a future that the caller
* can wait on.
*
* @param producerId The producer id.
* @param partitions The partitions.
* @param transactionResult The result of the transaction.
*
* @return A future yielding the result.
*/
void onTransactionCompleted(
CompletableFuture<Void> onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult
@@ -1086,7 +1086,7 @@ public CompletableFuture<Void> completeTransaction(
* See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, TransactionResult)}.
*/
@Override
public void onTransactionCompleted(
public CompletableFuture<Void> onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult