Skip to content

Commit

Permalink
kafka: fix strategy on choosing partition assignment protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed May 10, 2024
1 parent 35d0792 commit 15b173b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
30 changes: 20 additions & 10 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import qualified Data.ByteString as BS
import qualified Data.HashTable.IO as H
import Data.Int (Int32)
import qualified Data.IORef as IO
import qualified Data.List as List
import qualified Data.List as L
import qualified Data.List.Extra as L
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, isJust,
listToMaybe)
Expand Down Expand Up @@ -560,7 +561,7 @@ getJoinResponseMember protocol m = do
getMemberMetadata :: Member -> T.Text -> IO BS.ByteString
getMemberMetadata member protocol = do
memberProtocols <- IO.readIORef member.supportedProtocols
return $ snd . fromMaybe ("", "") $ List.find (\(n, _) -> n == protocol) memberProtocols
return $ snd . fromMaybe ("", "") $ L.find (\(n, _) -> n == protocol) memberProtocols

computeProtocolName :: Group -> IO T.Text
computeProtocolName group@Group{..} = do
Expand All @@ -571,15 +572,24 @@ computeProtocolName group@Group{..} = do
pure pn
Just pn -> pure pn

-- choose protocol name from supportedProtocols
-- | Select the protocol for this group which is supported by all members.
-- This is done by letting each member vote for one of the protocols
-- and choose the one with the most votes. See Member.hs#voteForProtocol.
-- Also see kafka.coordinator.group.GroupMetadata$selectProtocol.
-- FIXME: How about protocols with the same occurrence?
chooseProtocolName :: Group -> IO T.Text
chooseProtocolName Group {..} = do
ps <- IO.readIORef supportedProtocols
let pn = head $ Set.toList ps
Log.info $ "choose protocol:" <> Log.build pn
<> ", current supported protocols:" <> Log.buildString' ps
<> ", group:" <> Log.build groupId
return pn
chooseProtocolName group = do
candicateProtocols <- IO.readIORef group.supportedProtocols
members_ <- (L.map snd) <$> H.toList group.members
chosenProtocol <-
mostOccur <$> mapM (voteForProtocol candicateProtocols) members_
Log.info $ "choose protocol:" <> Log.build chosenProtocol
<> ", current supported protocols:" <> Log.buildString' candicateProtocols
<> ", group:" <> Log.build group.groupId
return chosenProtocol
where
mostOccur :: [T.Text] -> T.Text
mostOccur = L.head . L.maximumOn L.length . L.group . L.sort

updateSupportedProtocols :: Group -> [(T.Text, BS.ByteString)] -> IO ()
updateSupportedProtocols Group{..} protocols = do
Expand Down
40 changes: 29 additions & 11 deletions hstream-kafka/HStream/Kafka/Group/Member.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@

module HStream.Kafka.Group.Member where

import qualified Control.Concurrent as C
import Control.Monad (join)
import qualified Data.ByteString as BS
import Data.Int (Int32, Int64)
import qualified Data.IORef as IO
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified HStream.Common.Server.MetaData as CM
import qualified HStream.Kafka.Common.Utils as Utils
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Service as K
import qualified Control.Concurrent as C
import Control.Exception (throw)
import Control.Monad (join)
import qualified Data.ByteString as BS
import Data.Int (Int32, Int64)
import qualified Data.IORef as IO
import qualified Data.List as L
import Data.Maybe (fromMaybe)
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified HStream.Common.Server.MetaData as CM
import HStream.Kafka.Common.KafkaException (ErrorCodeException (..))
import qualified HStream.Kafka.Common.Utils as Utils
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Service as K

data Member
= Member
Expand Down Expand Up @@ -92,3 +97,16 @@ newMemberFromValue groupValue value = do
, clientId=value.clientId
, clientHost=value.clientHost
}

-- | Vote for a protocol the member prefers from a set of candidates,
-- which **all members support**. "prefer" means following the order
-- of protocols the member supports.
-- Throw an exception if no protocol is found. **This should not happen**
-- because it is caller's responsibility to ensure the 'candidates'
-- argument is the common subset of all members' supported protocols!
voteForProtocol :: Set.Set T.Text -> Member -> IO T.Text
voteForProtocol candidates member = do
supportedProtocols' <- (L.map fst) <$> IO.readIORef member.supportedProtocols
case L.find (`Set.member` candidates) supportedProtocols' of
Nothing -> throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL)
Just protocol -> return protocol

0 comments on commit 15b173b

Please sign in to comment.