Skip to content

Commit

Permalink
kafka: add naïve OffsetFetch and OffsetCommit handler (#1614)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Sep 25, 2023
1 parent ce4a91d commit 7edacd7
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 15 deletions.
133 changes: 133 additions & 0 deletions hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
module HStream.Kafka.Group.GroupMetadataManager
( GroupMetadataManager
, mkGroupMetadataManager
, storeOffsets
, fetchOffsets
) where

import Control.Concurrent (MVar, modifyMVar_, newMVar,
withMVar)
import Control.Concurrent.MVar (readMVar)
import Control.Monad (unless)
import Data.Hashable
import qualified Data.HashMap.Strict as HM
import Data.Int (Int32, Int64)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word64)
import GHC.Generics (Generic)
import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..),
OffsetStore)
import qualified HStream.Logger as Log
import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray))
import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message (OffsetCommitRequestPartitionV0 (..),
OffsetCommitResponsePartitionV0 (..),
OffsetFetchResponsePartitionV0 (..))

-- | Per-consumer-group offset state: a durable offsets store plus
-- in-memory caches used to answer OffsetCommit / OffsetFetch requests.
data GroupMetadataManager = GroupMetadataManager
  { serverId :: Int
    -- ^ id of the server hosting this group
    -- NOTE(review): currently unused in this module — confirm it is needed
  , groupName :: T.Text
    -- ^ consumer group id; used as a tag in log messages
  , offsetsStore :: OffsetStore
    -- ^ durable backend that 'storeOffsets' persists checkpoints to
  , offsetsCache :: MVar (Map.Map TopicPartition Int64)
    -- ^ last committed offset per partition; read by 'fetchOffsets',
    -- updated by 'storeOffsets'
  , partitionsMap :: MVar (HM.HashMap TopicPartition Word64)
    -- ^ partitionsMap maps TopicPartition to the underlying logID
  }

-- | Create a 'GroupMetadataManager' for consumer group @groupName@.
--
-- The offsets cache and partitions map start out empty; they are only
-- populated by later 'storeOffsets' calls.
--
-- TODO: rebuild 'offsetsCache' from the underlying 'OffsetStore' on
-- startup so previously committed offsets survive a server restart.
-- (The original code carried an unused @rebuildCache = undefined@
-- local binding for this; it has been removed as dead code.)
mkGroupMetadataManager :: OffsetStore -> Int -> T.Text -> IO GroupMetadataManager
mkGroupMetadataManager offsetsStore serverId groupName = do
  offsetsCache  <- newMVar Map.empty
  partitionsMap <- newMVar HM.empty
  return GroupMetadataManager{..}

-- | Handle an OffsetCommit for one topic of this consumer group.
--
-- Each requested partition must already be known to 'partitionsMap'
-- (i.e. have an underlying logId).  Unknown partitions are answered
-- with COORDINATOR_NOT_AVAILABLE; the remaining offsets are still
-- committed.  Committed offsets are persisted via 'offsetsStore' and
-- then mirrored into 'offsetsCache'.
storeOffsets
  :: GroupMetadataManager
  -> T.Text
  -> KaArray OffsetCommitRequestPartitionV0
  -> IO (KaArray OffsetCommitResponsePartitionV0)
storeOffsets GroupMetadataManager{..} topicName arrayOffsets = do
  let offsets = fromMaybe V.empty (unKaArray arrayOffsets)

  -- Split the request into partitions present in partitionsMap
  -- (Right: key, logId, offset) and unknown ones (Left: index, error).
  partitionsInfo <- readMVar partitionsMap
  let (notFoundErrs, offsets') = V.partitionWith
       ( \OffsetCommitRequestPartitionV0{..} ->
           let key = mkTopicPartition topicName partitionIndex
            in case HM.lookup key partitionsInfo of
                 Just logId -> Right (key, logId, fromIntegral committedOffset)
                 Nothing    -> Left (partitionIndex, K.COORDINATOR_NOT_AVAILABLE)
       ) offsets
  unless (V.null notFoundErrs) $ do
    Log.info $ "consumer group " <> Log.build groupName <> " receive OffsetCommitRequestPartition with unknown topic or partition"
            <> ", topic name: " <> Log.build topicName
            <> ", partitions: " <> Log.build (show $ V.map fst notFoundErrs)

  -- write checkpoints (logId -> offset) for the known partitions
  let checkPoints = V.foldl' (\acc (_, logId, offset) -> Map.insert logId offset acc) Map.empty offsets'
  commitOffsets offsetsStore topicName checkPoints
  Log.debug $ "consumer group " <> Log.build groupName <> " commit offsets {" <> Log.build (show checkPoints)
           <> "} to topic " <> Log.build topicName

  -- update cache; Map.union is left-biased, so fresh commits win
  modifyMVar_ offsetsCache $ \cache -> do
    let updates = V.foldl' (\acc (key, _, offset) -> Map.insert key (fromIntegral offset) acc) Map.empty offsets'
    return $ Map.union updates cache

  -- assemble per-partition response: NONE for successes, then the errors
  let suc = V.map (\(TopicPartition{topicPartitionIdx}, _, _) -> (topicPartitionIdx, K.NONE)) offsets'
      res = V.map (\(partitionIndex, errorCode) -> OffsetCommitResponsePartitionV0{..}) (suc <> notFoundErrs)
  return KaArray {unKaArray = Just res}

-- | Handle an OffsetFetch for one topic of this consumer group.
--
-- Offsets are answered from 'offsetsCache' only.  A partition with no
-- cached commit is reported with offset -1 (Kafka's convention for
-- "no committed offset") and, for now, error code NONE.
fetchOffsets
  :: GroupMetadataManager
  -> T.Text
  -> KaArray Int32
  -> IO (KaArray OffsetFetchResponsePartitionV0)
fetchOffsets GroupMetadataManager{..} topicName partitions = do
  let partitions' = fromMaybe V.empty (unKaArray partitions)
  -- The lookups are pure, so no per-element IO is needed inside withMVar.
  res <- withMVar offsetsCache $ \cache ->
    return $ V.map (lookupPartition cache) partitions'
  return KaArray {unKaArray = Just res}
 where
  lookupPartition cache partitionIdx =
    case Map.lookup (mkTopicPartition topicName partitionIdx) cache of
      Just offset -> OffsetFetchResponsePartitionV0
        { committedOffset = offset
        , metadata = Nothing
        , partitionIndex = partitionIdx
        , errorCode = K.NONE
        }
      Nothing -> OffsetFetchResponsePartitionV0
        { committedOffset = -1
        , metadata = Nothing
        , partitionIndex = partitionIdx
        -- TODO: check the error code here
        , errorCode = K.NONE
        }

-------------------------------------------------------------------------------------------------
-- helper

-- | Key identifying one partition of a topic inside a group's caches.
data TopicPartition = TopicPartition
  { topicName :: T.Text
  , topicPartitionIdx :: Int32
    -- ^ partition index within the topic (a.k.a partitionIndex in Kafka);
    -- the underlying logId is tracked separately in partitionsMap
  } deriving(Eq, Generic, Ord)

-- Generic-derived hash so TopicPartition can key the HashMap caches.
instance Hashable TopicPartition

-- Render as the shown topic name and partition index joined by a dash.
instance Show TopicPartition where
  show tp = show (topicName tp) <> "-" <> show (topicPartitionIdx tp)

-- | Pair a topic name with a partition index into a 'TopicPartition' key.
mkTopicPartition :: T.Text -> Int32 -> TopicPartition
mkTopicPartition name idx =
  TopicPartition { topicName = name, topicPartitionIdx = idx }
47 changes: 47 additions & 0 deletions hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module HStream.Kafka.Group.OffsetsStore
( OffsetStorage(..)
, OffsetStore(..)
, mkCkpOffsetStorage
)
where

import Control.Monad (unless)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Word (Word64)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)

-- | Backends capable of durably committing consumer-group offsets.
class OffsetStorage s where
  -- | @commitOffsets storage key offsets@ persists the @offsets@ map
  -- (logId -> offset) under the name @key@.
  commitOffsets :: s -> T.Text -> Map Word64 Word64 -> IO ()

-- | Sum of available offset-storage backends.  Only the
-- checkpoint-store backend exists so far.
data OffsetStore = Ckp CkpOffsetStorage

-- Dispatch commitOffsets to the wrapped backend.
instance OffsetStorage OffsetStore where
  commitOffsets (Ckp s) = commitOffsets s

--------------------------------------------------------------------------------

-- | Offset storage backed by a LogDevice checkpoint store.
data CkpOffsetStorage = CkpOffsetStorage
  { ckpStore :: S.LDCheckpointStore
    -- ^ handle to the RSM-based checkpoint store
  , ckpStoreName :: T.Text
    -- ^ name the store was allocated under (the consumer group name)
  , ckpStoreId :: Word64
    -- ^ __consumer_offsets logID
  }

-- | Allocate (or reuse) a checkpoint store named @ckpStoreName@ and
-- wrap it as a 'CkpOffsetStorage'.
mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage
mkCkpOffsetStorage client ckpStoreName = do
  -- FIXME: need to get log attributes from somewhere
  S.initOffsetCheckpointDir client S.def
  ckpStoreId <- S.allocOffsetCheckpointId client (textToCBytes ckpStoreName)
  ckpStore   <- S.newRSMBasedCheckpointStore client ckpStoreId 5000
  Log.info $ "mkCkpOffsetStorage with name: " <> Log.build ckpStoreName <> ", storeId: " <> Log.build ckpStoreId
  return CkpOffsetStorage{..}

instance OffsetStorage CkpOffsetStorage where
  -- Skip the store write entirely when there is nothing to commit.
  commitOffsets CkpOffsetStorage{..} offsetsKey offsets
    | Map.null offsets = pure ()
    | otherwise        =
        S.ckpStoreUpdateMultiLSN ckpStore (textToCBytes offsetsKey) offsets
4 changes: 4 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module HStream.Kafka.Server.Handler (handlers) where

import HStream.Kafka.Server.Handler.Basic
import HStream.Kafka.Server.Handler.Consume
import HStream.Kafka.Server.Handler.Offset
import HStream.Kafka.Server.Handler.Produce
import HStream.Kafka.Server.Handler.Topic
import HStream.Kafka.Server.Types (ServerContext (..))
Expand All @@ -28,4 +29,7 @@ handlers sc =
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "produce") (handleProduceV2 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "fetch") (handleFetchV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetCommit") (handleOffsetCommitV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetFetch") (handleOffsetFetchV0 sc)
]
50 changes: 50 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module HStream.Kafka.Server.Handler.Offset
( handleOffsetCommitV0
, handleOffsetFetchV0
)
where

import Control.Concurrent (withMVar)
import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as V
import HStream.Kafka.Group.GroupMetadataManager (fetchOffsets,
storeOffsets)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Service as K

--------------------
-- 8: OffsetCommit
--------------------
-- | Handler for OffsetCommit (api key 8, v0).
--
-- Looks up the group's 'GroupMetadataManager' by groupId and delegates
-- each topic's partition offsets to 'storeOffsets'.
handleOffsetCommitV0
  :: ServerContext -> K.RequestContext -> K.OffsetCommitRequestV0 -> IO K.OffsetCommitResponseV0
handleOffsetCommitV0 ServerContext{..} _ K.OffsetCommitRequestV0{..} = do
  case K.unKaArray topics of
    -- FIXME(review): a null topics array crashes the handler via
    -- 'undefined'; presumably it should return an empty response —
    -- TODO confirm against the Kafka protocol spec.
    Nothing -> undefined
    Just topics' -> do
      mgr <- withMVar scGroupMetadataManagers $ return . HM.lookup groupId
      case mgr of
        -- FIXME(review): an unknown groupId also crashes via 'undefined';
        -- a per-partition error response seems intended — TODO confirm.
        Nothing -> undefined
        Just groupMgr -> do
          response <- V.forM topics' $ \K.OffsetCommitRequestTopicV0{..} -> do
            res <- storeOffsets groupMgr name partitions
            return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name}
          return . K.OffsetCommitResponseV0 $ K.KaArray {unKaArray = Just response}

--------------------
-- 9: OffsetFetch
--------------------
-- | Handler for OffsetFetch (api key 9, v0).
--
-- Looks up the group's 'GroupMetadataManager' by groupId and delegates
-- each topic's partition indexes to 'fetchOffsets'.
handleOffsetFetchV0
  :: ServerContext -> K.RequestContext -> K.OffsetFetchRequestV0 -> IO K.OffsetFetchResponseV0
handleOffsetFetchV0 ServerContext{..} _ K.OffsetFetchRequestV0{..} = do
  case K.unKaArray topics of
    -- FIXME(review): a null topics array crashes the handler via
    -- 'undefined'; an empty response looks intended — TODO confirm.
    Nothing -> undefined
    Just topics' -> do
      mgr <- withMVar scGroupMetadataManagers $ return . HM.lookup groupId
      case mgr of
        -- FIXME(review): an unknown groupId also crashes via 'undefined';
        -- should likely answer with committedOffset -1 / an error code —
        -- TODO confirm against the Kafka protocol spec.
        Nothing -> undefined
        Just groupMgr -> do
          response <- V.forM topics' $ \K.OffsetFetchRequestTopicV0{..} -> do
            res <- fetchOffsets groupMgr name partitionIndexes
            return $ K.OffsetFetchResponseTopicV0 {partitions = res, name = name}
          return . K.OffsetFetchResponseV0 $ K.KaArray {unKaArray = Just response}
36 changes: 21 additions & 15 deletions hstream-kafka/HStream/Kafka/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@ module HStream.Kafka.Server.Types
, transToStreamName
) where

import Control.Concurrent.STM
import Data.Text (Text)
import Data.Text (Text)
import Data.Word

import HStream.Common.ConsistentHashing (HashRing)
import HStream.Common.Server.HashRing (LoadBalanceHashRing,
initializeHashRing)
import HStream.Gossip.Types (Epoch, GossipContext)
import HStream.Kafka.Common.OffsetManager (OffsetManager,
newOffsetManager)
import HStream.Kafka.Server.Config (ServerOpts (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle)
import HStream.Stats (newServerStatsHolder)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)
import Control.Concurrent (MVar, newMVar)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import HStream.Common.Server.HashRing (LoadBalanceHashRing,
initializeHashRing)
import HStream.Gossip.Types (GossipContext)
import HStream.Kafka.Common.OffsetManager (OffsetManager,
newOffsetManager)
import HStream.Kafka.Group.GroupMetadataManager (GroupMetadataManager)
import HStream.Kafka.Server.Config (ServerOpts (..))
import HStream.MetaStore.Types (MetaHandle)
import HStream.Stats (newServerStatsHolder)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)

data ServerContext = ServerContext
{ serverID :: !Word32
Expand All @@ -36,6 +38,8 @@ data ServerContext = ServerContext
, loadBalanceHashRing :: !LoadBalanceHashRing
, gossipContext :: !GossipContext
, scOffsetManager :: !OffsetManager
, scGroupMetadataManagers :: MVar (HashMap T.Text GroupMetadataManager)
-- ^ {groupID: GroupMetadataManager}
}

initServerContext
Expand All @@ -49,6 +53,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
statsHolder <- newServerStatsHolder
epochHashRing <- initializeHashRing gossipContext
offsetManager <- newOffsetManager ldclient 1000{- TODO: maxLogs -}
groupMetadataManager <- newMVar HM.empty

return
ServerContext
Expand All @@ -64,6 +69,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
, loadBalanceHashRing = epochHashRing
, gossipContext = gossipContext
, scOffsetManager = offsetManager
, scGroupMetadataManagers = groupMetadataManager
}

transToStreamName :: Text -> S.StreamId
Expand Down
5 changes: 5 additions & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ library
exposed-modules:
HStream.Kafka.Common.OffsetManager
HStream.Kafka.Common.RecordFormat
HStream.Kafka.Group.GroupMetadataManager
HStream.Kafka.Group.OffsetsStore
HStream.Kafka.Network
HStream.Kafka.Server.Config
HStream.Kafka.Server.Handler
Expand All @@ -120,6 +122,7 @@ library
HStream.Kafka.Server.Handler.Consume
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Handler.Offset

hs-source-dirs: .
build-depends:
Expand All @@ -131,6 +134,7 @@ library
, digest
, directory
, hashtables
, hashable
, hstream-api-hs
, hstream-common
, hstream-common-base
Expand All @@ -147,6 +151,7 @@ library
, vector
, yaml
, Z-Data
, unordered-containers

default-language: GHC2021
default-extensions:
Expand Down

0 comments on commit 7edacd7

Please sign in to comment.