Skip to content

Commit

Permalink
feat(kafka): naive consumer group (#1629)
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored Oct 13, 2023
1 parent 4081e96 commit 45f7e57
Show file tree
Hide file tree
Showing 13 changed files with 980 additions and 115 deletions.
18 changes: 14 additions & 4 deletions common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,18 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing

data KafkaResource
= KafkaResTopic Text
| KafkaResGroup Text

kafkaResourceKey :: KafkaResource -> Text
kafkaResourceKey (KafkaResTopic name) = name
kafkaResourceKey (KafkaResGroup name) = name

kafkaResourceMetaId :: KafkaResource -> Text
kafkaResourceMetaId (KafkaResTopic name) = "KafkaResTopic_" <> name
kafkaResourceMetaId (KafkaResGroup name) = "KafkaResGroup_" <> name

lookupKafka :: LoadBalanceHashRing -> Maybe Text -> KafkaResource -> IO A.ServerNode
lookupKafka lbhr alk (KafkaResTopic topicId) = lookupNode lbhr topicId alk
lookupKafka lbhr alk res = lookupNode lbhr (kafkaResourceKey res) alk

lookupKafkaPersist
:: M.MetaHandle
Expand All @@ -96,6 +105,7 @@ lookupKafkaPersist
-> Maybe Text
-> KafkaResource
-> IO A.ServerNode
lookupKafkaPersist mh gc lbhr alk (KafkaResTopic topicId) =
let metaId = "KafkaResTopic_" <> topicId
in lookupNodePersist mh gc lbhr topicId metaId alk
lookupKafkaPersist mh gc lbhr alk kafkaResource =
let key = kafkaResourceKey kafkaResource
metaId = kafkaResourceMetaId kafkaResource
in lookupNodePersist mh gc lbhr key metaId alk
11 changes: 11 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/KafkaException.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module HStream.Kafka.Common.KafkaException
( ErrorCodeException (..)
) where

import qualified Control.Exception as E
import qualified Kafka.Protocol.Error as K

-------------------------------------------------------------------------------

newtype ErrorCodeException = ErrorCodeException K.ErrorCode deriving Show
instance E.Exception ErrorCodeException
45 changes: 45 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DuplicateRecordFields #-}

module HStream.Kafka.Common.Utils where

import Control.Exception (throw)
import qualified Control.Monad as M
import qualified Data.HashTable.IO as H
import Data.Maybe (fromMaybe)
import qualified Data.Vector as V
import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified Kafka.Protocol.Encoding as K

type HashTable k v = H.BasicHashTable k v

hashtableGet hashTable key errorCode = H.lookup hashTable key >>= \case
Nothing -> throw (ErrorCodeException errorCode)
Just v -> return v

hashtableDeleteAll hashTable = do
lst <- H.toList hashTable
M.forM_ lst $ \(key, _) -> H.delete hashTable key

kaArrayToList :: K.KaArray a -> [a]
kaArrayToList = undefined

listToKaArray :: [a] -> K.KaArray a
listToKaArray = undefined

kaArrayToVector :: K.KaArray a -> V.Vector a
kaArrayToVector kaArray = fromMaybe V.empty (K.unKaArray kaArray)

vectorToKaArray :: V.Vector a -> K.KaArray a
vectorToKaArray vec = K.KaArray (Just vec)

mapKaArray :: (a -> b) -> K.KaArray a -> K.KaArray b
mapKaArray f arr = K.KaArray (fmap (V.map f) (K.unKaArray arr))

mapKaArrayM :: (a -> IO b) -> K.KaArray a -> IO (K.KaArray b)
mapKaArrayM f arr = case K.unKaArray arr of
Nothing -> return (K.KaArray Nothing)
Just vec -> K.KaArray . Just <$> V.mapM f vec

forKaArrayM :: K.KaArray a -> (a -> IO b) -> IO (K.KaArray b)
forKaArrayM = flip mapKaArrayM
Loading

0 comments on commit 45f7e57

Please sign in to comment.