Skip to content

Commit

Permalink
fix assign consumer error (#855)
Browse files Browse the repository at this point in the history
  • Loading branch information
daleiz authored Mar 25, 2022
1 parent e8fa9a9 commit 37c892a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
24 changes: 13 additions & 11 deletions hstream/src/HStream/Server/Handler/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word32, Word64)
import Debug.Trace
import Network.GRPC.HighLevel (StreamRecv, StreamSend)
import Network.GRPC.HighLevel.Generated
import Z.Data.Vector (Bytes)
Expand Down Expand Up @@ -165,9 +166,6 @@ streamingFetchInternal ctx (ServerBiDiRequest _ streamRecv streamSend) = do
Right Nothing -> throwIO GRPCStreamRecvCloseError
Right (Just firstReq) -> return firstReq

-- FIXME: seems SubscribeContextNewWrapper is never used after initSub, which means
-- its content will never be updated.
-- Check if a subscription is running. If not, complete init, and spawn a new thread to perform sendRecords.
initSub :: ServerContext -> SubscriptionId -> IO SubscribeContextWrapper
initSub serverCtx@ServerContext {..} subId = do
(needInit, SubscribeContextNewWrapper {..}) <- atomically $ do
Expand Down Expand Up @@ -463,7 +461,7 @@ sendRecords ctx subState subCtx@SubscribeContext {..} = do
Left err -> do
Log.fatal $ "sendReceivedRecords failed: logId=" <> Log.buildInt logId <> ", batchId=" <> Log.buildInt batchId
<> ", num of records=" <> Log.buildInt (V.length shardRecordIds) <> "\n"
<> "send records error, will remove the consumer " <> Log.buildText consumerName <> ": " <> Log.buildString (show err)
<> "will remove the consumer " <> Log.buildText consumerName <> ": " <> Log.buildString (show err)
atomically $ invalidConsumer subCtx consumerName
unless isResent $ do
resetReadingOffset logId batchId
Expand Down Expand Up @@ -566,10 +564,9 @@ assignShards assignment@Assignment {..} = do
doAssign assignment consumer logId needStartReading
return True
else do
-- FIXME: a waiting consumer was assigned a shard now, but it still
-- in waitingConsumers list, is it right?
let waiter = head waiters
doAssign assignment waiter logId needStartReading
writeTVar waitingConsumers (tail waiters)
return True

doAssign :: Assignment -> ConsumerName -> S.C_LogID -> Bool -> STM ()
Expand All @@ -595,14 +592,19 @@ doAssign Assignment {..} consumerName logId needStartReading = do
assignWaitingConsumers :: Assignment -> STM ()
assignWaitingConsumers assignment@Assignment {..} = do
consumers <- readTVar waitingConsumers
foldM_
( \goOn consumer ->
(_, successCount)<- foldM
( \(goOn, successCount) consumer ->
if goOn
then tryAssignConsumer consumer
else return goOn
then do
success <- tryAssignConsumer consumer
if success
then return (True, successCount + 1)
else return (False, successCount)
else return (goOn, successCount)
)
True
(True, 0)
consumers
writeTVar waitingConsumers (L.drop successCount consumers)
where
tryAssignConsumer :: ConsumerName -> STM Bool
tryAssignConsumer consumerName = do
Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ data ConsumerWorkload = ConsumerWorkload
instance Eq ConsumerWorkload where
(==) w1 w2 = cwConsumerName w1 == cwConsumerName w2 && cwShardCount w1 == cwShardCount w2
instance Ord ConsumerWorkload where
(<=) w1 w2 = w1 == w2 || cwShardCount w1 < cwShardCount w2
(<=) w1 w2 = w1 == w2 || cwShardCount w1 <= cwShardCount w2

type SubscriptionId = T.Text
type OrderingKey = T.Text
Expand Down

0 comments on commit 37c892a

Please sign in to comment.