Skip to content

Commit

Permalink
fix(sub): fix invalidConsumer (#1517)
Browse files Browse the repository at this point in the history
  • Loading branch information
daleiz authored Jul 18, 2023
1 parent 27eb68f commit 77068e6
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions hstream/src/HStream/Server/Core/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
<> ", num of records=" <> Log.build (V.length shardRecordIds) <> "\n"
<> "will remove the consumer " <> Log.build consumerName <> ": " <> Log.buildString (show err)
atomically $ invalidConsumer subCtx consumerName
Log.warning $ "Sub " <> Log.build subSubscriptionId <> " invalided consumer " <> Log.build consumerName
if isResent
then registerResend logId batchId shardRecordIds
else resetReadingOffset logId batchId
Expand Down Expand Up @@ -989,11 +990,13 @@ recvAcks ServerContext {..} subState subCtx@SubscribeContext{..} ConsumerContext
<> " trigger a streamRecv error: " <> Log.build (show err)
-- invalid consumer
atomically $ invalidConsumer subCtx ccConsumerName
Log.warning $ "Sub " <> Log.build subSubscriptionId <> " invalided consumer " <> Log.build ccConsumerName
throwIO $ HE.StreamReadError "Consumer recv error"
Right Nothing -> do
Log.info $ "Consumer " <> Log.build ccConsumerName <> " finished ack-sending stream to sub " <> Log.build subSubscriptionId
-- This means that the consumer finished sending acks actively.
atomically $ invalidConsumer subCtx ccConsumerName
Log.warning $ "Sub " <> Log.build subSubscriptionId <> " invalided consumer " <> Log.build ccConsumerName
-- throwIO $ HE.StreamReadClose "Consumer is closed"
Right (Just StreamingFetchRequest {..}) -> do
Log.debug $ "Sub " <> Log.build subSubscriptionId <> " receive " <> Log.build (V.length streamingFetchRequestAckIds)
Expand Down Expand Up @@ -1194,8 +1197,8 @@ invalidConsumer SubscribeContext{subAssignment = Assignment{..}, ..} consumer =
modifyTVar consumerWorkloads
(Set.filter (\ConsumerWorkload{..} ->
cwConsumerName /= consumer))
let newConsumerCtx = HM.delete consumer ccs
writeTVar subConsumerContexts newConsumerCtx
let newConsumerCtx = HM.delete consumer ccs
writeTVar subConsumerContexts newConsumerCtx
else do
-- traceM "consumer is invalid, just return"
pure ()
Expand Down

0 comments on commit 77068e6

Please sign in to comment.