Skip to content

Commit

Permalink
Fix lookupNodePersist: wait for cluster epoch to sync before executin…
Browse files Browse the repository at this point in the history
…g a new assignment (#1676)
  • Loading branch information
daleiz authored Nov 10, 2023
1 parent 8d0fab2 commit aae088b
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,22 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing
case find ((nodeId == ) . A.serverNodeId) serverList of
Just theNode -> return theNode
Nothing -> do
(epoch', hashRing) <- readTVarIO loadBalanceHashRing
if epoch' > epoch
then do
theNode' <- getResNode hashRing key advertisedListenersKey
try (M.updateMeta @TaskAllocation metaId
(TaskAllocation epoch' (A.serverNodeId theNode'))
(Just version) metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode'
else do
let errmsg = "the server has not yet synced with the latest member list"
Log.warning $ "lookupNodePersist: " <> Log.buildString errmsg
throwIO $ HE.ResourceAllocationException errmsg
(epoch', hashRing) <- atomically $ do
(epoch', hashRing) <- readTVar loadBalanceHashRing
if epoch' > epoch
then pure (epoch', hashRing)
else retry
theNode' <- getResNode hashRing key advertisedListenersKey
try (M.updateMeta @TaskAllocation metaId
(TaskAllocation epoch' (A.serverNodeId theNode'))
(Just version) metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode'

data KafkaResource
= KafkaResTopic Text
Expand Down

0 comments on commit aae088b

Please sign in to comment.