Skip to content

Commit

Permalink
fix(kafka): remove group member if sync timeout and improve group logs (
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored Jan 22, 2024
1 parent 95c266f commit d83cab5
Showing 1 changed file with 100 additions and 41 deletions.
141 changes: 100 additions & 41 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -260,32 +260,30 @@ newGroupFromValue value metadataManager metaHandle = do
joinGroup :: Group -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse
joinGroup group@Group{..} reqCtx req = do
-- delayed response(join barrier)
Log.debug $ "received joinGroup"
Log.info $ "received joinGroup request:" <> Log.buildString' req
delayedResponse <- C.newEmptyMVar
C.withMVar lock $ \_ -> do
-- TODO: GROUP MAX SIZE

checkSupportedProtocols group req
Log.debug $ "checked protocols"

-- check state
IO.readIORef group.state >>= \case
Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)
_ -> pure ()

Log.debug $ "checked state"
if T.null req.memberId
then doNewMemberJoinGoup group reqCtx req delayedResponse
else doCurrentMemeberJoinGroup group req delayedResponse

-- waiting for other consumers
resp <- C.takeMVar delayedResponse
Log.info $ "joinGroup: received delayed response:" <> Log.buildString' resp
<> "group:" <> Log.build groupId
return resp

checkSupportedProtocols :: Group -> K.JoinGroupRequest -> IO ()
checkSupportedProtocols Group{..} req = do
Log.debug $ "checking protocols"
IO.readIORef state >>= \case
Empty -> do
M.when (T.null req.protocolType) $ do
Expand All @@ -305,6 +303,7 @@ cancelDelayedSyncResponses Group{..} reason = do
lst <- H.toList delayedSyncResponses
M.forM_ lst $ \(memberId, delayed) -> do
Log.info $ "cancel delayed sync response for " <> Log.buildString' memberId
<> ", group:" <> Log.build groupId
<> ", reason:" <> Log.build reason
_ <- C.tryPutMVar delayed $ K.SyncGroupResponse {throttleTimeMs=0, errorCode=K.REBALANCE_IN_PROGRESS, assignment=BS.empty}
H.delete delayedSyncResponses memberId
Expand All @@ -320,7 +319,8 @@ cancelDelayedSync Group{..} = do
doNewMemberJoinGoup :: Group -> K.RequestContext -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO ()
doNewMemberJoinGoup group reqCtx req delayedResponse = do
newMemberId <- generateMemberId reqCtx
Log.debug $ "generated member id:" <> Log.buildString' newMemberId
Log.info $ "generated member id:" <> Log.buildString' newMemberId
<> " for group:" <> Log.build group.groupId
doDynamicNewMemberJoinGroup group reqCtx req newMemberId delayedResponse

generateMemberId :: K.RequestContext -> IO T.Text
Expand Down Expand Up @@ -350,15 +350,17 @@ addMemberAndRebalance group reqCtx req newMemberId delayedResponse = do
member <- newMemberFromReq reqCtx req newMemberId (refineProtocols req.protocols)
addMember group member (Just delayedResponse)
-- TODO: check state
prepareRebalance group
prepareRebalance group $ "add member:" <> member.memberId

updateMemberAndRebalance :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO ()
updateMemberAndRebalance group member req delayedResponse = do
updateMember group member req delayedResponse
prepareRebalance group
prepareRebalance group $ "update member:" <> member.memberId

prepareRebalance :: Group -> IO ()
prepareRebalance group@Group{..} = do
prepareRebalance :: Group -> T.Text -> IO ()
prepareRebalance group@Group{..} reason = do
Log.info $ "prepare rebalance, group:" <> Log.build groupId
<> "reason:" <> Log.build reason
-- check state CompletingRebalance and cancel delayedSyncResponses
Utils.whenIORefEq state CompletingRebalance $ do
cancelDelayedSyncResponses group "rebalance"
Expand All @@ -374,6 +376,7 @@ prepareRebalance group@Group{..} = do
Nothing -> do
delayed <- makeDelayedRebalance group 5000
Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed
<> ", group:" <> Log.build groupId
IO.atomicWriteIORef delayedRebalance (Just delayed)
IO.atomicWriteIORef state PreparingRebalance
_ -> pure ()
Expand All @@ -388,7 +391,7 @@ makeDelayedRebalance group rebalanceDelayMs = do
rebalance :: Group -> IO ()
rebalance group@Group{..} = do
C.withMVar lock $ \() -> do
Log.info "rebalancing is starting"
Log.info $ "rebalancing is starting, group:" <> Log.build groupId
-- remove all members who haven't joined(and maybe elect new leader)
removeNotYetRejoinedMembers group

Expand All @@ -397,7 +400,7 @@ rebalance group@Group{..} = do
<> ", generationId:" <> Log.build nextGenerationId
IO.readIORef leader >>= \case
Nothing -> do
Log.info "cancel rebalance without any join request"
Log.info $ "cancel rebalance without any join request, " <> Log.build groupId

IO.atomicWriteIORef group.protocolName Nothing
transitionTo group Empty
Expand All @@ -415,22 +418,24 @@ transitionTo :: Group -> GroupState -> IO ()
transitionTo group state = do
oldState <- IO.atomicModifyIORef' group.state (state, )
Log.info $ "group state changed, " <> Log.buildString' oldState <> " -> " <> Log.buildString' state
<> ", group:" <> Log.build group.groupId

doRelance :: Group -> T.Text -> IO ()
doRelance group@Group{..} leaderMemberId = do
selectedProtocolName <- computeProtocolName group
Log.info $ "selected protocolName:" <> Log.buildString' selectedProtocolName

-- state changes
transitionTo group CompletingRebalance

generationId <- IO.readIORef groupGenerationId
leaderMembersInResponse <- H.toList members >>= M.mapM (\(_, m) -> getJoinResponseMember selectedProtocolName m)
Log.debug $ "members in join responses" <> Log.buildString' leaderMembersInResponse
Log.info $ "members in join responses:" <> Log.buildString' leaderMembersInResponse
<> ", group:" <> Log.build groupId

delayedJoinResponseList <- H.toList delayedJoinResponses

Log.info $ "set all delayed responses, response list size:" <> Log.buildString' (length delayedJoinResponseList)
Log.info $ "set all delayed responses, response list:" <> Log.buildString' (length delayedJoinResponseList)
<> ", group:" <> Log.build groupId
-- response all delayedJoinResponses
M.forM_ delayedJoinResponseList $ \(memberId, delayed) -> do
let memebersInResponse = if leaderMemberId == memberId then leaderMembersInResponse else []
Expand All @@ -443,24 +448,33 @@ doRelance group@Group{..} leaderMemberId = do
, members = K.KaArray (Just $ V.fromList memebersInResponse)
, throttleTimeMs = 0
}
Log.debug $ "set delayed response:" <> Log.buildString' resp
<> " for " <> Log.buildString' memberId
_ <- C.tryPutMVar delayed resp
H.delete delayedJoinResponses memberId

rebalanceTimeoutMs <- computeRebalnceTimeoutMs group
delayedSyncTid <- makeDelayedSync group generationId rebalanceTimeoutMs
IO.atomicWriteIORef delayedSync (Just delayedSyncTid)
Log.info $ "create delayed sync for group:" <> Log.build groupId
<> ", threadId:" <> Log.buildString' delayedSyncTid

IO.atomicWriteIORef delayedRebalance Nothing
Log.info "rebalancing is finished"
Log.info $ "rebalancing is finished, group:" <> Log.build groupId

removeNotYetRejoinedMembers :: Group -> IO ()
removeNotYetRejoinedMembers group@Group{..} = do
(flip H.mapM_) members $ \(mid, member) -> do
H.lookup delayedJoinResponses mid >>= \case
Nothing -> do
Log.info $ "remove member: " <> Log.build mid <> " from " <> Log.build groupId
Log.info $ "remove not yet joined member: " <> Log.build mid <> " from " <> Log.build groupId
removeMember group member
Just _ -> pure ()

-- | Walk over every entry of the group's @members@ table and remove each
-- member that has no entry in @delayedSyncResponses@.
--
-- For each such member an info line is logged and 'removeMember' is called;
-- members that do have an entry are left untouched.
removeNotYetSyncedMembers :: Group -> IO ()
removeNotYetSyncedMembers group@Group{..} =
  H.mapM_ dropIfNotSynced members
  where
    -- Handles one (member id, member) record of the table.
    dropIfNotSynced (mid, member) = do
      mDelayedSync <- H.lookup delayedSyncResponses mid
      case mDelayedSync of
        Just _  -> pure ()
        Nothing -> do
          Log.info $ "remove not yet synced member: " <> Log.build mid
                  <> " from " <> Log.build groupId
          removeMember group member

Expand All @@ -470,18 +484,20 @@ makeDelayedSync group@Group{..} generationId timeoutMs = do
C.threadDelay (fromIntegral timeoutMs * 1000)
C.withMVar lock $ \() -> do
Utils.unlessIORefEq groupGenerationId generationId $ \currentGid -> do
Log.fatal $ "unexpected delayed sync with wrong generationId:" <> Log.build generationId
Log.warning $ "unexpected delayed sync with wrong generationId:" <> Log.build generationId
<> ", current group generation id:" <> Log.build currentGid
<> ", groupId:" <> Log.build groupId
IO.readIORef state >>= \case
CompletingRebalance -> do
Log.info $ "delayed sync timeout, try to prepare Rebalance, groupId:" <> Log.build groupId
Log.info $ "delayed sync timeout, try to prepare Rebalance, group:" <> Log.build groupId
removeNotYetSyncedMembers group

-- remove itself (to avoid killing itself in prepareRebalance)
IO.atomicWriteIORef delayedSync Nothing
prepareRebalance group
prepareRebalance group $ "delayed sync timeout"
s -> do
Log.fatal $ "unexpected delayed sync with wrong state:" <> Log.buildString' s
<> ", groupId:" <> Log.build groupId
Log.warning $ "unexpected delayed sync with wrong state:" <> Log.buildString' s
<> ", group:" <> Log.build groupId

-- select max rebalanceTimeoutMs from all members
computeRebalnceTimeoutMs :: Group -> IO Int32
Expand All @@ -503,7 +519,6 @@ computeProtocolName group@Group{..} = do
IO.readIORef protocolName >>= \case
Nothing -> do
pn <- chooseProtocolName group
Log.debug $ "choosed protocolName" <> Log.buildString' pn
IO.atomicWriteIORef protocolName (Just pn)
pure pn
Just pn -> pure pn
Expand All @@ -512,8 +527,11 @@ computeProtocolName group@Group{..} = do
chooseProtocolName :: Group -> IO T.Text
chooseProtocolName Group {..} = do
ps <- IO.readIORef supportedProtocols
Log.debug $ "protocols:" <> Log.buildString' ps
return . head $ Set.toList ps
let pn = head $ Set.toList ps
Log.info $ "choose protocol:" <> Log.build pn
<> ", current supported protocols:" <> Log.buildString' ps
<> ", group:" <> Log.build groupId
return pn

updateSupportedProtocols :: Group -> [(T.Text, BS.ByteString)] -> IO ()
updateSupportedProtocols Group{..} protocols = do
Expand All @@ -535,14 +553,13 @@ addMember group@Group{..} member delayedResponse = do

Utils.whenIORefEq leader Nothing $ do
IO.atomicWriteIORef leader (Just member.memberId)
Log.info $ "group updated leader, groupId:" <> Log.build groupId
Log.info $ "updated leader, group:" <> Log.build groupId
<> "leader:" <> Log.build member.memberId

H.insert members member.memberId member
updateSupportedProtocols group memberProtocols

M.forM_ delayedResponse $ \delayed -> do
Log.debug $ "add delayed response into response list for member:" <> Log.buildString' member.memberId
H.insert delayedJoinResponses member.memberId delayed

updateMember :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO ()
Expand All @@ -554,7 +571,8 @@ updateMember group@Group{..} member req delayedResponse = do
IO.atomicWriteIORef member.sessionTimeoutMs req.sessionTimeoutMs

-- TODO: check delayedJoinResponses
Log.debug $ "add delayed response into response list for member:" <> Log.buildString' member.memberId
Log.info $ "updated member, add delayed response into response list for member:"
<> Log.buildString' member.memberId <> ", group:" <> Log.build groupId
H.insert delayedJoinResponses member.memberId delayedResponse

removeMember :: Group -> Member -> IO ()
Expand All @@ -572,9 +590,14 @@ removeMember Group{..} member = do
[] -> do
-- select from members
H.toList members >>= \ms -> do
IO.atomicWriteIORef leader (listToMaybe (fmap fst ms))
let newLeader = listToMaybe (fmap fst ms)
IO.atomicWriteIORef leader newLeader
Log.info $ "selcet new leader from members, group:" <> Log.build groupId
<> ", new leader:" <> Log.buildString' newLeader <> ", old leader:" <> Log.build member.memberId
(mid, _):_ -> do
IO.atomicWriteIORef leader (Just mid)
Log.info $ "select new leader from delayedJoinResponses, group:" <> Log.build groupId
<> ", new leader:" <> Log.build mid <> ", old leader:" <> Log.build member.memberId


plainProtocols :: [(T.Text, BS.ByteString)] -> Set.Set T.Text
Expand All @@ -592,6 +615,7 @@ refineProtocols protocols = case K.unKaArray protocols of

syncGroup :: Group -> K.SyncGroupRequest -> IO K.SyncGroupResponse
syncGroup group req@K.SyncGroupRequest{..} = do
Log.info $ "received sync group request:" <> Log.buildString' req
delayed <- C.newEmptyMVar
C.withMVar (group.lock) $ \() -> do
-- check member id
Expand All @@ -605,7 +629,10 @@ syncGroup group [email protected]{..} = do
M.void $ C.tryPutMVar delayed (K.SyncGroupResponse {throttleTimeMs=0, errorCode=0, assignment=assignment})
PreparingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS)
_ -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)
C.readMVar delayed
resp <- C.readMVar delayed
Log.info $ "received delayed sync group response:" <> Log.buildString' resp
<> ", group:" <> Log.build group.groupId
return resp

doSyncGroup :: Group -> K.SyncGroupRequest -> C.MVar K.SyncGroupResponse -> IO ()
doSyncGroup group@Group{..} req@K.SyncGroupRequest{memberId=memberId} delayedResponse = do
Expand All @@ -620,8 +647,9 @@ doSyncGroup group@Group{..} [email protected]{memberId=memberId} delayedRes
(Just leaderMemberId) <- IO.readIORef leader
Log.info $ "sync group leaderMemberId: " <> Log.buildString' leaderMemberId
<> " memberId:" <> Log.buildString' memberId
<> " group:" <> Log.build groupId
<> " isLeader:" <> Log.buildString' (memberId == leaderMemberId)
when (memberId == leaderMemberId) $ do
Log.info $ "received leader SyncGroup request, " <> Log.buildString' memberId
let assignmentMap = getAssignmentMap req

storeGroup group assignmentMap
Expand All @@ -645,7 +673,7 @@ setAndPropagateAssignment :: Group -> Map.Map T.Text BS.ByteString -> IO ()
setAndPropagateAssignment group@Group{..} assignments = do
-- set assignments
Log.info $ "setting assignments:" <> Log.buildString' assignments
<> "group Id:" <> Log.build groupId
<> "group:" <> Log.build groupId
(flip H.mapM_) members $ \(memberId, member) -> do
-- set assignment
let assignment = fromMaybe "" $ Map.lookup memberId assignments
Expand All @@ -658,14 +686,14 @@ setAndPropagateAssignment group@Group{..} assignments = do
M.void $ C.tryPutMVar delayed (K.SyncGroupResponse {throttleTimeMs=0, errorCode=0, assignment=assignment})
H.delete delayedSyncResponses memberId

-- delayedJoinResponses should have been empty,
-- delayedSyncResponses should have been empty,
-- so it actually does nothing,
-- but it is useful if delayedJoinResponses is NOT empty(bug):
-- but it is useful if delayedSyncResponses is NOT empty(bug):
-- * log error information
-- * avoid inconsistent state
-- * avoid clients getting stuck
cancelDelayedSyncResponses group "ERROR: delayedJoinResponses should be empty after propagated assignments"
Log.info $ "setAndPropagateAssignment completed"
cancelDelayedSyncResponses group "ERROR: delayedSyncResponses should be empty after propagated assignments"
Log.info $ "setAndPropagateAssignment completed, group:" <> Log.build groupId

leaveGroup :: Group -> K.LeaveGroupRequest -> IO K.LeaveGroupResponse
leaveGroup group@Group{..} req = do
Expand Down Expand Up @@ -694,8 +722,22 @@ getMember Group{..} memberId = do
removeMemberAndUpdateGroup :: Group -> Member -> IO ()
removeMemberAndUpdateGroup group@Group{..} member = do
Log.info $ "member: " <> Log.build member.memberId <> " is leaving group:" <> Log.build groupId

-- New members may time out with a pending JoinGroup while the group is still rebalancing, so we have
-- to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
-- will retry the JoinGroup request if it is still active.
cancelDelayedJoinResponse group member.memberId

removeMember group member
prepareRebalance group
prepareRebalance group $ "remove member:" <> member.memberId

-- | Complete and forget the pending join response of one member, if any.
--
-- Looks @memberId@ up in @delayedJoinResponses@. When an entry exists, the
-- response built by 'makeJoinResponseError' with 'K.UNKNOWN_MEMBER_ID' is
-- offered to its 'C.MVar' via 'C.tryPutMVar' (the result is discarded), and
-- the entry is then deleted from the table. When no entry exists this is a
-- no-op.
cancelDelayedJoinResponse :: Group -> T.Text -> IO ()
cancelDelayedJoinResponse Group{..} memberId = do
  mDelayed <- H.lookup delayedJoinResponses memberId
  M.forM_ mDelayed $ \delayed -> do
    M.void $ C.tryPutMVar delayed unknownMemberResp
    H.delete delayedJoinResponses memberId
  where
    unknownMemberResp = makeJoinResponseError memberId K.UNKNOWN_MEMBER_ID

-- default heartbeat interval: 3s
heartbeat :: Group -> K.HeartbeatRequest -> IO K.HeartbeatResponse
Expand All @@ -721,9 +763,10 @@ checkGroupGenerationId :: Group -> Int32 -> IO ()
checkGroupGenerationId Group{..} generationId = do
currentGenerationId <- IO.readIORef groupGenerationId
M.unless (currentGenerationId == generationId) $ do
Log.debug $ "invalid generation id"
Log.info $ "invalid generation id"
<> ", current generationId:" <> Log.buildString' currentGenerationId
<> ", expected generationId" <> Log.buildString' generationId
<> ", expected generationId:" <> Log.buildString' generationId
<> ", group:" <> Log.build groupId
throw (ErrorCodeException K.ILLEGAL_GENERATION)

updateLatestHeartbeat :: Member -> IO ()
Expand All @@ -739,8 +782,9 @@ setupDelayedCheckHeartbeat group@Group{..} = do
updateLatestHeartbeat member
memberSessionTimeoutMs <- IO.readIORef member.sessionTimeoutMs
threadId <- C.forkIO $ delayedCheckHeart group member memberSessionTimeoutMs
Log.debug $ "setup delayed heartbeat check, threadId:" <> Log.buildString' threadId
Log.info $ "setup delayed heartbeat check, threadId:" <> Log.buildString' threadId
<> ", member:" <> Log.buildString' member.memberId
<> ", group:" <> Log.build groupId
IO.atomicWriteIORef member.heartbeatThread (Just threadId)

-- cancel all delayedCheckHearts
Expand All @@ -751,6 +795,7 @@ cancelDelayedCheckHeartbeats Group{..} = do
Nothing -> pure ()
Just tid -> do
Log.info $ "cancel delayedCheckHeart, member:" <> Log.buildString' mid
<> "group:" <> Log.build groupId
C.killThread tid
IO.atomicWriteIORef member.heartbeatThread Nothing

Expand All @@ -776,6 +821,7 @@ checkHeartbeatAndMaybeRebalance group member = do
<> ", lastHeartbeat:" <> Log.buildString' lastUpdated
<> ", now:" <> Log.buildString' now
<> ", sessionTimeoutMs:" <> Log.buildString' memberSessionTimeoutMs
<> ", group:" <> Log.build group.groupId
-- remove itself (to avoid killing itself in prepareRebalance)
IO.atomicWriteIORef member.heartbeatThread Nothing
removeMemberAndUpdateGroup group member
Expand Down Expand Up @@ -908,3 +954,16 @@ getMemberValue protocol assignments member = do
, assignment=assignment
, clientId=member.clientId
}

------------------- Group Error Response -------------------------
-- | Build a 'K.JoinGroupResponse' that carries only an error.
--
-- The given @memberId@ and @errorCode@ are copied into the response; every
-- other field is a fixed placeholder: generation id @-1@, empty protocol name
-- and leader, an empty member array, and a throttle time of @0@.
makeJoinResponseError :: T.Text -> K.ErrorCode -> K.JoinGroupResponse
makeJoinResponseError memberId errorCode = K.JoinGroupResponse
  { throttleTimeMs = 0
  , errorCode      = errorCode
  , generationId   = -1
  , protocolName   = ""
  , leader         = ""
  , memberId       = memberId
  , members        = K.NonNullKaArray V.empty
  }

0 comments on commit d83cab5

Please sign in to comment.