diff --git a/hie.yaml b/hie.yaml index e1be36f7a..3a9285712 100644 --- a/hie.yaml +++ b/hie.yaml @@ -225,6 +225,9 @@ cradle: - path: "hstream/./app/server.hs" component: "hstream:exe:hstream-server" + - path: "hstream/./app/kafka-server.hs" + component: "hstream:exe:hstream-kafka-server" + - path: "hstream/./app/client.hs" component: "hstream:exe:hstream" @@ -273,11 +276,14 @@ cradle: - path: "hstream-io/./" component: "lib:hstream-io" - - path: "hstream-kafka/src" + - path: "hstream-kafka/." component: "lib:hstream-kafka" - - path: "hstream-kafka/test" - component: "hstream-kafka:test:hstream-kafka-test" + - path: "hstream-kafka/protocol" + component: "lib:kafka-protocol" + + - path: "hstream-kafka/protocol/test" + component: "hstream-kafka:test:kafka-protocol-test" - path: "hstream-processing/src" component: "lib:hstream-processing" diff --git a/hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs similarity index 76% rename from hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs rename to hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs index 8f2d011ad..89986fa64 100644 --- a/hstream-kafka/src/Kafka/Group/GroupMetadataManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs @@ -1,30 +1,31 @@ -module Kafka.Group.GroupMetadataManager +module HStream.Kafka.Group.GroupMetadataManager ( GroupMetadataManager , mkGroupMetadataManager , storeOffsets , fetchOffsets ) where -import Control.Concurrent (MVar, modifyMVar_, newMVar, - withMVar) -import Control.Concurrent.MVar (readMVar) -import Control.Monad (unless) +import Control.Concurrent (MVar, modifyMVar_, newMVar, + withMVar) +import Control.Concurrent.MVar (readMVar) +import Control.Monad (unless) import Data.Hashable -import qualified Data.HashMap.Strict as HM -import Data.Int (Int32, Int64) -import qualified Data.Map.Strict as Map -import Data.Maybe (fromMaybe) -import qualified Data.Text as T -import qualified Data.Vector as V -import Data.Word (Word64) -import GHC.Generics (Generic) -import qualified HStream.Logger as Log -import Kafka.Group.OffsetsStore (OffsetStorage (..), OffsetStore) -import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray)) -import qualified Kafka.Protocol.Error as K -import Kafka.Protocol.Message.Struct (OffsetCommitRequestPartitionV0 (..), - OffsetCommitResponsePartitionV0 (..), - OffsetFetchResponsePartitionV0 (..)) +import qualified Data.HashMap.Strict as HM +import Data.Int (Int32, Int64) +import qualified Data.Map.Strict as Map +import Data.Maybe (fromMaybe) +import qualified Data.Text as T +import qualified Data.Vector as V +import Data.Word (Word64) +import GHC.Generics (Generic) +import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..), + OffsetStore) +import qualified HStream.Logger as Log +import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray)) +import qualified Kafka.Protocol.Error as K +import Kafka.Protocol.Message (OffsetCommitRequestPartitionV0 (..), + OffsetCommitResponsePartitionV0 (..), + OffsetFetchResponsePartitionV0 (..)) data GroupMetadataManager = GroupMetadataManager { serverId :: Int diff --git a/hstream-kafka/src/Kafka/Group/OffsetsStore.hs b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs similarity index 97% rename from hstream-kafka/src/Kafka/Group/OffsetsStore.hs rename to hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs index cdf35591f..0e228ade2 100644 --- a/hstream-kafka/src/Kafka/Group/OffsetsStore.hs +++ b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs @@ -1,4 +1,4 @@ -module Kafka.Group.OffsetsStore +module HStream.Kafka.Group.OffsetsStore ( OffsetStorage(..) , OffsetStore(..) , mkCkpOffsetStorage diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hs b/hstream-kafka/HStream/Kafka/Server/Handler.hs index 49ec3058e..8bc2a26bf 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hs @@ -5,9 +5,9 @@ module HStream.Kafka.Server.Handler (handlers) where import HStream.Kafka.Server.Handler.Basic import HStream.Kafka.Server.Handler.Consume +import HStream.Kafka.Server.Handler.Offset import HStream.Kafka.Server.Handler.Produce import HStream.Kafka.Server.Handler.Topic -import HStream.Kafka.Server.Handler.Offset import HStream.Kafka.Server.Types (ServerContext (..)) import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index 743641da3..48663ef4c 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -1,16 +1,17 @@ -module HStream.Server.KafkaHandler.Offset +module HStream.Kafka.Server.Handler.Offset ( handleOffsetCommitV0 , handleOffsetFetchV0 ) where -import Control.Concurrent (withMVar) -import qualified Data.HashMap.Strict as HM -import qualified Data.Vector as V -import HStream.Server.Types (ServerContext (..)) -import Kafka.Group.GroupMetadataManager (fetchOffsets, storeOffsets) -import qualified Kafka.Protocol as K -import qualified Kafka.Protocol.Service as K +import Control.Concurrent (withMVar) +import qualified Data.HashMap.Strict as HM +import qualified Data.Vector as V +import HStream.Kafka.Group.GroupMetadataManager (fetchOffsets, + storeOffsets) +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified Kafka.Protocol as K +import qualified Kafka.Protocol.Service as K -------------------- -- 8: OffsetCommit diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index 767e7f1ea..3313e8f07 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -5,23 +5,25 @@ module HStream.Kafka.Server.Types , transToStreamName ) where -import Control.Concurrent.STM -import Data.Text (Text) +import Data.Text (Text) import Data.Word -import HStream.Common.ConsistentHashing (HashRing) -import HStream.Common.Server.HashRing (LoadBalanceHashRing, - initializeHashRing) -import HStream.Gossip.Types (Epoch, GossipContext) -import HStream.Kafka.Common.OffsetManager (OffsetManager, - newOffsetManager) -import HStream.Kafka.Server.Config (ServerOpts (..)) -import qualified HStream.Logger as Log -import HStream.MetaStore.Types (MetaHandle) -import HStream.Stats (newServerStatsHolder) -import qualified HStream.Stats as Stats -import qualified HStream.Store as S -import HStream.Utils (textToCBytes) +import Control.Concurrent (MVar, newMVar) +import Data.HashMap.Strict (HashMap) +import qualified Data.HashMap.Strict as HM +import qualified Data.Text as T +import HStream.Common.Server.HashRing (LoadBalanceHashRing, + initializeHashRing) +import HStream.Gossip.Types (GossipContext) +import HStream.Kafka.Common.OffsetManager (OffsetManager, + newOffsetManager) +import HStream.Kafka.Group.GroupMetadataManager (GroupMetadataManager) +import HStream.Kafka.Server.Config (ServerOpts (..)) +import HStream.MetaStore.Types (MetaHandle) +import HStream.Stats (newServerStatsHolder) +import qualified HStream.Stats as Stats +import qualified HStream.Store as S +import HStream.Utils (textToCBytes) data ServerContext = ServerContext { serverID :: !Word32 @@ -36,6 +38,8 @@ data ServerContext = ServerContext , loadBalanceHashRing :: !LoadBalanceHashRing , gossipContext :: !GossipContext , scOffsetManager :: !OffsetManager + , scGroupMetadataManagers :: MVar (HashMap T.Text GroupMetadataManager) + -- ^ {groupID: GroupMetadataManager} } initServerContext @@ -49,6 +53,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do statsHolder <- newServerStatsHolder epochHashRing <- initializeHashRing gossipContext offsetManager <- newOffsetManager ldclient 1000{- TODO: maxLogs -} + groupMetadataManager <- newMVar HM.empty return ServerContext @@ -64,6 +69,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do , loadBalanceHashRing = epochHashRing , gossipContext = gossipContext , scOffsetManager = offsetManager + , scGroupMetadataManagers = groupMetadataManager } transToStreamName :: Text -> S.StreamId diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index f28875e5e..554dd1188 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -122,6 +122,7 @@ library HStream.Kafka.Server.Handler.Consume HStream.Kafka.Server.Handler.Produce HStream.Kafka.Server.Handler.Topic + HStream.Kafka.Server.Handler.Offset hs-source-dirs: . build-depends: