diff --git a/hstream/src/HStream/Server/Handler/Subscription.hs b/hstream/src/HStream/Server/Handler/Subscription.hs index 13edd28e0..28d610583 100644 --- a/hstream/src/HStream/Server/Handler/Subscription.hs +++ b/hstream/src/HStream/Server/Handler/Subscription.hs @@ -36,6 +36,7 @@ import qualified Data.Set as Set import qualified Data.Text as T import qualified Data.Vector as V import Data.Word (Word32, Word64) +import Debug.Trace import Network.GRPC.HighLevel (StreamRecv, StreamSend) import Network.GRPC.HighLevel.Generated import Z.Data.Vector (Bytes) @@ -165,9 +166,6 @@ streamingFetchInternal ctx (ServerBiDiRequest _ streamRecv streamSend) = do Right Nothing -> throwIO GRPCStreamRecvCloseError Right (Just firstReq) -> return firstReq --- FIXME: seems SubscribeContextNewWrapper is never used after initSub, which means --- its content will never be updated. --- Check if a subscription is running. If not, complete init, and spawn a new thread to perform sendRecords. initSub :: ServerContext -> SubscriptionId -> IO SubscribeContextWrapper initSub serverCtx@ServerContext {..} subId = do (needInit, SubscribeContextNewWrapper {..}) <- atomically $ do @@ -463,7 +461,7 @@ sendRecords ctx subState subCtx@SubscribeContext {..} = do Left err -> do Log.fatal $ "sendReceivedRecords failed: logId=" <> Log.buildInt logId <> ", batchId=" <> Log.buildInt batchId <> ", num of records=" <> Log.buildInt (V.length shardRecordIds) <> "\n" - <> "send records error, will remove the consumer " <> Log.buildText consumerName <> ": " <> Log.buildString (show err) + <> "will remove the consumer " <> Log.buildText consumerName <> ": " <> Log.buildString (show err) atomically $ invalidConsumer subCtx consumerName unless isResent $ do resetReadingOffset logId batchId @@ -566,10 +564,9 @@ assignShards assignment@Assignment {..} = do doAssign assignment consumer logId needStartReading return True else do - -- FIXME: a waiting consumer was assigned a shard now, but it still - -- in waitingConsumers list, is it right? let waiter = head waiters doAssign assignment waiter logId needStartReading + writeTVar waitingConsumers (tail waiters) return True doAssign :: Assignment -> ConsumerName -> S.C_LogID -> Bool -> STM () @@ -595,14 +592,19 @@ doAssign Assignment {..} consumerName logId needStartReading = do assignWaitingConsumers :: Assignment -> STM () assignWaitingConsumers assignment@Assignment {..} = do consumers <- readTVar waitingConsumers - foldM_ - ( \goOn consumer -> + (_, successCount)<- foldM + ( \(goOn, successCount) consumer -> if goOn - then tryAssignConsumer consumer - else return goOn + then do + success <- tryAssignConsumer consumer + if success + then return (True, successCount + 1) + else return (False, successCount) + else return (goOn, successCount) ) - True + (True, 0) consumers + writeTVar waitingConsumers (L.drop successCount consumers) where tryAssignConsumer :: ConsumerName -> STM Bool tryAssignConsumer consumerName = do diff --git a/hstream/src/HStream/Server/Types.hs b/hstream/src/HStream/Server/Types.hs index 69e81933f..27d6b21ad 100644 --- a/hstream/src/HStream/Server/Types.hs +++ b/hstream/src/HStream/Server/Types.hs @@ -143,7 +143,7 @@ data ConsumerWorkload = ConsumerWorkload instance Eq ConsumerWorkload where (==) w1 w2 = cwConsumerName w1 == cwConsumerName w2 && cwShardCount w1 == cwShardCount w2 instance Ord ConsumerWorkload where - (<=) w1 w2 = w1 == w2 || cwShardCount w1 < cwShardCount w2 + (<=) w1 w2 = w1 == w2 || cwShardCount w1 <= cwShardCount w2 type SubscriptionId = T.Text type OrderingKey = T.Text