diff --git a/hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs b/hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs index a19f33eb9..8f2d011ad 100644 --- a/hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs +++ b/hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs @@ -2,14 +2,16 @@ module Kafka.Group.GroupMetadataManager ( GroupMetadataManager , mkGroupMetadataManager , storeOffsets + , fetchOffsets ) where -import Control.Concurrent (MVar, modifyMVar_, newMVar) +import Control.Concurrent (MVar, modifyMVar_, newMVar, + withMVar) import Control.Concurrent.MVar (readMVar) import Control.Monad (unless) import Data.Hashable import qualified Data.HashMap.Strict as HM -import Data.Int (Int32) +import Data.Int (Int32, Int64) import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) import qualified Data.Text as T @@ -21,13 +23,14 @@ import Kafka.Group.OffsetsStore (OffsetStorage (..), OffsetStore) import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray)) import qualified Kafka.Protocol.Error as K import Kafka.Protocol.Message.Struct (OffsetCommitRequestPartitionV0 (..), - OffsetCommitResponsePartitionV0 (..)) + OffsetCommitResponsePartitionV0 (..), + OffsetFetchResponsePartitionV0 (..)) data GroupMetadataManager = GroupMetadataManager { serverId :: Int , groupName :: T.Text , offsetsStore :: OffsetStore - , offsetsCache :: MVar (Map.Map TopicPartition Int) + , offsetsCache :: MVar (Map.Map TopicPartition Int64) , partitionsMap :: MVar (HM.HashMap TopicPartition Word64) -- ^ partitionsMap maps TopicPartition to the underlying logID } @@ -81,6 +84,36 @@ storeOffsets GroupMetadataManager{..} topicName arrayOffsets = do res = V.map (\(partitionIndex, errorCode) -> OffsetCommitResponsePartitionV0{..}) (suc <> notFoundErrs) return KaArray {unKaArray = Just res} +fetchOffsets + :: GroupMetadataManager + -> T.Text + -> KaArray Int32 + -> IO (KaArray OffsetFetchResponsePartitionV0) +fetchOffsets GroupMetadataManager{..} topicName partitions = do + let partitions' = fromMaybe V.empty (unKaArray partitions) + res <- withMVar offsetsCache $ \cache -> do + traverse + ( + \ partitionIdx -> do + let key = mkTopicPartition topicName partitionIdx + in case Map.lookup key cache of + Just offset -> return $ OffsetFetchResponsePartitionV0 + { committedOffset = offset + , metadata = Nothing + , partitionIndex= partitionIdx + , errorCode = K.NONE + } + Nothing -> return $ OffsetFetchResponsePartitionV0 + { committedOffset = -1 + , metadata = Nothing + , partitionIndex= partitionIdx + -- TODO: check the error code here + , errorCode = K.NONE + } + ) partitions' + + return $ KaArray {unKaArray = Just res} + ------------------------------------------------------------------------------------------------- -- helper diff --git a/hstream/src/HStream/Server/KafkaHandler.hs b/hstream/src/HStream/Server/KafkaHandler.hs index de8b7efc7..1e952261e 100644 --- a/hstream/src/HStream/Server/KafkaHandler.hs +++ b/hstream/src/HStream/Server/KafkaHandler.hs @@ -31,4 +31,5 @@ handlers sc = , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "fetch") (handleFetchV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetCommit") (handleOffsetCommitV0 sc) + , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetFetch") (handleOffsetFetchV0 sc) ] diff --git a/hstream/src/HStream/Server/KafkaHandler/Offset.hs b/hstream/src/HStream/Server/KafkaHandler/Offset.hs index 0ad7346dd..743641da3 100644 --- a/hstream/src/HStream/Server/KafkaHandler/Offset.hs +++ b/hstream/src/HStream/Server/KafkaHandler/Offset.hs @@ -1,5 +1,6 @@ module HStream.Server.KafkaHandler.Offset ( handleOffsetCommitV0 + , handleOffsetFetchV0 ) where @@ -7,7 +8,7 @@ import Control.Concurrent (withMVar) import qualified Data.HashMap.Strict as HM import qualified Data.Vector as V import HStream.Server.Types (ServerContext (..)) -import Kafka.Group.GroupMetadataManager (storeOffsets) +import Kafka.Group.GroupMetadataManager (fetchOffsets, storeOffsets) import qualified Kafka.Protocol as K import qualified Kafka.Protocol.Service as K @@ -28,3 +29,21 @@ handleOffsetCommitV0 ServerContext{..} _ K.OffsetCommitRequestV0{..} = do res <- storeOffsets groupMgr name partitions return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name} return . K.OffsetCommitResponseV0 $ K.KaArray {unKaArray = Just response} + +-------------------- +-- 9: OffsetFetch +-------------------- +handleOffsetFetchV0 + :: ServerContext -> K.RequestContext -> K.OffsetFetchRequestV0 -> IO K.OffsetFetchResponseV0 +handleOffsetFetchV0 ServerContext{..} _ K.OffsetFetchRequestV0{..} = do + case K.unKaArray topics of + Nothing -> undefined + Just topics' -> do + mgr <- withMVar scGroupMetadataManagers $ return . HM.lookup groupId + case mgr of + Nothing -> undefined + Just groupMgr -> do + response <- V.forM topics' $ \K.OffsetFetchRequestTopicV0{..} -> do + res <- fetchOffsets groupMgr name partitionIndexes + return $ K.OffsetFetchResponseTopicV0 {partitions = res, name = name} + return . K.OffsetFetchResponseV0 $ K.KaArray {unKaArray = Just response}