Skip to content

Commit

Permalink
add a naïve OffsetFetch handler
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Sep 20, 2023
1 parent 5856d15 commit 8dbb6fe
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 5 deletions.
41 changes: 37 additions & 4 deletions hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ module Kafka.Group.GroupMetadataManager
( GroupMetadataManager
, mkGroupMetadataManager
, storeOffsets
, fetchOffsets
) where

import Control.Concurrent (MVar, modifyMVar_, newMVar)
import Control.Concurrent (MVar, modifyMVar_, newMVar,
withMVar)
import Control.Concurrent.MVar (readMVar)
import Control.Monad (unless)
import Data.Hashable
import qualified Data.HashMap.Strict as HM
import Data.Int (Int32)
import Data.Int (Int32, Int64)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
Expand All @@ -21,13 +23,14 @@ import Kafka.Group.OffsetsStore (OffsetStorage (..), OffsetStore)
import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray))
import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message.Struct (OffsetCommitRequestPartitionV0 (..),
OffsetCommitResponsePartitionV0 (..))
OffsetCommitResponsePartitionV0 (..),
OffsetFetchResponsePartitionV0 (..))

data GroupMetadataManager = GroupMetadataManager
{ serverId :: Int
, groupName :: T.Text
, offsetsStore :: OffsetStore
, offsetsCache :: MVar (Map.Map TopicPartition Int)
, offsetsCache :: MVar (Map.Map TopicPartition Int64)
, partitionsMap :: MVar (HM.HashMap TopicPartition Word64)
-- ^ partitionsMap maps TopicPartition to the underlying logID
}
Expand Down Expand Up @@ -81,6 +84,36 @@ storeOffsets GroupMetadataManager{..} topicName arrayOffsets = do
res = V.map (\(partitionIndex, errorCode) -> OffsetCommitResponsePartitionV0{..}) (suc <> notFoundErrs)
return KaArray {unKaArray = Just res}

fetchOffsets
:: GroupMetadataManager
-> T.Text
-> KaArray Int32
-> IO (KaArray OffsetFetchResponsePartitionV0)
fetchOffsets GroupMetadataManager{..} topicName partitions = do
let partitions' = fromMaybe V.empty (unKaArray partitions)
res <- withMVar offsetsCache $ \cache -> do
traverse
(
\ partitionIdx -> do
let key = mkTopicPartition topicName partitionIdx
in case Map.lookup key cache of
Just offset -> return $ OffsetFetchResponsePartitionV0
{ committedOffset = offset
, metadata = Nothing
, partitionIndex= partitionIdx
, errorCode = K.NONE
}
Nothing -> return $ OffsetFetchResponsePartitionV0
{ committedOffset = -1
, metadata = Nothing
, partitionIndex= partitionIdx
-- TODO: check the error code here
, errorCode = K.NONE
}
) partitions'

return $ KaArray {unKaArray = Just res}

-------------------------------------------------------------------------------------------------
-- helper

Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/KafkaHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ handlers sc =
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "fetch") (handleFetchV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetCommit") (handleOffsetCommitV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetFetch") (handleOffsetFetchV0 sc)
]
21 changes: 20 additions & 1 deletion hstream/src/HStream/Server/KafkaHandler/Offset.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
module HStream.Server.KafkaHandler.Offset
( handleOffsetCommitV0
, handleOffsetFetchV0
)
where

import Control.Concurrent (withMVar)
import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as V
import HStream.Server.Types (ServerContext (..))
import Kafka.Group.GroupMetadataManager (storeOffsets)
import Kafka.Group.GroupMetadataManager (fetchOffsets, storeOffsets)
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Service as K

Expand All @@ -28,3 +29,21 @@ handleOffsetCommitV0 ServerContext{..} _ K.OffsetCommitRequestV0{..} = do
res <- storeOffsets groupMgr name partitions
return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name}
return . K.OffsetCommitResponseV0 $ K.KaArray {unKaArray = Just response}

--------------------
-- 9: OffsetFetch
--------------------
handleOffsetFetchV0
:: ServerContext -> K.RequestContext -> K.OffsetFetchRequestV0 -> IO K.OffsetFetchResponseV0
handleOffsetFetchV0 ServerContext{..} _ K.OffsetFetchRequestV0{..} = do
case K.unKaArray topics of
Nothing -> undefined
Just topics' -> do
mgr <- withMVar scGroupMetadataManagers $ return . HM.lookup groupId
case mgr of
Nothing -> undefined
Just groupMgr -> do
response <- V.forM topics' $ \K.OffsetFetchRequestTopicV0{..} -> do
res <- fetchOffsets groupMgr name partitionIndexes
return $ K.OffsetFetchResponseTopicV0 {partitions = res, name = name}
return . K.OffsetFetchResponseV0 $ K.KaArray {unKaArray = Just response}

0 comments on commit 8dbb6fe

Please sign in to comment.