diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs index ff3277b1a..3216ae431 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs @@ -15,6 +15,8 @@ import Control.Monad import qualified Data.Text as T import qualified Data.Vector as V +import HStream.Common.Server.Lookup (KafkaResource (KafkaResGroup), + lookupKafkaPersist) import HStream.Kafka.Common.Acl import HStream.Kafka.Common.Authorizer.Class import qualified HStream.Kafka.Common.KafkaException as K @@ -23,12 +25,12 @@ import qualified HStream.Kafka.Common.Utils as Utils import qualified HStream.Kafka.Group.Group as G import qualified HStream.Kafka.Group.GroupCoordinator as GC import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log +import HStream.Server.HStreamApi (ServerNode (..)) import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K -import HStream.Common.Server.Lookup (lookupKafkaPersist, KafkaResource (KafkaResGroup)) -import HStream.Server.HStreamApi (ServerNode(..)) handleJoinGroup :: ServerContext -> K.RequestContext @@ -144,23 +146,16 @@ handleDescribeGroups ServerContext{..} reqCtx req = do describedGroups <- forM groups_m $ \(gid, group_m) -> -- [ACL] for each group id, check [DESCRIBE GROUP] simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP gid AclOp_DESCRIBE >>= \case - False -> return $ makeErrorGroup gid K.GROUP_AUTHORIZATION_FAILED - True -> case group_m of - Just group -> do - ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext - loadBalanceHashRing scAdvertisedListenersKey - (KafkaResGroup group.groupId) - if serverNodeId /= serverID - then return $ makeErrorGroup gid K.NOT_COORDINATOR - else G.describe group - Nothing -> return - K.DescribedGroup { protocolData = "" - , groupState = T.pack . show $ G.Dead - , errorCode = K.NONE - , members = Utils.listToKaArray [] - , groupId = gid - , protocolType = "" - } + False -> return $ makeErrorGroup gid K.GROUP_AUTHORIZATION_FAILED "" + True -> do + ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext + loadBalanceHashRing scAdvertisedListenersKey + (KafkaResGroup gid) + if serverNodeId /= serverID + then return $ makeErrorGroup gid K.NOT_COORDINATOR "" + else case group_m of + Just group -> G.describe group + Nothing -> return $ makeErrorGroup gid K.NONE (T.pack . show $ G.Dead) -- FIXME: hard-coded constants return $ K.DescribeGroupsResponse { groups = Utils.listToKaArray describedGroups @@ -168,9 +163,9 @@ handleDescribeGroups ServerContext{..} reqCtx req = do } where -- FIXME: hard-coded constants - makeErrorGroup gid code = K.DescribedGroup { + makeErrorGroup gid code state = K.DescribedGroup { protocolData = "" - , groupState = "" + , groupState = state , errorCode = code , members = Utils.listToKaArray [] , groupId = gid