Skip to content

Commit

Permalink
fix get consumer info error from subCtx (#850)
Browse files Browse the repository at this point in the history
  • Loading branch information
daleiz authored Mar 23, 2022
1 parent bad34ce commit 3487dea
Showing 1 changed file with 65 additions and 54 deletions.
119 changes: 65 additions & 54 deletions hstream/src/HStream/Server/Handler/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,17 @@ sendRecords ctx@ServerContext {..} subState subCtx@SubscribeContext {..} = do
else pure ()

s2c <- readTVar shard2Consumer
let consumer = s2c HM.! logId
ccs <- readTVar subConsumerContexts
let ConsumerContext {..} = ccs HM.! consumer
iv <- readTVar ccIsValid
if iv
then return $ Just (ccConsumerName, ccStreamSend)
else return Nothing
case HM.lookup logId s2c of
Nothing -> return Nothing
Just consumer -> do
ccs <- readTVar subConsumerContexts
case HM.lookup consumer ccs of
Nothing -> return Nothing
Just ConsumerContext {..} -> do
iv <- readTVar ccIsValid
if iv
then return $ Just (ccConsumerName, ccStreamSend)
else return Nothing
case mres of
Nothing -> return False
Just (consumerName, streamSend) ->
Expand Down Expand Up @@ -630,30 +634,35 @@ assignWaitingConsumers assignment@Assignment {..} = do
Just ConsumerWorkload {..} -> do
if cwShardCount > 1
then do
shard <- removeOneShardFromConsumer cwConsumerName
doAssign assignment consumerName shard False
return True
res <- removeOneShardFromConsumer cwConsumerName
case res of
Nothing -> return False
Just shard -> do
doAssign assignment consumerName shard False
return True
else do
return False

removeOneShardFromConsumer :: ConsumerName -> STM S.C_LogID
removeOneShardFromConsumer :: ConsumerName -> STM (Maybe S.C_LogID)
removeOneShardFromConsumer consumerName = do
c2s <- readTVar consumer2Shards
let shardsTVar = c2s HM.! consumerName
shards <- readTVar shardsTVar
let (shard, newShards) = Set.deleteFindMax shards
writeTVar shardsTVar newShards
workloads <- readTVar consumerWorkloads
let oldCount = Set.size shards
target = ConsumerWorkload {cwConsumerName = consumerName, cwShardCount = oldCount}
tempWorkloads = Set.delete target workloads
newWorkloads = Set.insert target {cwShardCount = oldCount - 1} tempWorkloads
writeTVar consumerWorkloads newWorkloads

s2c <- readTVar shard2Consumer
let newS2c = HM.delete shard s2c
writeTVar shard2Consumer newS2c
return shard
case HM.lookup consumerName c2s of
Nothing -> pure Nothing
Just shardsTVar -> do
shards <- readTVar shardsTVar
let (shard, newShards) = Set.deleteFindMax shards
writeTVar shardsTVar newShards
workloads <- readTVar consumerWorkloads
let oldCount = Set.size shards
target = ConsumerWorkload {cwConsumerName = consumerName, cwShardCount = oldCount}
tempWorkloads = Set.delete target workloads
newWorkloads = Set.insert target {cwShardCount = oldCount - 1} tempWorkloads
writeTVar consumerWorkloads newWorkloads

s2c <- readTVar shard2Consumer
let newS2c = HM.delete shard s2c
writeTVar shard2Consumer newS2c
return $ Just shard

recvAcks :: ServerContext -> TVar SubscribeState -> SubscribeContext -> ConsumerContext -> (StreamRecv StreamingFetchRequest) -> IO ()
recvAcks ServerContext {..} subState subCtx@SubscribeContext {..} ConsumerContext {..} streamRecv = loop
Expand Down Expand Up @@ -752,35 +761,37 @@ doAck ldclient SubscribeContext {..} logId recordIds= do
invalidConsumer :: SubscribeContext -> ConsumerName -> STM ()
invalidConsumer SubscribeContext{..} consumer = do
ccs <- readTVar subConsumerContexts
let cc@ConsumerContext {..} = ccs HM.! consumer
iv <- readTVar ccIsValid
if iv
then do
writeTVar ccIsValid False
case HM.lookup consumer ccs of
Nothing -> pure ()
Just cc@ConsumerContext {..} -> do
iv <- readTVar ccIsValid
if iv
then do
writeTVar ccIsValid False

let Assignment {..} = subAssignment
c2s <- readTVar consumer2Shards
let worksTVar = c2s HM.! consumer
works <- readTVar worksTVar
writeTVar worksTVar Set.empty
let nc2s = HM.delete consumer c2s
writeTVar consumer2Shards nc2s

rs <- readTVar waitingReassignedShards
s2c <- readTVar shard2Consumer
(nrs, ns2c) <- foldM
(
\ (nrs, ns2c) s ->
return (nrs ++ [s], HM.delete s ns2c)
)
(rs, s2c)
works
writeTVar waitingReassignedShards nrs
writeTVar shard2Consumer ns2c

let nccs = HM.delete consumer ccs
writeTVar subConsumerContexts nccs
else pure ()
let Assignment {..} = subAssignment
c2s <- readTVar consumer2Shards
let worksTVar = c2s HM.! consumer
works <- readTVar worksTVar
writeTVar worksTVar Set.empty
let nc2s = HM.delete consumer c2s
writeTVar consumer2Shards nc2s

rs <- readTVar waitingReassignedShards
s2c <- readTVar shard2Consumer
(nrs, ns2c) <- foldM
(
\ (nrs, ns2c) s ->
return (nrs ++ [s], HM.delete s ns2c)
)
(rs, s2c)
works
writeTVar waitingReassignedShards nrs
writeTVar shard2Consumer ns2c

let nccs = HM.delete consumer ccs
writeTVar subConsumerContexts nccs
else pure ()

tryUpdateWindowLowerBound
:: Map.Map ShardRecordId ShardRecordIdRange -- ^ ackedRanges
Expand Down

0 comments on commit 3487dea

Please sign in to comment.