Skip to content

Commit

Permalink
breaking change(kafka): use partitionId as loggroup name (#1756)
Browse files Browse the repository at this point in the history
Note that you need to clear the existing datas

Co-authored-by: YangKian <[email protected]>
  • Loading branch information
4eUeP and YangKian authored Feb 4, 2024
1 parent 4feaea8 commit cdae1a3
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 25 deletions.
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do
True -> do
(streamId, _) <- S.getStreamIdFromLogId ldClient lgId
modifyIORef' topicNum (+1)
partitions <- V.toList <$> S.listStreamPartitionsOrdered ldClient streamId
partitions <- V.toList <$> S.listStreamPartitionsOrderedByName ldClient streamId
let topicName = T.pack $ S.showStreamName streamId
tpWithLogId = zipWith (\(_, logId) idx -> (mkTopicPartition topicName idx, logId)) partitions ([0..])
res' = tpWithLogId : res
Expand Down Expand Up @@ -152,7 +152,7 @@ getOffsetsInfo GroupOffsetManager{..} topicName requestOffsets = do
Nothing -> do
Log.info $ "can't find topic-partition " <> Log.build (show tp) <> " in partitionsMap: " <> Log.build (show mp)
-- read partitions and build partitionsMap
partitions <- S.listStreamPartitionsOrdered ldClient (S.transToTopicStreamName topicName)
partitions <- S.listStreamPartitionsOrderedByName ldClient (S.transToTopicStreamName topicName)
Log.info $ "list all partitions for topic " <> Log.build topicName <> ": " <> Log.build (show partitions)
case partitions V.!? (fromIntegral partitionIndex) of
Nothing -> do
Expand Down
36 changes: 22 additions & 14 deletions hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ module HStream.Kafka.Server.Core.Topic
import Control.Exception (Exception (displayException, fromException),
SomeException, try)
import Control.Monad (forM)
import qualified Data.Aeson as J
import Data.Bifunctor (Bifunctor (bimap))
import Data.Int (Int16, Int32)
import qualified Data.Map as Map
import qualified Data.Map.Strict as M
import Data.Maybe (isJust)
import Data.Text (Text)
import qualified Data.Text as T
import GHC.Stack (HasCallStack)

import qualified Data.Aeson as J
import Data.Bifunctor (Bifunctor (bimap))
import qualified Data.Map as Map
import qualified HStream.Base.Time as BaseTime
import qualified HStream.Common.Server.Shard as Shard
import qualified HStream.Common.Types as CommonTypes
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
Expand All @@ -33,7 +32,7 @@ createTopic
-> Int16
-> Int32
-> Map.Map T.Text (Maybe T.Text)
-> IO ((K.ErrorCode, T.Text), [Shard.Shard])
-> IO ((K.ErrorCode, T.Text), [S.C_LogID])
createTopic ServerContext{..} name replicationFactor numPartitions configs = do
let streamId = S.transToTopicStreamName name
timeStamp <- BaseTime.getSystemNsTimestamp
Expand Down Expand Up @@ -63,23 +62,32 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
return ((K.UNKNOWN_SERVER_ERROR, "Unexpected Server error"), [])
Right _ -> do
let partitions = if numPartitions == -1
then kafkaBrokerConfigs.numPartitions._value
else fromIntegral numPartitions
let keyTups = CommonTypes.devideKeySpace partitions
shards_e <-
try $ forM (keyTups `zip` [0..]) $ \((startKey, endKey), i) -> do
let shard = Shard.mkShard i streamId startKey endKey (fromIntegral numPartitions)
Shard.createShard scLDClient shard
then fromIntegral kafkaBrokerConfigs.numPartitions._value
else numPartitions
shards_e <- try $ createTopicPartitions scLDClient streamId partitions
case shards_e of
Left (e :: SomeException) -> do
Log.warning $ "Exception occurs when creating shards of topic " <> Log.build name <> ": " <> Log.build (show e)
return ((K.INVALID_PARTITIONS, "Create shard for topic " <> name <> " error: " <> T.pack (displayException e)), [])
Right shards -> do
Log.info $ "Created " <> Log.build (show (length shards)) <> " shards for topic " <> Log.build name <> ": " <> Log.build (show (Shard.shardId <$> shards))
Log.info $ "Created " <> Log.build (show (length shards)) <> " shards for topic " <> Log.build name <> ": " <> Log.build (show shards)
return ((K.NONE, T.empty), shards)
where
getBacklogDuration KC.KafkaTopicConfigs{cleanupPolicy=cleanupPolicy, retentionMs=KC.RetentionMs retentionMs}
| cleanupPolicy == KC.CleanupPolicyCompact = Nothing
| retentionMs `div` 1000 > 0 = Just (fromIntegral retentionMs `div` 1000)
| otherwise = Nothing

-------------------------------------------------------------------------------

createTopicPartitions :: HasCallStack => S.LDClient -> S.StreamId -> Int32 -> IO [S.C_LogID]
createTopicPartitions client streamId partitions = do
totalCnt <- getTotalPartitionCount client streamId
forM [0..partitions-1] $ \i -> do
let key = Utils.intToCBytesWithPadding . fromIntegral $ totalCnt + i
S.createStreamPartition client streamId (Just key) M.empty

-- Get the total number of partitions of a topic
getTotalPartitionCount :: HasCallStack => S.LDClient -> S.StreamId -> IO Int32
getTotalPartitionCount client streamId = do
fromIntegral . M.size <$> S.listStreamPartitions client streamId
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ handleMetadata ctx reqCtx req = do
, partitions = K.emptyKaArray
, isInternal = False
}
shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrdered ctx.scLDClient streamId)
shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrderedByName ctx.scLDClient streamId)
case shards_e of
-- FIXME: Are the following error codes proper?
-- FIXME: We passed `Nothing` as partitions when an error occurs. Is this proper?
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ preProcessRequest ServerContext{..} r = do
let K.NonNullKaArray partitionReqs = t.partitions
-- FIXME: we can also cache this in FetchContext, however, we need to
-- consider the following: what if someone delete the topic?
orderedParts <- S.listStreamPartitionsOrdered scLDClient
orderedParts <- S.listStreamPartitionsOrderedByName scLDClient
(S.transToTopicStreamName t.topic)
ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ listOffsetTopicPartitions
listOffsetTopicPartitions _ topicName Nothing = do
return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName)
orderedParts <- S.listStreamPartitionsOrderedByName scLDClient (S.transToTopicStreamName topicName)
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartition{..} -> do
-- TODO: handle Nothing
let partition = orderedParts V.! (fromIntegral partitionIndex)
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ handleProduce
-> K.RequestContext
-> K.ProduceRequest
-> IO K.ProduceResponse
handleProduce ServerContext{..} reqCtx req = do
handleProduce ServerContext{..} _reqCtx req = do
-- TODO: handle request args: acks, timeoutMs
let topicData = fromMaybe V.empty (K.unKaArray req.topicData)

responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
-- A topic is a stream. Here we donot need to check the topic existence,
-- because the metadata api already does(?)
partitions <- S.listStreamPartitionsOrdered
scLDClient (S.transToTopicStreamName topic.name)
partitions <- S.listStreamPartitionsOrderedByName
scLDClient (S.transToTopicStreamName topic.name)
let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
-- TODO: limit total concurrencies ?
let loopPart = if V.length partitionData > 1
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ module HStream.Kafka.Server.Handler.Topic

import Control.Exception
import Control.Monad
import qualified Data.Map as Map
import Data.Maybe (isNothing)
import qualified Data.Text as T
import qualified Data.Vector as V

import qualified Data.Map as Map
import Data.Maybe (isNothing)
import HStream.Kafka.Common.OffsetManager (cleanOffsetCache)
import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Server.Core.Topic as Core
Expand Down Expand Up @@ -149,7 +149,7 @@ handleDeleteTopics ServerContext{..} _ K.DeleteTopicsRequest{..} =
--
-- XXX: Normally we do not need to delete this because the logid is a
-- random number and will unlikely be reused.
partitions <- S.listStreamPartitionsOrdered scLDClient streamId
partitions <- S.listStreamPartitionsOrderedByName scLDClient streamId
V.forM_ partitions $ \(_, logid) ->
cleanOffsetCache scOffsetManager logid
S.removeStream scLDClient streamId
Expand Down

0 comments on commit cdae1a3

Please sign in to comment.