Skip to content

Commit

Permalink
kafka: add commit offset request counter (#1718)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Dec 26, 2023
1 parent 24c1168 commit 5e74730
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
12 changes: 12 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Metrics/ConsumeStats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ totalConsumeRequest =
P.Info "total_consume_request" "Total consume request for a topic"
{-# NOINLINE totalConsumeRequest #-}

totalOffsetCommitRequest :: P.Vector P.Label1 P.Counter
totalOffsetCommitRequest =
P.unsafeRegister . P.vector "consumer_group" . P.counter $
P.Info "total_offset_commit_request" "Total offset commit request for a consumer group"
{-# NOINLINE totalOffsetCommitRequest #-}

totalFailedOffsetCommitRequest :: P.Vector P.Label1 P.Counter
totalFailedOffsetCommitRequest =
P.unsafeRegister . P.vector "consumer_group" . P.counter $
P.Info "total_failed_offset_commit_request" "Total failed offset commit request for a consumer group"
{-# NOINLINE totalFailedOffsetCommitRequest #-}

consumerGroupCommittedOffsets :: P.Vector P.Label3 P.Gauge
consumerGroupCommittedOffsets =
P.unsafeRegister . P.vector ("consumer_group", "topicName", "partition") . P.gauge $
Expand Down
3 changes: 3 additions & 0 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import qualified HStream.Common.Server.Lookup as Lookup
import qualified HStream.Common.Server.MetaData as CM
import qualified HStream.Common.Server.TaskManager as TM
import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified HStream.Kafka.Common.Metrics as Metrics
import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Group.Group (Group)
import qualified HStream.Kafka.Group.Group as G
Expand Down Expand Up @@ -140,12 +141,14 @@ heartbeat coordinator req = do
commitOffsets :: GroupCoordinator -> K.OffsetCommitRequest -> IO K.OffsetCommitResponse
commitOffsets coordinator req = do
handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do
Metrics.withLabel Metrics.totalOffsetCommitRequest req.groupId Metrics.incCounter
group <- if req.generationId < 0 then do
getOrMaybeCreateGroup coordinator req.groupId ""
else do
getGroup coordinator req.groupId
G.commitOffsets group req
where makeErrorResponse code = do
Metrics.withLabel Metrics.totalFailedOffsetCommitRequest req.groupId Metrics.incCounter
let resp = K.OffsetCommitResponse {topics = Utils.mapKaArray (mapTopic code) req.topics, throttleTimeMs=0}
Log.fatal $ "commitOffsets error with code: " <> Log.build (show code)
<> "\n\trequest: " <> Log.build (show req)
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ storeOffsets gmm@GroupOffsetManager{..} topicName arrayOffsets = do
let checkPoints = V.foldl' (\acc (_, logId, offset) -> Map.insert logId offset acc) Map.empty offsetsInfo
commitOffsets offsetStorage groupName checkPoints
Log.debug $ "consumer group " <> Log.build groupName <> " commit offsets {" <> Log.build (show checkPoints)
<> "} to topic " <> Log.build topicName
<> "} for topic " <> Log.build topicName

V.forM_ offsetsInfo $ \(tp, _, offset) -> do
M.withLabel M.consumerGroupCommittedOffsets (groupName, topicName, T.pack . show $ tp.topicPartitionIdx) $
Expand Down

0 comments on commit 5e74730

Please sign in to comment.