Skip to content

Commit

Permalink
feat(kafka): listGroups and describeGroups (#1652)
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored Oct 20, 2023
1 parent 9b77ee4 commit 2a6394f
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 51 deletions.
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ hashtableDeleteAll hashTable = do
M.forM_ lst $ \(key, _) -> H.delete hashTable key

kaArrayToList :: K.KaArray a -> [a]
kaArrayToList = undefined
kaArrayToList = V.toList . fromMaybe V.empty . K.unKaArray

listToKaArray :: [a] -> K.KaArray a
listToKaArray = undefined
listToKaArray = K.KaArray . Just . V.fromList

kaArrayToVector :: K.KaArray a -> V.Vector a
kaArrayToVector kaArray = fromMaybe V.empty (K.unKaArray kaArray)
Expand Down
113 changes: 76 additions & 37 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-- TODO:
-- * kafka/group config
Expand Down Expand Up @@ -181,8 +182,8 @@ newGroup group metadataManager = do

------------------------------------------------------------------------

joinGroup :: Group -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0
joinGroup group@Group{..} req = do
joinGroup :: Group -> K.RequestContext -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0
joinGroup group@Group{..} reqCtx req = do
-- delayed response(join barrier)
Log.debug $ "received joinGroup"
delayedResponse <- C.newEmptyMVar
Expand All @@ -202,8 +203,8 @@ joinGroup group@Group{..} req = do

Log.debug $ "checked state"
newMemberId <- if T.null req.memberId
then doNewMemberJoinGoup group req
else doCurrentMemeberJoinGroup group req
then doNewMemberJoinGoup group reqCtx req
else doCurrentMemeberJoinGroup group reqCtx req

Log.debug $ "add delayed response into response list for member:" <> Log.buildString' newMemberId
H.insert delayedJoinResponses newMemberId delayedResponse
Expand Down Expand Up @@ -235,7 +236,7 @@ resetGroup group@Group{..} = do
cancelDelayedSyncResponses group

-- reset leader
IO.writeIORef leader Nothing
IO.atomicWriteIORef leader Nothing

-- cancelDelayedCheckHeartbeats
cancelDelayedCheckHeartbeats group
Expand All @@ -244,9 +245,9 @@ resetGroup group@Group{..} = do
Utils.hashtableDeleteAll members

-- update protocols
IO.writeIORef protocolType Nothing
IO.writeIORef protocolName Nothing
IO.writeIORef supportedProtcols (Set.empty)
IO.atomicWriteIORef protocolType Nothing
IO.atomicWriteIORef protocolName Nothing
IO.atomicWriteIORef supportedProtcols (Set.empty)

cancelDelayedSyncResponses :: Group -> IO ()
cancelDelayedSyncResponses Group{..} = do
Expand All @@ -256,29 +257,29 @@ cancelDelayedSyncResponses Group{..} = do
_ <- C.tryPutMVar delayed $ K.SyncGroupResponseV0 K.REBALANCE_IN_PROGRESS BS.empty
H.delete delayedSyncResponses memberId

doNewMemberJoinGoup :: Group -> K.JoinGroupRequestV0 -> IO T.Text
doNewMemberJoinGoup group req = do
doNewMemberJoinGoup :: Group -> K.RequestContext -> K.JoinGroupRequestV0 -> IO T.Text
doNewMemberJoinGoup group reqCtx req = do
newMemberId <- generateMemberId
Log.debug $ "generated member id:" <> Log.buildString' newMemberId
doDynamicNewMemberJoinGroup group req newMemberId
doDynamicNewMemberJoinGroup group reqCtx req newMemberId
return newMemberId

-- TODO: kafka memberId format: clientId(from request context)/group_instance_id + "-" + UUID
generateMemberId :: IO T.Text
generateMemberId = UUID.toText <$> UUID.nextRandom

doCurrentMemeberJoinGroup :: Group -> K.JoinGroupRequestV0 -> IO T.Text
doCurrentMemeberJoinGroup group req = do
doDynamicNewMemberJoinGroup group req req.memberId
doCurrentMemeberJoinGroup :: Group -> K.RequestContext -> K.JoinGroupRequestV0 -> IO T.Text
doCurrentMemeberJoinGroup group reqCtx req = do
doDynamicNewMemberJoinGroup group reqCtx req req.memberId
return req.memberId

doDynamicNewMemberJoinGroup :: Group -> K.JoinGroupRequestV0 -> T.Text -> IO ()
doDynamicNewMemberJoinGroup group req newMemberId = do
addMemberAndRebalance group req newMemberId
doDynamicNewMemberJoinGroup :: Group -> K.RequestContext -> K.JoinGroupRequestV0 -> T.Text -> IO ()
doDynamicNewMemberJoinGroup group reqCtx req newMemberId = do
addMemberAndRebalance group reqCtx req newMemberId

addMemberAndRebalance :: Group -> K.JoinGroupRequestV0 -> T.Text -> IO ()
addMemberAndRebalance group req newMemberId = do
member <- newMember newMemberId req.sessionTimeoutMs req.protocolType (refineProtocols req.protocols)
addMemberAndRebalance :: Group -> K.RequestContext -> K.JoinGroupRequestV0 -> T.Text -> IO ()
addMemberAndRebalance group reqCtx req newMemberId = do
member <- newMember reqCtx newMemberId req.sessionTimeoutMs req.protocolType (refineProtocols req.protocols)
addMember group member
-- TODO: check state
prepareRebalance group
Expand All @@ -295,8 +296,8 @@ prepareRebalance group@Group{..} = do
Nothing -> do
delayed <- makeDelayedRebalance group 5000
Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed
IO.writeIORef delayedRebalance (Just delayed)
IO.writeIORef state PreparingRebalance
IO.atomicWriteIORef delayedRebalance (Just delayed)
IO.atomicWriteIORef state PreparingRebalance
_ -> pure ()

-- TODO: dynamically delay
Expand All @@ -313,9 +314,9 @@ rebalance group@Group{..} = do
IO.readIORef leader >>= \case
Nothing -> do
Log.info "cancel rebalance without any join request"
IO.writeIORef delayedRebalance Nothing
IO.atomicWriteIORef delayedRebalance Nothing
Log.info "removed delayedRebalance"
IO.writeIORef state Empty
IO.atomicWriteIORef state Empty
Log.info "state changed: PreparingRebalance -> Empty"
Just leaderMemberId -> do
doRelance group leaderMemberId
Expand Down Expand Up @@ -352,23 +353,27 @@ doRelance group@Group{..} leaderMemberId = do
<> " for " <> Log.buildString' memberId
_ <- C.tryPutMVar delayed resp
H.delete delayedJoinResponses memberId
IO.writeIORef state CompletingRebalance
IO.atomicWriteIORef state CompletingRebalance
Log.info "state changed: PreparingRebalance -> CompletingRebalance"
IO.writeIORef delayedRebalance Nothing
IO.atomicWriteIORef delayedRebalance Nothing
Log.info "rebalancing is finished"

getJoinResponseMember :: T.Text -> Member -> K.JoinGroupResponseMemberV0
getJoinResponseMember protocol m =
let metadata = snd. fromMaybe ("", "") $ List.find (\(n, _) -> n == protocol) m.supportedProtcols
let metadata = getMemberMetadata m protocol
in K.JoinGroupResponseMemberV0 m.memberId metadata

getMemberMetadata :: Member -> T.Text -> BS.ByteString
getMemberMetadata Member{..} protocol = do
snd . fromMaybe ("", "") $ List.find (\(n, _) -> n == protocol) supportedProtcols

computeProtocolName :: Group -> IO T.Text
computeProtocolName group@Group{..} = do
IO.readIORef protocolName >>= \case
Nothing -> do
pn <- chooseProtocolName group
Log.debug $ "choosed protocolName" <> Log.buildString' pn
IO.writeIORef protocolName (Just pn)
IO.atomicWriteIORef protocolName (Just pn)
pure pn
Just pn -> pure pn

Expand All @@ -384,11 +389,11 @@ addMember Group{..} member = do
-- leaderIsEmpty <- IO.readIORef leader
IO.readIORef leader >>= \case
Nothing -> do
IO.writeIORef leader (Just member.memberId)
IO.writeIORef protocolType (Just member.protocolType)
IO.atomicWriteIORef leader (Just member.memberId)
IO.atomicWriteIORef protocolType (Just member.protocolType)
Log.debug $ "init supportedProtcols:" <> Log.buildString' member.supportedProtcols
Log.debug $ "plain supportedProtcols:" <> Log.buildString' (plainProtocols member.supportedProtcols)
IO.writeIORef supportedProtcols (plainProtocols member.supportedProtcols)
IO.atomicWriteIORef supportedProtcols (plainProtocols member.supportedProtcols)
_ -> pure ()
H.insert members member.memberId member

Expand Down Expand Up @@ -443,7 +448,7 @@ doSyncGroup group@Group{..} [email protected]{memberId=memberId} delayedR
setupDelayedCheckHeartbeat group

-- set state
IO.writeIORef state Stable
IO.atomicWriteIORef state Stable

setAndPropagateAssignment :: Group -> K.SyncGroupRequestV0 -> IO ()
setAndPropagateAssignment Group{..} req = do
Expand All @@ -455,7 +460,7 @@ setAndPropagateAssignment Group{..} req = do
<> ", assignment:" <> Log.buildString' assignment.assignment
Just member <- H.lookup members assignment.memberId
-- set assignments
IO.writeIORef member.assignment assignment.assignment
IO.atomicWriteIORef member.assignment assignment.assignment
-- propagate assignments
H.lookup delayedSyncResponses assignment.memberId >>= \case
Nothing -> pure ()
Expand Down Expand Up @@ -521,7 +526,7 @@ checkGroupGenerationId Group{..} generationId = do
updateLatestHeartbeat :: Member -> IO ()
updateLatestHeartbeat Member{..} = do
newLastHeartbeat <- Time.getSystemMsTimestamp
IO.writeIORef lastHeartbeat newLastHeartbeat
IO.atomicWriteIORef lastHeartbeat newLastHeartbeat
Log.debug $ "lastHeartbeat updated, memeber:" <> Log.buildString' memberId
<> ", newLastHeartbeat:" <> Log.buildString' newLastHeartbeat

Expand All @@ -532,7 +537,7 @@ setupDelayedCheckHeartbeat group@Group{..} = do
threadId <- C.forkIO $ delayedCheckHeart group member member.sessionTimeoutMs
Log.debug $ "setup delayed heartbeat check, threadId:" <> Log.buildString' threadId
<> ", member:" <> Log.buildString' member.memberId
IO.writeIORef member.heartbeatThread (Just threadId)
IO.atomicWriteIORef member.heartbeatThread (Just threadId)

-- cancel all delayedCheckHearts
cancelDelayedCheckHeartbeats :: Group -> IO ()
Expand All @@ -543,7 +548,7 @@ cancelDelayedCheckHeartbeats Group{..} = do
Just tid -> do
Log.info $ "cancel delayedCheckHeart, member:" <> Log.buildString' mid
C.killThread tid
IO.writeIORef member.heartbeatThread Nothing
IO.atomicWriteIORef member.heartbeatThread Nothing

delayedCheckHeart :: Group -> Member -> Int32 -> IO ()
delayedCheckHeart group member delayMs = do
Expand Down Expand Up @@ -573,7 +578,7 @@ checkHeartbeatAndMaybeRebalance group Member{..} = do
<> ", now:" <> Log.buildString' now
<> ", sessionTimeoutMs:" <> Log.buildString' sessionTimeoutMs
-- remove itself (to avoid kill itself in resetGroupAndRebalance)
IO.writeIORef heartbeatThread Nothing
IO.atomicWriteIORef heartbeatThread Nothing
resetGroupAndRebalance group
return nextDelayMs

Expand All @@ -597,3 +602,37 @@ fetchOffsets Group{..} req = do
res <- GMM.fetchOffsets metadataManager name partitionIndexes
return $ K.OffsetFetchResponseTopicV0 {partitions = res, name = name}
return K.OffsetFetchResponseV0 {topics=topics}

------------------- Group Overview(ListedGroup) -------------------------
overview :: Group -> IO K.ListedGroupV0
overview Group{..} = do
pt <- fromMaybe "" <$> IO.readIORef protocolType
return $ K.ListedGroupV0 {groupId=groupId, protocolType = pt}

------------------- Describe Group -------------------------
describe :: Group -> IO K.DescribedGroupV0
describe Group{..} = do
C.withMVar lock $ \() -> do
protocolType' <- fromMaybe "" <$> IO.readIORef protocolType
protocolName' <- fromMaybe "" <$> IO.readIORef protocolName
state' <- T.pack . show <$> IO.readIORef state
members' <- H.toList members >>= M.mapM (\(_, member) -> describeMember member protocolName')
return $ K.DescribedGroupV0 {
protocolData=protocolName'
, groupState= state'
, errorCode=0
, members=Utils.listToKaArray members'
, groupId=groupId
, protocolType=protocolType'
}

describeMember :: Member -> T.Text -> IO K.DescribedGroupMemberV0
describeMember member@Member{..} protocol = do
assignment' <- IO.readIORef assignment
return $ K.DescribedGroupMemberV0 {
memberMetadata=getMemberMetadata member protocol
, memberAssignment=assignment'
, clientHost=clientHost
, clientId=clientId
, memberId=memberId
}
43 changes: 39 additions & 4 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module HStream.Kafka.Group.GroupCoordinator where

import qualified Control.Concurrent as C
import Control.Exception (handle, throw)
import qualified Control.Monad as M
import qualified Data.HashTable.IO as H
import Data.Int (Int32)
import qualified Data.Text as T
Expand All @@ -18,8 +19,9 @@ import HStream.Store (LDClient)
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

data GroupCoordinator = GroupCoordinator
newtype GroupCoordinator = GroupCoordinator
{ groups :: C.MVar (Utils.HashTable T.Text Group)
}

Expand All @@ -29,14 +31,14 @@ mkGroupCoordinator = do
groups <- H.new >>= C.newMVar
return $ GroupCoordinator {..}

joinGroup :: GroupCoordinator -> LDClient -> Int32 -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0
joinGroup coordinator ldClient serverId req = do
joinGroup :: GroupCoordinator -> K.RequestContext -> LDClient -> Int32 -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0
joinGroup coordinator reqCtx ldClient serverId req = do
handle (\((ErrorCodeException code)) -> makeErrorResponse code) $ do
-- get or create group
group <- getOrMaybeCreateGroup coordinator ldClient serverId req.groupId req.memberId

-- join group
G.joinGroup group req
G.joinGroup group reqCtx req
where
makeErrorResponse code = return $ K.JoinGroupResponseV0 {
errorCode = code
Expand Down Expand Up @@ -67,6 +69,15 @@ getGroup GroupCoordinator{..} groupId = do
Nothing -> throw (ErrorCodeException K.GROUP_ID_NOT_FOUND)
Just g -> return g

getAllGroups :: GroupCoordinator -> IO [Group]
getAllGroups GroupCoordinator{..} = do
C.withMVar groups $ (fmap (map snd) . H.toList)

getGroups :: GroupCoordinator -> [T.Text] -> IO [(T.Text, Maybe Group)]
getGroups GroupCoordinator{..} ids = do
C.withMVar groups $ \gs -> do
M.forM ids $ \gid -> (gid,) <$> (H.lookup gs gid)

getGroupM :: GroupCoordinator -> T.Text -> IO (Maybe Group)
getGroupM GroupCoordinator{..} groupId = do
C.withMVar groups $ \gs -> H.lookup gs groupId
Expand Down Expand Up @@ -121,3 +132,27 @@ fetchOffsets coordinator req = do
, metadata = Nothing
, committedOffset = -1
}

------------------- List Groups -------------------------
listGroups :: GroupCoordinator -> K.ListGroupsRequestV0 -> IO K.ListGroupsResponseV0
listGroups gc _ = do
gs <- getAllGroups gc
listedGroups <- M.mapM G.overview gs
return $ K.ListGroupsResponseV0 {errorCode=0, groups=Utils.listToKaArray listedGroups}

------------------- Describe Groups -------------------------
describeGroups :: GroupCoordinator -> K.DescribeGroupsRequestV0 -> IO K.DescribeGroupsResponseV0
describeGroups gc req = do
getGroups gc (Utils.kaArrayToList req.groups) >>= \gs -> do
listedGroups <- M.forM gs $ \case
(gid, Nothing) -> return $ K.DescribedGroupV0 {
protocolData=""
, groupState=""
, errorCode=K.GROUP_ID_NOT_FOUND
, members=Utils.listToKaArray []
, groupId=gid
, protocolType=""
}
(_, Just g) -> G.describe g
return $ K.DescribeGroupsResponseV0 {groups=Utils.listToKaArray listedGroups}

23 changes: 16 additions & 7 deletions hstream-kafka/HStream/Kafka/Group/Member.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

module HStream.Kafka.Group.Member where

import qualified Control.Concurrent as C
import qualified Data.ByteString as BS
import Data.Int (Int32, Int64)
import qualified Data.IORef as IO
import qualified Data.Text as T
import qualified Control.Concurrent as C
import qualified Data.ByteString as BS
import Data.Int (Int32, Int64)
import qualified Data.IORef as IO
import qualified Data.Text as T
import qualified Kafka.Protocol.Service as K

data Member
= Member
Expand All @@ -21,13 +22,21 @@ data Member
-- protocols
, protocolType :: T.Text
, supportedProtcols :: [(T.Text, BS.ByteString)]

-- client information
, clientId :: T.Text
, clientHost :: T.Text
}

newMember :: T.Text -> Int32 -> T.Text -> [(T.Text, BS.ByteString)] -> IO Member
newMember memberId sessionTimeoutMs protocolType supportedProtcols = do
newMember :: K.RequestContext -> T.Text -> Int32 -> T.Text -> [(T.Text, BS.ByteString)] -> IO Member
newMember reqCtx memberId sessionTimeoutMs protocolType supportedProtcols = do
assignment <- IO.newIORef BS.empty
lastHeartbeat <- IO.newIORef 0
heartbeatThread <- IO.newIORef Nothing

-- TODO: read from request context
let clientId = ""
clientHost = ""

-- TODO: check request
return $ Member {..}
2 changes: 2 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ handlers sc =
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "syncGroup") (handleSyncGroupV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "leaveGroup") (handleLeaveGroupV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "heartbeat") (handleHeartbeatV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listGroups") (handleListGroupsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "describeGroups") (handleDescribeGroupsV0 sc)
]
Loading

0 comments on commit 2a6394f

Please sign in to comment.