Skip to content

Commit

Permalink
merge upstream main
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Sep 21, 2023
1 parent 5b4d1ac commit 5a0eebf
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 48 deletions.
12 changes: 9 additions & 3 deletions hie.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ cradle:
- path: "hstream/./app/server.hs"
component: "hstream:exe:hstream-server"

- path: "hstream/./app/kafka-server.hs"
component: "hstream:exe:hstream-kafka-server"

- path: "hstream/./app/client.hs"
component: "hstream:exe:hstream"

Expand Down Expand Up @@ -273,11 +276,14 @@ cradle:
- path: "hstream-io/./"
component: "lib:hstream-io"

- path: "hstream-kafka/src"
- path: "hstream-kafka/."
component: "lib:hstream-kafka"

- path: "hstream-kafka/test"
component: "hstream-kafka:test:hstream-kafka-test"
- path: "hstream-kafka/protocol"
component: "lib:kafka-protocol"

- path: "hstream-kafka/protocol/test"
component: "hstream-kafka:test:kafka-protocol-test"

- path: "hstream-processing/src"
component: "lib:hstream-processing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
module Kafka.Group.GroupMetadataManager
module HStream.Kafka.Group.GroupMetadataManager
( GroupMetadataManager
, mkGroupMetadataManager
, storeOffsets
, fetchOffsets
) where

import Control.Concurrent (MVar, modifyMVar_, newMVar,
withMVar)
import Control.Concurrent.MVar (readMVar)
import Control.Monad (unless)
import Control.Concurrent (MVar, modifyMVar_, newMVar,
withMVar)
import Control.Concurrent.MVar (readMVar)
import Control.Monad (unless)
import Data.Hashable
import qualified Data.HashMap.Strict as HM
import Data.Int (Int32, Int64)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word64)
import GHC.Generics (Generic)
import qualified HStream.Logger as Log
import Kafka.Group.OffsetsStore (OffsetStorage (..), OffsetStore)
import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray))
import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message.Struct (OffsetCommitRequestPartitionV0 (..),
OffsetCommitResponsePartitionV0 (..),
OffsetFetchResponsePartitionV0 (..))
import qualified Data.HashMap.Strict as HM
import Data.Int (Int32, Int64)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word64)
import GHC.Generics (Generic)
import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..),
OffsetStore)
import qualified HStream.Logger as Log
import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray))
import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message (OffsetCommitRequestPartitionV0 (..),
OffsetCommitResponsePartitionV0 (..),
OffsetFetchResponsePartitionV0 (..))

data GroupMetadataManager = GroupMetadataManager
{ serverId :: Int
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Kafka.Group.OffsetsStore
module HStream.Kafka.Group.OffsetsStore
( OffsetStorage(..)
, OffsetStore(..)
, mkCkpOffsetStorage
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ module HStream.Kafka.Server.Handler (handlers) where

import HStream.Kafka.Server.Handler.Basic
import HStream.Kafka.Server.Handler.Consume
import HStream.Kafka.Server.Handler.Offset
import HStream.Kafka.Server.Handler.Produce
import HStream.Kafka.Server.Handler.Topic
import HStream.Kafka.Server.Handler.Offset
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
Expand Down
17 changes: 9 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
module HStream.Server.KafkaHandler.Offset
module HStream.Kafka.Server.Handler.Offset
( handleOffsetCommitV0
, handleOffsetFetchV0
)
where

import Control.Concurrent (withMVar)
import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as V
import HStream.Server.Types (ServerContext (..))
import Kafka.Group.GroupMetadataManager (fetchOffsets, storeOffsets)
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Service as K
import Control.Concurrent (withMVar)
import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as V
import HStream.Kafka.Group.GroupMetadataManager (fetchOffsets,
storeOffsets)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Service as K

--------------------
-- 8: OffsetCommit
Expand Down
36 changes: 21 additions & 15 deletions hstream-kafka/HStream/Kafka/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@ module HStream.Kafka.Server.Types
, transToStreamName
) where

import Control.Concurrent.STM
import Data.Text (Text)
import Data.Text (Text)
import Data.Word

import HStream.Common.ConsistentHashing (HashRing)
import HStream.Common.Server.HashRing (LoadBalanceHashRing,
initializeHashRing)
import HStream.Gossip.Types (Epoch, GossipContext)
import HStream.Kafka.Common.OffsetManager (OffsetManager,
newOffsetManager)
import HStream.Kafka.Server.Config (ServerOpts (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle)
import HStream.Stats (newServerStatsHolder)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)
import Control.Concurrent (MVar, newMVar)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import HStream.Common.Server.HashRing (LoadBalanceHashRing,
initializeHashRing)
import HStream.Gossip.Types (GossipContext)
import HStream.Kafka.Common.OffsetManager (OffsetManager,
newOffsetManager)
import HStream.Kafka.Group.GroupMetadataManager (GroupMetadataManager)
import HStream.Kafka.Server.Config (ServerOpts (..))
import HStream.MetaStore.Types (MetaHandle)
import HStream.Stats (newServerStatsHolder)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)

data ServerContext = ServerContext
{ serverID :: !Word32
Expand All @@ -36,6 +38,8 @@ data ServerContext = ServerContext
, loadBalanceHashRing :: !LoadBalanceHashRing
, gossipContext :: !GossipContext
, scOffsetManager :: !OffsetManager
, scGroupMetadataManagers :: MVar (HashMap T.Text GroupMetadataManager)
-- ^ {groupID: GroupMetadataManager}
}

initServerContext
Expand All @@ -49,6 +53,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
statsHolder <- newServerStatsHolder
epochHashRing <- initializeHashRing gossipContext
offsetManager <- newOffsetManager ldclient 1000{- TODO: maxLogs -}
groupMetadataManager <- newMVar HM.empty

return
ServerContext
Expand All @@ -64,6 +69,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
, loadBalanceHashRing = epochHashRing
, gossipContext = gossipContext
, scOffsetManager = offsetManager
, scGroupMetadataManagers = groupMetadataManager
}

transToStreamName :: Text -> S.StreamId
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ library
HStream.Kafka.Server.Handler.Consume
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Handler.Offset

hs-source-dirs: .
build-depends:
Expand Down

0 comments on commit 5a0eebf

Please sign in to comment.