diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index 502eccce4..27a3fe75c 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -260,20 +260,18 @@ newGroupFromValue value metadataManager metaHandle = do joinGroup :: Group -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse joinGroup group@Group{..} reqCtx req = do -- delayed response(join barrier) - Log.debug $ "received joinGroup" + Log.info $ "received joinGroup request:" <> Log.buildString' req delayedResponse <- C.newEmptyMVar C.withMVar lock $ \_ -> do -- TODO: GROUP MAX SIZE checkSupportedProtocols group req - Log.debug $ "checked protocols" -- check state IO.readIORef group.state >>= \case Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) _ -> pure () - Log.debug $ "checked state" if T.null req.memberId then doNewMemberJoinGoup group reqCtx req delayedResponse else doCurrentMemeberJoinGroup group req delayedResponse @@ -281,11 +279,11 @@ joinGroup group@Group{..} reqCtx req = do -- waiting other consumers resp <- C.takeMVar delayedResponse Log.info $ "joinGroup: received delayed response:" <> Log.buildString' resp + <> "group:" <> Log.build groupId return resp checkSupportedProtocols :: Group -> K.JoinGroupRequest -> IO () checkSupportedProtocols Group{..} req = do - Log.debug $ "checking protocols" IO.readIORef state >>= \case Empty -> do M.when (T.null req.protocolType) $ do @@ -305,6 +303,7 @@ cancelDelayedSyncResponses Group{..} reason = do lst <- H.toList delayedSyncResponses M.forM_ lst $ \(memberId, delayed) -> do Log.info $ "cancel delayed sync response for " <> Log.buildString' memberId + <> ", group:" <> Log.build groupId <> ", reason:" <> Log.build reason _ <- C.tryPutMVar delayed $ K.SyncGroupResponse {throttleTimeMs=0, errorCode=K.REBALANCE_IN_PROGRESS, assignment=BS.empty} H.delete delayedSyncResponses memberId @@ -320,7 +319,8 @@ cancelDelayedSync Group{..} = do 
doNewMemberJoinGoup :: Group -> K.RequestContext -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO () doNewMemberJoinGoup group reqCtx req delayedResponse = do newMemberId <- generateMemberId reqCtx - Log.debug $ "generated member id:" <> Log.buildString' newMemberId + Log.info $ "generated member id:" <> Log.buildString' newMemberId + <> " for group:" <> Log.build group.groupId doDynamicNewMemberJoinGroup group reqCtx req newMemberId delayedResponse generateMemberId :: K.RequestContext -> IO T.Text @@ -350,15 +350,17 @@ addMemberAndRebalance group reqCtx req newMemberId delayedResponse = do member <- newMemberFromReq reqCtx req newMemberId (refineProtocols req.protocols) addMember group member (Just delayedResponse) -- TODO: check state - prepareRebalance group + prepareRebalance group $ "add member:" <> member.memberId updateMemberAndRebalance :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO () updateMemberAndRebalance group member req delayedResponse = do updateMember group member req delayedResponse - prepareRebalance group + prepareRebalance group $ "update member:" <> member.memberId -prepareRebalance :: Group -> IO () -prepareRebalance group@Group{..} = do +prepareRebalance :: Group -> T.Text -> IO () +prepareRebalance group@Group{..} reason = do + Log.info $ "prepare rebalance, group:" <> Log.build groupId + <> "reason:" <> Log.build reason -- check state CompletingRebalance and cancel delayedSyncResponses Utils.whenIORefEq state CompletingRebalance $ do cancelDelayedSyncResponses group "rebalance" @@ -374,6 +376,7 @@ prepareRebalance group@Group{..} = do Nothing -> do delayed <- makeDelayedRebalance group 5000 Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed + <> ", group:" <> Log.build groupId IO.atomicWriteIORef delayedRebalance (Just delayed) IO.atomicWriteIORef state PreparingRebalance _ -> pure () @@ -388,7 +391,7 @@ makeDelayedRebalance group rebalanceDelayMs = do rebalance :: Group -> IO 
() rebalance group@Group{..} = do C.withMVar lock $ \() -> do - Log.info "rebalancing is starting" + Log.info $ "rebalancing is starting, group:" <> Log.build groupId -- remove all members who haven't joined(and maybe elect new leader) removeNotYetRejoinedMembers group @@ -397,7 +400,7 @@ rebalance group@Group{..} = do <> ", generationId:" <> Log.build nextGenerationId IO.readIORef leader >>= \case Nothing -> do - Log.info "cancel rebalance without any join request" + Log.info $ "cancel rebalance without any join request, " <> Log.build groupId IO.atomicWriteIORef group.protocolName Nothing transitionTo group Empty @@ -415,22 +418,24 @@ transitionTo :: Group -> GroupState -> IO () transitionTo group state = do oldState <- IO.atomicModifyIORef' group.state (state, ) Log.info $ "group state changed, " <> Log.buildString' oldState <> " -> " <> Log.buildString' state + <> ", group:" <> Log.build group.groupId doRelance :: Group -> T.Text -> IO () doRelance group@Group{..} leaderMemberId = do selectedProtocolName <- computeProtocolName group - Log.info $ "selected protocolName:" <> Log.buildString' selectedProtocolName -- state changes transitionTo group CompletingRebalance generationId <- IO.readIORef groupGenerationId leaderMembersInResponse <- H.toList members >>= M.mapM (\(_, m) -> getJoinResponseMember selectedProtocolName m) - Log.debug $ "members in join responses" <> Log.buildString' leaderMembersInResponse + Log.info $ "members in join responses:" <> Log.buildString' leaderMembersInResponse + <> ", group:" <> Log.build groupId delayedJoinResponseList <- H.toList delayedJoinResponses - Log.info $ "set all delayed responses, response list size:" <> Log.buildString' (length delayedJoinResponseList) + Log.info $ "set all delayed responses, response list:" <> Log.buildString' (length delayedJoinResponseList) + <> ", group:" <> Log.build groupId -- response all delayedJoinResponses M.forM_ delayedJoinResponseList $ \(memberId, delayed) -> do let memebersInResponse = 
if leaderMemberId == memberId then leaderMembersInResponse else [] @@ -443,24 +448,33 @@ doRelance group@Group{..} leaderMemberId = do , members = K.KaArray (Just $ V.fromList memebersInResponse) , throttleTimeMs = 0 } - Log.debug $ "set delayed response:" <> Log.buildString' resp - <> " for " <> Log.buildString' memberId _ <- C.tryPutMVar delayed resp H.delete delayedJoinResponses memberId rebalanceTimeoutMs <- computeRebalnceTimeoutMs group delayedSyncTid <- makeDelayedSync group generationId rebalanceTimeoutMs IO.atomicWriteIORef delayedSync (Just delayedSyncTid) + Log.info $ "create delayed sync for group:" <> Log.build groupId + <> ", threadId:" <> Log.buildString' delayedSyncTid IO.atomicWriteIORef delayedRebalance Nothing - Log.info "rebalancing is finished" + Log.info $ "rebalancing is finished, group:" <> Log.build groupId removeNotYetRejoinedMembers :: Group -> IO () removeNotYetRejoinedMembers group@Group{..} = do (flip H.mapM_) members $ \(mid, member) -> do H.lookup delayedJoinResponses mid >>= \case Nothing -> do - Log.info $ "remove member: " <> Log.build mid <> " from " <> Log.build groupId + Log.info $ "remove not yet joined member: " <> Log.build mid <> " from " <> Log.build groupId + removeMember group member + Just _ -> pure () + +removeNotYetSyncedMembers :: Group -> IO () +removeNotYetSyncedMembers group@Group{..} = do + (flip H.mapM_) members $ \(mid, member) -> do + H.lookup delayedSyncResponses mid >>= \case + Nothing -> do + Log.info $ "remove not yet synced member: " <> Log.build mid <> " from " <> Log.build groupId removeMember group member Just _ -> pure () @@ -470,18 +484,20 @@ makeDelayedSync group@Group{..} generationId timeoutMs = do C.threadDelay (fromIntegral timeoutMs * 1000) C.withMVar lock $ \() -> do Utils.unlessIORefEq groupGenerationId generationId $ \currentGid -> do - Log.fatal $ "unexpected delayed sync with wrong generationId:" <> Log.build generationId + Log.warning $ "unexpected delayed sync with wrong generationId:" 
<> Log.build generationId <> ", current group generation id:" <> Log.build currentGid <> ", groupId:" <> Log.build groupId IO.readIORef state >>= \case CompletingRebalance -> do - Log.info $ "delayed sync timeout, try to prepare Rebalance, groupId:" <> Log.build groupId + Log.info $ "delayed sync timeout, try to prepare Rebalance, group:" <> Log.build groupId + removeNotYetSyncedMembers group + -- remove itself (to avoid killing itself in prepareRebalance) IO.atomicWriteIORef delayedSync Nothing - prepareRebalance group + prepareRebalance group $ "delayed sync timeout" s -> do - Log.fatal $ "unexpected delayed sync with wrong state:" <> Log.buildString' s - <> ", groupId:" <> Log.build groupId + Log.warning $ "unexpected delayed sync with wrong state:" <> Log.buildString' s + <> ", group:" <> Log.build groupId -- select max rebalanceTimeoutMs from all members computeRebalnceTimeoutMs :: Group -> IO Int32 @@ -503,7 +519,6 @@ computeProtocolName group@Group{..} = do IO.readIORef protocolName >>= \case Nothing -> do pn <- chooseProtocolName group - Log.debug $ "choosed protocolName" <> Log.buildString' pn IO.atomicWriteIORef protocolName (Just pn) pure pn Just pn -> pure pn @@ -512,8 +527,11 @@ computeProtocolName group@Group{..} = do chooseProtocolName :: Group -> IO T.Text chooseProtocolName Group {..} = do ps <- IO.readIORef supportedProtocols - Log.debug $ "protocols:" <> Log.buildString' ps - return . 
head $ Set.toList ps + let pn = head $ Set.toList ps + Log.info $ "choose protocol:" <> Log.build pn + <> ", current supported protocols:" <> Log.buildString' ps + <> ", group:" <> Log.build groupId + return pn updateSupportedProtocols :: Group -> [(T.Text, BS.ByteString)] -> IO () updateSupportedProtocols Group{..} protocols = do @@ -535,14 +553,13 @@ addMember group@Group{..} member delayedResponse = do Utils.whenIORefEq leader Nothing $ do IO.atomicWriteIORef leader (Just member.memberId) - Log.info $ "group updated leader, groupId:" <> Log.build groupId + Log.info $ "updated leader, group:" <> Log.build groupId <> "leader:" <> Log.build member.memberId H.insert members member.memberId member updateSupportedProtocols group memberProtocols M.forM_ delayedResponse $ \delayed -> do - Log.debug $ "add delayed response into response list for member:" <> Log.buildString' member.memberId H.insert delayedJoinResponses member.memberId delayed updateMember :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO () @@ -554,7 +571,8 @@ updateMember group@Group{..} member req delayedResponse = do IO.atomicWriteIORef member.sessionTimeoutMs req.sessionTimeoutMs -- TODO: check delayedJoinResponses - Log.debug $ "add delayed response into response list for member:" <> Log.buildString' member.memberId + Log.info $ "updated member, add delayed response into response list for member:" + <> Log.buildString' member.memberId <> ", group:" <> Log.build groupId H.insert delayedJoinResponses member.memberId delayedResponse removeMember :: Group -> Member -> IO () @@ -572,9 +590,14 @@ removeMember Group{..} member = do [] -> do -- select from members H.toList members >>= \ms -> do - IO.atomicWriteIORef leader (listToMaybe (fmap fst ms)) + let newLeader = listToMaybe (fmap fst ms) + IO.atomicWriteIORef leader newLeader + Log.info $ "select new leader from members, group:" <> Log.build groupId + <> ", new leader:" <> Log.buildString' newLeader <> ", old leader:" <> 
Log.build member.memberId (mid, _):_ -> do IO.atomicWriteIORef leader (Just mid) + Log.info $ "select new leader from delayedJoinResponses, group:" <> Log.build groupId + <> ", new leader:" <> Log.build mid <> ", old leader:" <> Log.build member.memberId plainProtocols :: [(T.Text, BS.ByteString)] -> Set.Set T.Text @@ -592,6 +615,7 @@ refineProtocols protocols = case K.unKaArray protocols of syncGroup :: Group -> K.SyncGroupRequest -> IO K.SyncGroupResponse syncGroup group req@K.SyncGroupRequest{..} = do + Log.info $ "received sync group request:" <> Log.buildString' req delayed <- C.newEmptyMVar C.withMVar (group.lock) $ \() -> do -- check member id @@ -605,7 +629,10 @@ syncGroup group req@K.SyncGroupRequest{..} = do M.void $ C.tryPutMVar delayed (K.SyncGroupResponse {throttleTimeMs=0, errorCode=0, assignment=assignment}) PreparingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) _ -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) - C.readMVar delayed + resp <- C.readMVar delayed + Log.info $ "received delayed sync group response:" <> Log.buildString' resp + <> ", group:" <> Log.build group.groupId + return resp doSyncGroup :: Group -> K.SyncGroupRequest -> C.MVar K.SyncGroupResponse -> IO () doSyncGroup group@Group{..} req@K.SyncGroupRequest{memberId=memberId} delayedResponse = do @@ -620,8 +647,9 @@ doSyncGroup group@Group{..} req@K.SyncGroupRequest{memberId=memberId} delayedRes (Just leaderMemberId) <- IO.readIORef leader Log.info $ "sync group leaderMemberId: " <> Log.buildString' leaderMemberId <> " memberId:" <> Log.buildString' memberId + <> " group:" <> Log.build groupId + <> " isLeader:" <> Log.buildString' (memberId == leaderMemberId) when (memberId == leaderMemberId) $ do - Log.info $ "received leader SyncGroup request, " <> Log.buildString' memberId let assignmentMap = getAssignmentMap req storeGroup group assignmentMap @@ -645,7 +673,7 @@ setAndPropagateAssignment :: Group -> Map.Map T.Text BS.ByteString -> IO () 
setAndPropagateAssignment group@Group{..} assignments = do -- set assignments Log.info $ "setting assignments:" <> Log.buildString' assignments - <> "group Id:" <> Log.build groupId + <> "group:" <> Log.build groupId (flip H.mapM_) members $ \(memberId, member) -> do -- set assignment let assignment = fromMaybe "" $ Map.lookup memberId assignments @@ -658,14 +686,14 @@ setAndPropagateAssignment group@Group{..} assignments = do M.void $ C.tryPutMVar delayed (K.SyncGroupResponse {throttleTimeMs=0, errorCode=0, assignment=assignment}) H.delete delayedSyncResponses memberId - -- delayedJoinResponses should have been empty, + -- delayedSyncResponses should have been empty, -- so it actually does nothing, - -- but it is useful if delayedJoinResponses is NOT empty(bug): + -- but it is useful if delayedSyncResponses is NOT empty(bug): -- * log error information -- * avoid inconsistent state -- * avoid clients are getting stuck - cancelDelayedSyncResponses group "ERROR: delayedJoinResponses should be empty after propagated assignments" - Log.info $ "setAndPropagateAssignment completed" + cancelDelayedSyncResponses group "ERROR: delayedSyncResponses should be empty after propagated assignments" + Log.info $ "setAndPropagateAssignment completed, group:" <> Log.build groupId leaveGroup :: Group -> K.LeaveGroupRequest -> IO K.LeaveGroupResponse leaveGroup group@Group{..} req = do @@ -694,8 +722,22 @@ getMember Group{..} memberId = do removeMemberAndUpdateGroup :: Group -> Member -> IO () removeMemberAndUpdateGroup group@Group{..} member = do Log.info $ "member: " <> Log.build member.memberId <> " is leaving group:" <> Log.build groupId + + -- New members may time out with a pending JoinGroup while the group is still rebalancing, so we have + -- to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer + -- will retry the JoinGroup request if it is still active. 
+ cancelDelayedJoinResponse group member.memberId + removeMember group member - prepareRebalance group + prepareRebalance group $ "remove member:" <> member.memberId + +cancelDelayedJoinResponse :: Group -> T.Text -> IO () +cancelDelayedJoinResponse Group{..} memberId = do + H.lookup delayedJoinResponses memberId >>= \case + Nothing -> pure () + Just delayed -> do + _ <- C.tryPutMVar delayed (makeJoinResponseError memberId K.UNKNOWN_MEMBER_ID) + H.delete delayedJoinResponses memberId -- default heartbeat interval: 3s heartbeat :: Group -> K.HeartbeatRequest -> IO K.HeartbeatResponse @@ -721,9 +763,10 @@ checkGroupGenerationId :: Group -> Int32 -> IO () checkGroupGenerationId Group{..} generationId = do currentGenerationId <- IO.readIORef groupGenerationId M.unless (currentGenerationId == generationId) $ do - Log.debug $ "invalid generation id" + Log.info $ "invalid generation id" <> ", current generationId:" <> Log.buildString' currentGenerationId - <> ", expected generationId" <> Log.buildString' generationId + <> ", expected generationId:" <> Log.buildString' generationId + <> ", group:" <> Log.build groupId throw (ErrorCodeException K.ILLEGAL_GENERATION) updateLatestHeartbeat :: Member -> IO () @@ -739,8 +782,9 @@ setupDelayedCheckHeartbeat group@Group{..} = do updateLatestHeartbeat member memberSessionTimeoutMs <- IO.readIORef member.sessionTimeoutMs threadId <- C.forkIO $ delayedCheckHeart group member memberSessionTimeoutMs - Log.debug $ "setup delayed heartbeat check, threadId:" <> Log.buildString' threadId + Log.info $ "setup delayed heartbeat check, threadId:" <> Log.buildString' threadId <> ", member:" <> Log.buildString' member.memberId + <> ", group:" <> Log.build groupId IO.atomicWriteIORef member.heartbeatThread (Just threadId) -- cancel all delayedCheckHearts @@ -751,6 +795,7 @@ cancelDelayedCheckHeartbeats Group{..} = do Nothing -> pure () Just tid -> do Log.info $ "cancel delayedCheckHeart, member:" <> Log.buildString' mid + <> "group:" <> 
Log.build groupId C.killThread tid IO.atomicWriteIORef member.heartbeatThread Nothing @@ -776,6 +821,7 @@ checkHeartbeatAndMaybeRebalance group member = do <> ", lastHeartbeat:" <> Log.buildString' lastUpdated <> ", now:" <> Log.buildString' now <> ", sessionTimeoutMs:" <> Log.buildString' memberSessionTimeoutMs + <> ", group:" <> Log.build group.groupId -- remove itself (to avoid killing itself in prepareRebalance) IO.atomicWriteIORef member.heartbeatThread Nothing removeMemberAndUpdateGroup group member @@ -908,3 +954,16 @@ getMemberValue protocol assignments member = do , assignment=assignment , clientId=member.clientId } + +------------------- Group Error Response ------------------------- +makeJoinResponseError :: T.Text -> K.ErrorCode -> K.JoinGroupResponse +makeJoinResponseError memberId errorCode = + K.JoinGroupResponse + { errorCode = errorCode + , generationId = -1 + , protocolName = "" + , leader = "" + , memberId = memberId + , members = K.NonNullKaArray V.empty + , throttleTimeMs = 0 + }