Skip to content

Commit

Permalink
KAFKA-18263; Group lock must be acquired when reverting static member…
Browse files Browse the repository at this point in the history
…ship rejoin (apache#18207)

When a static member rejoins the group, the group state is rewritten to the partition in order to persist the change. If the write fails, the change is reverted. However, this is done without acquiring the group lock.

This is only true in the old group coordinator. The new one does not suffer from this issue.

Reviewers: Jeff Kim <[email protected]>
  • Loading branch information
dajac authored Dec 16, 2024
1 parent 4aee33d commit a12152f
Showing 1 changed file with 64 additions and 62 deletions.
126 changes: 64 additions & 62 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1384,68 +1384,70 @@ private[group] class GroupCoordinator(
info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
groupManager.storeGroup(group, groupAssignment, error => {
if (error != Errors.NONE) {
warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

// Failed to persist member.id of the given static member, revert the update of the static member in the group.
group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
completeAndScheduleNextHeartbeatExpiration(group, oldMember)
responseCallback(JoinGroupResult(
List.empty,
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = error
))
} else if (supportSkippingAssignment) {
// Starting from version 9 of the JoinGroup API, static members are able to
// skip running the assignor based on the `SkipAssignment` field. We leverage
// this to tell the leader that it is the leader of the group but by skipping
// running the assignor while the group is in stable state.
// Notes:
// 1) This allows the leader to continue monitoring metadata changes for the
// group. Note that any metadata changes happening while the static leader is
// down won't be noticed.
// 2) The assignors are not idempotent nor free from side effects. This is why
// we skip entirely the assignment step as it could generate a different group
// assignment which would be ignored by the group coordinator because the group
// is the stable state.
val isLeader = group.isLeader(newMemberId)
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = if (isLeader) {
group.currentMemberMetadata
} else {
List.empty
},
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
skipAssignment = isLeader,
error = Errors.NONE
))
} else {
// Prior to version 9 of the JoinGroup API, we wanted to avoid current leader
// performing trivial assignment while the group is in stable stage, because
// the new assignment in leader's next sync call won't be broadcast by a stable group.
// This could be guaranteed by always returning the old leader id so that the current
// leader won't assume itself as a leader based on the returned message, since the new
// member.id won't match returned leader id, therefore no assignment will be performed.
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = List.empty,
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = Errors.NONE
))
group.inLock {
if (error != Errors.NONE) {
warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

// Failed to persist member.id of the given static member, revert the update of the static member in the group.
group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)
completeAndScheduleNextHeartbeatExpiration(group, oldMember)
responseCallback(JoinGroupResult(
List.empty,
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = error
))
} else if (supportSkippingAssignment) {
// Starting from version 9 of the JoinGroup API, static members are able to
// skip running the assignor based on the `SkipAssignment` field. We leverage
// this to tell the leader that it is the leader of the group but by skipping
// running the assignor while the group is in stable state.
// Notes:
// 1) This allows the leader to continue monitoring metadata changes for the
// group. Note that any metadata changes happening while the static leader is
// down won't be noticed.
// 2) The assignors are not idempotent nor free from side effects. This is why
// we skip entirely the assignment step as it could generate a different group
// assignment which would be ignored by the group coordinator because the group
// is the stable state.
val isLeader = group.isLeader(newMemberId)
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = if (isLeader) {
group.currentMemberMetadata
} else {
List.empty
},
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
skipAssignment = isLeader,
error = Errors.NONE
))
} else {
// Prior to version 9 of the JoinGroup API, we wanted to avoid current leader
// performing trivial assignment while the group is in stable stage, because
// the new assignment in leader's next sync call won't be broadcast by a stable group.
// This could be guaranteed by always returning the old leader id so that the current
// leader won't assume itself as a leader based on the returned message, since the new
// member.id won't match returned leader id, therefore no assignment will be performed.
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = List.empty,
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = Errors.NONE
))
}
}
}, requestLocal)
} else {
Expand Down

0 comments on commit a12152f

Please sign in to comment.