Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed May 7, 2024
1 parent c401fc6 commit 6b13ec4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 54 deletions.
14 changes: 10 additions & 4 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,16 @@ decodeBase64 :: T.Text -> BS.ByteString
decodeBase64 = Base64.decodeBase64Lenient . T.encodeUtf8

-- | Perform the action when the predicate is true or timeout is reached.
onOrTimeout :: IO Bool -> Int -> IO b -> IO b
onOrTimeout p timeoutMs action =
Timeout.timeout (timeoutMs * 1000) loop >>= \case
Nothing -> action
-- An extra action is performed when the timeout expire, whose result will
-- be discarded.
-- Warning: The second action is always performed no matter whether the
-- timeout is reached or not.
onOrTimeout :: IO Bool -> Int -> IO a -> IO b -> IO b
onOrTimeout p timeoutMs actionOnExpire action =
Timeout.timeout (timeoutMs * 1000) action loop >>= \case
Nothing -> do
M.void actionOnExpire
action
Just a -> return a
where
loop = p >>= \case
Expand Down
133 changes: 84 additions & 49 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
Expand All @@ -7,6 +8,7 @@ module HStream.Kafka.Group.Group where

import qualified Control.Concurrent as C
import Control.Exception (throw)
import qualified Control.Exception as E
import Control.Monad (when)
import qualified Control.Monad as M
import qualified Data.ByteString as BS
Expand Down Expand Up @@ -106,6 +108,29 @@ data GroupState
| Empty
deriving (Show, Eq)

groupStateValidPrevs :: GroupState -> [GroupState]
groupStateValidPrevs = \case
PreparingRebalance -> [Stable, Empty, CompletingRebalance]
CompletingRebalance -> [PreparingRebalance]
Stable -> [CompletingRebalance]
Dead -> [Stable, Empty, Dead, PreparingRebalance, CompletingRebalance]
Empty -> [PreparingRebalance]

data PleaseFixmeException = PleaseFixmeException T.Text
deriving (Show, E.Exception)

-- FIXME: throw exceptions or use Either? I prefer the latter...
transitionTo :: Group -> GroupState -> IO ()
transitionTo group toState = do
curState <- IO.readIORef group.state
if curState `elem` groupStateValidPrevs toState
then do
IO.atomicWriteIORef group.state toState
Log.info $ "group state changed, " <> Log.buildString' curState <> " -> " <> Log.buildString' toState
<> ", group:" <> Log.build group.groupId
else do
throw (PleaseFixmeException $ "Invalid state transition from " <> T.pack (show curState) <> " to " <> T.pack (show toState) <> " for group:" <> group.groupId)

data Group = Group
{ lock :: C.MVar ()
, groupId :: T.Text
Expand Down Expand Up @@ -355,27 +380,26 @@ addMemberAndRebalance group reqCtx req newMemberId delayedResponse = do
member <- newMemberFromReq reqCtx req newMemberId (refineProtocols req.protocols)
addMember group member (Just delayedResponse)
-- TODO: check state
-- Note: We consider the group as a new one if empty, and always wait
-- an interval for joining. Otherwise, wait until all previous members
-- have joined or timeout.
-- FIXME: Is this correct?
-- FIXME: How long to wait for a new group? Is 'group.initial.rebalance.delay.ms' correct?
-- FIXME: Hardcoded constant!
-- Note: We only support dynamic join so it is OK to consider it as new init join
-- when the group was empty.
memberRebalanceTimeoutMs <- IO.readIORef member.rebalanceTimeoutMs
prepareRebalance group
(if isGroupEmpty then return False else haveAllMembersRejoined)
(if isGroupEmpty then fromIntegral group.groupConfig.groupInitialRebalanceDelay
else memberRebalanceTimeoutMs)
("add member:" <> member.memberId)
curState <- IO.readIORef group.state
curMembers <- H.toList group.members
when (curState `elem` [Empty, Stable, CompletingRebalance]) $ do
prepareRebalance group
(if isGroupEmpty then return False else (haveAllMembersRejoined curMembers))
(if isGroupEmpty then fromIntegral group.groupConfig.groupInitialRebalanceDelay
else memberRebalanceTimeoutMs)
("add member:" <> member.memberId)
where
-- Note: The new-added member is always present. So this is equivalent
-- to check among members before.
haveAllMembersRejoined :: IO Bool
haveAllMembersRejoined = do
H.foldM (\acc (mid,_) -> case acc of
haveAllMembersRejoined :: [(T.Text, Member)] -> IO Bool
haveAllMembersRejoined curMembers_ = do
M.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True group.members
) True curMembers_

updateMemberAndRebalance :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO ()
updateMemberAndRebalance group member req delayedResponse = do
Expand All @@ -384,19 +408,22 @@ updateMemberAndRebalance group member req delayedResponse = do
-- is present. So we wait until all previous members have joined or timeout.
-- FIXME: How long to wait? Is 'rebalanceTimeoutMs' of the member correct?
timeout <- IO.readIORef member.rebalanceTimeoutMs
prepareRebalance group
haveAllMembersRejoined
timeout
("update member:" <> member.memberId)
curState <- IO.readIORef group.state
curMembers <- H.toList group.members
when (curState `elem` [Empty, Stable, CompletingRebalance]) $ do
prepareRebalance group
(haveAllMembersRejoined curMembers)
timeout
("update member:" <> member.memberId)
where
-- Note: The new-added member is always present. So this is equivalent
-- to check among members before.
haveAllMembersRejoined :: IO Bool
haveAllMembersRejoined = do
H.foldM (\acc (mid,_) -> case acc of
haveAllMembersRejoined :: [(T.Text, Member)] -> IO Bool
haveAllMembersRejoined curMembers_ = do
M.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True group.members
) True curMembers_

prepareRebalance :: Group -> IO Bool -> Int32 -> T.Text -> IO ()
prepareRebalance group@Group{..} p timeoutMs reason = do
Expand All @@ -409,21 +436,19 @@ prepareRebalance group@Group{..} p timeoutMs reason = do
-- cancel delayed sync
cancelDelayedSync group

-- isEmptyState <- (Empty ==) <$> IO.readIORef state

-- setup delayed rebalance if delayedRebalance is Nothing
IO.readIORef delayedRebalance >>= \case
Nothing -> do
delayed <- makeDelayedRebalance group p (fromIntegral timeoutMs)
Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed
<> ", group:" <> Log.build groupId
IO.atomicWriteIORef delayedRebalance (Just delayed)
IO.atomicWriteIORef state PreparingRebalance
transitionTo group PreparingRebalance
_ -> pure ()

makeDelayedRebalance :: Group -> IO Bool -> Int -> IO C.ThreadId
makeDelayedRebalance group p rebalanceDelayMs =
C.forkIO $ Utils.onOrTimeout p rebalanceDelayMs (rebalance group)
C.forkIO $ Utils.onOrTimeout p rebalanceDelayMs (pure ()) (rebalance group)

rebalance :: Group -> IO ()
rebalance group@Group{..} = do
Expand Down Expand Up @@ -451,12 +476,6 @@ rebalance group@Group{..} = do
Just leaderMemberId -> do
doRelance group leaderMemberId

transitionTo :: Group -> GroupState -> IO ()
transitionTo group state = do
oldState <- IO.atomicModifyIORef' group.state (state, )
Log.info $ "group state changed, " <> Log.buildString' oldState <> " -> " <> Log.buildString' state
<> ", group:" <> Log.build group.groupId

doRelance :: Group -> T.Text -> IO ()
doRelance group@Group{..} leaderMemberId = do
selectedProtocolName <- computeProtocolName group
Expand Down Expand Up @@ -519,8 +538,11 @@ removeNotYetSyncedMembers group@Group{..} = do

makeDelayedSync :: Group -> Int32 -> Int32 -> IO C.ThreadId
makeDelayedSync group@Group{..} generationId timeoutMs = do
C.forkIO $ do
C.threadDelay (fromIntegral timeoutMs * 1000)
curMembers <- H.toList members
C.forkIO $ Utils.onOrTimeout (hasReceivedAllSyncs curMembers)
(fromIntegral timeoutMs) -- always use rebalance timeout
(pure ())
$ do
C.withMVar lock $ \() -> do
Utils.unlessIORefEq groupGenerationId generationId $ \currentGid -> do
Log.warning $ "unexpected delayed sync with wrong generationId:" <> Log.build generationId
Expand All @@ -533,13 +555,16 @@ makeDelayedSync group@Group{..} generationId timeoutMs = do

-- remove itself (to avoid killing itself in prepareRebalance)
IO.atomicWriteIORef delayedSync Nothing
prepareRebalance group
(pure False) -- FIXME: Is this correct?
5000 -- FIXME: timeout?
"delayed sync timeout"
s -> do
Log.warning $ "unexpected delayed sync with wrong state:" <> Log.buildString' s
<> ", group:" <> Log.build groupId
where
hasReceivedAllSyncs :: [(T.Text, Member)] -> IO Bool
hasReceivedAllSyncs curMembers_ = do
M.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedSyncResponses mid
) True curMembers_

-- select max rebalanceTimeoutMs from all members
computeRebalnceTimeoutMs :: Group -> IO Int32
Expand Down Expand Up @@ -747,12 +772,11 @@ leaveGroup group@Group{..} req = do
CompletingRebalance -> removeMemberAndUpdateGroup group member
Stable -> removeMemberAndUpdateGroup group member
PreparingRebalance -> do
-- TODO: should NOT BE PASSIBLE in this version
Log.warning $ "received a leave group in PreparingRebalance state, ignored it"
<> ", groupId:" <> Log.buildString' req.groupId
<> ", memberId:" <> Log.buildString' req.memberId
throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)

-- TODO: should NOT BE POSSIBLE in this version
Log.warning $ "received a leave group in PreparingRebalance state, ignored it"
<> ", groupId:" <> Log.buildString' req.groupId
<> ", memberId:" <> Log.buildString' req.memberId
throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)
return $ K.LeaveGroupResponse {errorCode=0, throttleTimeMs=0}

getMember :: Group -> T.Text -> IO Member
Expand All @@ -771,10 +795,21 @@ removeMemberAndUpdateGroup group@Group{..} member = do
cancelDelayedJoinResponse group member.memberId

removeMember group member
prepareRebalance group
(pure True) -- FIXME: Is this correct?
5000 -- FIXME: timeout?
("remove member:" <> member.memberId)

curState <- IO.readIORef state
curMembers <- H.toList members
when (curState `elem` [Empty, Stable, CompletingRebalance]) $ do
prepareRebalance group
(haveAllMembersRejoined curMembers)
5000 -- FIXME: use group.rebalance.timeout
("remove member:" <> member.memberId)
where
haveAllMembersRejoined :: [(T.Text, Member)] -> IO Bool
haveAllMembersRejoined curMembers_ = do
M.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True curMembers_

cancelDelayedJoinResponse :: Group -> T.Text -> IO ()
cancelDelayedJoinResponse Group{..} memberId = do
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ test-suite hstream-kafka-test
HStream.Kafka.Common.AclEntrySpec
HStream.Kafka.Common.AclSpec
HStream.Kafka.Common.AuthorizerSpec
HStream.Kafka.Common.ConfigSpec
HStream.Kafka.Common.OffsetManagerSpec
HStream.Kafka.Common.TestUtils
HStream.Kafka.Common.ConfigSpec

hs-source-dirs: tests
build-depends:
Expand Down

0 comments on commit 6b13ec4

Please sign in to comment.