Skip to content

Commit

Permalink
kafka: do rebalance on all members have rejoined & some member is rem…
Browse files Browse the repository at this point in the history
…oved
  • Loading branch information
Commelina committed Apr 29, 2024
1 parent 128261e commit dd836b5
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
14 changes: 14 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module HStream.Kafka.Common.Utils where

import Control.Concurrent
import Control.Exception (throw)
import qualified Control.Monad as M
import qualified Control.Monad.ST as ST
Expand All @@ -18,6 +19,7 @@ import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified Kafka.Protocol.Encoding as K
import qualified System.Timeout as Timeout

type HashTable k v = H.BasicHashTable k v

Expand Down Expand Up @@ -96,3 +98,15 @@ encodeBase64 = Base64.extractBase64 . Base64.encodeBase64

decodeBase64 :: T.Text -> BS.ByteString
decodeBase64 = Base64.decodeBase64Lenient . T.encodeUtf8

-- | Perform the action when the predicate is true or timeout is reached.
onOrTimeout :: IO Bool -> Int -> IO b -> IO b
onOrTimeout p timeoutMs action =
Timeout.timeout (timeoutMs * 1000) loop >>= \case
Nothing -> action
Just a -> return a
where
loop = p >>= \case
True -> action
-- FIXME: Hardcoded constant (check every 1ms)
False -> threadDelay 1000 >> loop
72 changes: 58 additions & 14 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import Data.Int (Int32)
import qualified Data.IORef as IO
import qualified Data.List as List
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, listToMaybe)
import Data.Maybe (fromMaybe, isJust,
listToMaybe)
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.UUID as UUID
Expand Down Expand Up @@ -347,18 +348,53 @@ doDynamicNewMemberJoinGroup group reqCtx req newMemberId delayedResponse = do

addMemberAndRebalance :: Group -> K.RequestContext -> K.JoinGroupRequest -> T.Text -> C.MVar K.JoinGroupResponse -> IO ()
addMemberAndRebalance group reqCtx req newMemberId delayedResponse = do
isGroupEmpty <- Utils.hashtableNull group.members
member <- newMemberFromReq reqCtx req newMemberId (refineProtocols req.protocols)
addMember group member (Just delayedResponse)
-- TODO: check state
prepareRebalance group $ "add member:" <> member.memberId
-- Note: We consider the group as a new one if empty, and always wait
-- an interval for joining. Otherwise, wait until all previous members
-- have joined or timeout.
-- FIXME: Is this correct?
-- FIXME: How long to wait for a new group? Is 'group.initial.rebalance.delay.ms' correct?
-- FIXME: Hardcoded constant!
prepareRebalance group
(if isGroupEmpty then return False else haveAllMembersRejoined)
5000
("add member:" <> member.memberId)
where
-- Note: The new-added member is always present. So this is equivalent
-- to check among members before.
haveAllMembersRejoined :: IO Bool
haveAllMembersRejoined = do
H.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True group.members

updateMemberAndRebalance :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO ()
updateMemberAndRebalance group member req delayedResponse = do
updateMember group member req delayedResponse
prepareRebalance group $ "update member:" <> member.memberId

prepareRebalance :: Group -> T.Text -> IO ()
prepareRebalance group@Group{..} reason = do
-- Note: On this case, the group can not be empty because at least this member
-- is present. So we wait until all previous members have joined or timeout.
-- FIXME: How long to wait? Is 'rebalanceTimeoutMs' of the member correct?
timeout <- IO.readIORef member.rebalanceTimeoutMs
prepareRebalance group
haveAllMembersRejoined
timeout
("update member:" <> member.memberId)
where
-- Note: The new-added member is always present. So this is equivalent
-- to check among members before.
haveAllMembersRejoined :: IO Bool
haveAllMembersRejoined = do
H.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True group.members

prepareRebalance :: Group -> IO Bool -> Int32 -> T.Text -> IO ()
prepareRebalance group@Group{..} p timeoutMs reason = do
Log.info $ "prepare rebalance, group:" <> Log.build groupId
<> "reason:" <> Log.build reason
-- check state CompletingRebalance and cancel delayedSyncResponses
Expand All @@ -374,19 +410,19 @@ prepareRebalance group@Group{..} reason = do
-- TODO: configurable initRebalanceDelayMs, 5000 by default
IO.readIORef delayedRebalance >>= \case
Nothing -> do
delayed <- makeDelayedRebalance group 5000
delayed <- makeDelayedRebalance group p timeoutMs
Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed
<> ", group:" <> Log.build groupId
IO.atomicWriteIORef delayedRebalance (Just delayed)
IO.atomicWriteIORef state PreparingRebalance
_ -> pure ()

-- TODO: dynamically delay with initTimeoutMs and RebalanceTimeoutMs
makeDelayedRebalance :: Group -> Int32 -> IO C.ThreadId
makeDelayedRebalance group rebalanceDelayMs = do
C.forkIO $ do
C.threadDelay (1000 * fromIntegral rebalanceDelayMs)
rebalance group
makeDelayedRebalance :: Group -> IO Bool -> Int32 -> IO C.ThreadId
makeDelayedRebalance group p rebalanceDelayMs =
C.forkIO $ Utils.onOrTimeout p
(fromIntegral rebalanceDelayMs)
(rebalance group)

rebalance :: Group -> IO ()
rebalance group@Group{..} = do
Expand Down Expand Up @@ -452,6 +488,8 @@ doRelance group@Group{..} leaderMemberId = do
H.delete delayedJoinResponses memberId

rebalanceTimeoutMs <- computeRebalnceTimeoutMs group
-- FIXME: Is it correct to use rebalance timeout here? Or maybe session timeout?
-- The state machine here is really weird...
delayedSyncTid <- makeDelayedSync group generationId rebalanceTimeoutMs
IO.atomicWriteIORef delayedSync (Just delayedSyncTid)
Log.info $ "create delayed sync for group:" <> Log.build groupId
Expand Down Expand Up @@ -494,7 +532,10 @@ makeDelayedSync group@Group{..} generationId timeoutMs = do

-- remove itself (to avoid killing itself in prepareRebalance)
IO.atomicWriteIORef delayedSync Nothing
prepareRebalance group $ "delayed sync timeout"
prepareRebalance group
(pure False) -- FIXME: Is this correct?
5000 -- FIXME: timeout?
"delayed sync timeout"
s -> do
Log.warning $ "unexpected delayed sync with wrong state:" <> Log.buildString' s
<> ", group:" <> Log.build groupId
Expand Down Expand Up @@ -729,7 +770,10 @@ removeMemberAndUpdateGroup group@Group{..} member = do
cancelDelayedJoinResponse group member.memberId

removeMember group member
prepareRebalance group $ "remove member:" <> member.memberId
prepareRebalance group
(pure True) -- FIXME: Is this correct?
5000 -- FIXME: timeout?
("remove member:" <> member.memberId)

cancelDelayedJoinResponse :: Group -> T.Text -> IO ()
cancelDelayedJoinResponse Group{..} memberId = do
Expand Down

0 comments on commit dd836b5

Please sign in to comment.