From 15b173b15c70e5ca738b81b7c310a79a0a6a1349 Mon Sep 17 00:00:00 2001
From: Commelina
Date: Fri, 10 May 2024 10:04:41 +0300
Subject: [PATCH] kafka: fix strategy on choosing partition assignment protocol

---
 hstream-kafka/HStream/Kafka/Group/Group.hs  | 30 ++++++++++------
 hstream-kafka/HStream/Kafka/Group/Member.hs | 40 +++++++++++++++------
 2 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs
index e39dbf48a..2c91c70f0 100644
--- a/hstream-kafka/HStream/Kafka/Group/Group.hs
+++ b/hstream-kafka/HStream/Kafka/Group/Group.hs
@@ -13,7 +13,8 @@ import qualified Data.ByteString as BS
 import qualified Data.HashTable.IO                     as H
 import           Data.Int                              (Int32)
 import qualified Data.IORef                            as IO
-import qualified Data.List                             as List
+import qualified Data.List                             as L
+import qualified Data.List.Extra                       as L
 import qualified Data.Map                              as Map
 import           Data.Maybe                            (fromMaybe, isJust,
                                                         listToMaybe)
@@ -560,7 +561,7 @@ getJoinResponseMember protocol m = do
 getMemberMetadata :: Member -> T.Text -> IO BS.ByteString
 getMemberMetadata member protocol = do
   memberProtocols <- IO.readIORef member.supportedProtocols
-  return $ snd . fromMaybe ("", "") $ List.find (\(n, _) -> n == protocol) memberProtocols
+  return $ snd . fromMaybe ("", "") $ L.find (\(n, _) -> n == protocol) memberProtocols
 
 computeProtocolName :: Group -> IO T.Text
 computeProtocolName group@Group{..} = do
@@ -571,15 +572,24 @@ computeProtocolName group@Group{..} = do
       pure pn
     Just pn -> pure pn
 
--- choose protocol name from supportedProtocols
+-- | Select the protocol for this group which is supported by all members.
+-- This is done by letting each member vote for one of the protocols
+-- and choosing the one with the most votes. See Member.hs#voteForProtocol.
+-- Also see kafka.coordinator.group.GroupMetadata$selectProtocol.
+-- FIXME: Ties (protocols with the same number of votes) are not handled explicitly.
chooseProtocolName :: Group -> IO T.Text -chooseProtocolName Group {..} = do - ps <- IO.readIORef supportedProtocols - let pn = head $ Set.toList ps - Log.info $ "choose protocol:" <> Log.build pn - <> ", current supported protocols:" <> Log.buildString' ps - <> ", group:" <> Log.build groupId - return pn +chooseProtocolName group = do + candicateProtocols <- IO.readIORef group.supportedProtocols + members_ <- (L.map snd) <$> H.toList group.members + chosenProtocol <- + mostOccur <$> mapM (voteForProtocol candicateProtocols) members_ + Log.info $ "choose protocol:" <> Log.build chosenProtocol + <> ", current supported protocols:" <> Log.buildString' candicateProtocols + <> ", group:" <> Log.build group.groupId + return chosenProtocol + where + mostOccur :: [T.Text] -> T.Text + mostOccur = L.head . L.maximumOn L.length . L.group . L.sort updateSupportedProtocols :: Group -> [(T.Text, BS.ByteString)] -> IO () updateSupportedProtocols Group{..} protocols = do diff --git a/hstream-kafka/HStream/Kafka/Group/Member.hs b/hstream-kafka/HStream/Kafka/Group/Member.hs index 23cc82d94..491ab3964 100644 --- a/hstream-kafka/HStream/Kafka/Group/Member.hs +++ b/hstream-kafka/HStream/Kafka/Group/Member.hs @@ -5,17 +5,22 @@ module HStream.Kafka.Group.Member where -import qualified Control.Concurrent as C -import Control.Monad (join) -import qualified Data.ByteString as BS -import Data.Int (Int32, Int64) -import qualified Data.IORef as IO -import Data.Maybe (fromMaybe) -import qualified Data.Text as T -import qualified HStream.Common.Server.MetaData as CM -import qualified HStream.Kafka.Common.Utils as Utils -import qualified Kafka.Protocol as K -import qualified Kafka.Protocol.Service as K +import qualified Control.Concurrent as C +import Control.Exception (throw) +import Control.Monad (join) +import qualified Data.ByteString as BS +import Data.Int (Int32, Int64) +import qualified Data.IORef as IO +import qualified Data.List as L +import Data.Maybe (fromMaybe) +import qualified 
Data.Set as Set +import qualified Data.Text as T +import qualified HStream.Common.Server.MetaData as CM +import HStream.Kafka.Common.KafkaException (ErrorCodeException (..)) +import qualified HStream.Kafka.Common.Utils as Utils +import qualified Kafka.Protocol as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Service as K data Member = Member @@ -92,3 +97,16 @@ newMemberFromValue groupValue value = do , clientId=value.clientId , clientHost=value.clientHost } + +-- | Vote for a protocol the member prefers from a set of candidates, +-- which **all members support**. "prefer" means following the order +-- of protocols the member supports. +-- Throw an exception if no protocol is found. **This should not happen** +-- because it is caller's responsibility to ensure the 'candidates' +-- argument is the common subset of all members' supported protocols! +voteForProtocol :: Set.Set T.Text -> Member -> IO T.Text +voteForProtocol candidates member = do + supportedProtocols' <- (L.map fst) <$> IO.readIORef member.supportedProtocols + case L.find (`Set.member` candidates) supportedProtocols' of + Nothing -> throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL) + Just protocol -> return protocol