server: delay resource allocation only on startup rather than every reallocation
Commelina committed May 30, 2024
1 parent 87a87f5 commit 2d1b1d7
Showing 2 changed files with 61 additions and 56 deletions.
32 changes: 28 additions & 4 deletions common/server/HStream/Common/Server/HashRing.hs
@@ -1,5 +1,6 @@
 module HStream.Common.Server.HashRing
   ( LoadBalanceHashRing
+  , readLoadBalanceHashRing
   , initializeHashRing
   , updateHashRing
   ) where
@@ -12,22 +13,45 @@ import HStream.Common.ConsistentHashing (HashRing, constructServerMap)
 import HStream.Gossip.Types (Epoch, GossipContext)
 import HStream.Gossip.Utils (getMemberListWithEpochSTM)
 
-type LoadBalanceHashRing = TVar (Epoch, HashRing)
+-- FIXME: The 'Bool' flag means "if we think the HashRing can be used for
+--        resource allocation now". This is because a server node can
+--        only see a part of the cluster during the early stage of startup.
+-- FIXME: This is just a mitigation for the consistency problem.
+type LoadBalanceHashRing = TVar (Epoch, HashRing, Bool)
+
+readLoadBalanceHashRing :: LoadBalanceHashRing -> STM (Epoch, HashRing)
+readLoadBalanceHashRing hashRing = do
+  (epoch, hashRing, isReady) <- readTVar hashRing
+  if isReady
+    then return (epoch, hashRing)
+    else retry
 
 initializeHashRing :: GossipContext -> IO LoadBalanceHashRing
 initializeHashRing gc = atomically $ do
   (epoch, serverNodes) <- getMemberListWithEpochSTM gc
-  newTVar (epoch, constructServerMap . sort $ serverNodes)
+  newTVar (epoch, constructServerMap . sort $ serverNodes, False)
 
 -- However, reconstruct hashRing every time can be expensive
 -- when we have a large number of nodes in the cluster.
+-- FIXME: We delayed for several seconds to make sure the node has seen
+--        the whole cluster. This is only a mitigation. See the comment
+--        above.
+-- FIXME: Hard-coded constant!
+-- WARNING: This should be called exactly once on startup!
 updateHashRing :: GossipContext -> LoadBalanceHashRing -> IO ()
-updateHashRing gc hashRing = loop (0,[])
+updateHashRing gc hashRing = do
+  forkIO (earlyStageDelay 5000)
+  loop (0,[])
   where
+    earlyStageDelay timeoutMs = do
+      threadDelay (timeoutMs * 1000)
+      atomically $ modifyTVar' hashRing (\(epoch, hashRing, _) -> (epoch, hashRing, True))
+
     loop (epoch, list) =
       loop =<< atomically
         ( do (epoch', list') <- getMemberListWithEpochSTM gc
              when (epoch == epoch' && list == list') retry
-             writeTVar hashRing (epoch', constructServerMap list')
+             modifyTVar' hashRing
+               (\(_,_,isReady) -> (epoch', constructServerMap list', isReady))
             return (epoch', list')
        )
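
The HashRing.hs change boils down to a readiness gate: the ring's TVar now carries a Bool, readLoadBalanceHashRing retries in STM until that flag is True, and a thread forked by updateHashRing flips it exactly once, a few seconds after startup. A minimal standalone sketch of the same pattern (illustrative names, not the HStream types):

-- Readiness-gate sketch: readers block in STM until a writer marks the
-- shared state usable, mirroring the Bool flag added above.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO,
                               readTVar, retry)

-- Stand-in for (Epoch, HashRing, Bool): some state plus an "is ready" flag.
type Gated a = TVar (a, Bool)

-- Block (via 'retry') until the flag is True, then hand back the state.
readWhenReady :: Gated a -> IO a
readWhenReady var = atomically $ do
  (x, ready) <- readTVar var
  if ready then pure x else retry

main :: IO ()
main = do
  ring <- newTVarIO ("initial hash ring" :: String, False)
  -- Writer: after a startup delay, flip the flag exactly once.
  _ <- forkIO $ do
    threadDelay (2 * 1000 * 1000)  -- pretend the gossip view has converged
    atomically $ modifyTVar' ring (\(x, _) -> (x, True))
  -- Reader: blocks here until the writer above sets the flag.
  putStrLn =<< readWhenReady ring

Because the flip happens inside one STM transaction, readers in this sketch need no polling loop: they simply block until the flag is set and then read the state atomically.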
85 changes: 33 additions & 52 deletions common/server/HStream/Common/Server/Lookup.hs
@@ -10,7 +10,6 @@ module HStream.Common.Server.Lookup
   , kafkaResourceMetaId
   ) where
 
-import Control.Concurrent (threadDelay)
 import Control.Concurrent.STM
 import Control.Exception (SomeException (..), throwIO,
                           try)
@@ -19,7 +18,8 @@ import Data.Text (Text)
 import qualified Data.Vector as V
 
 import HStream.Common.ConsistentHashing (getResNode)
-import HStream.Common.Server.HashRing (LoadBalanceHashRing)
+import HStream.Common.Server.HashRing (LoadBalanceHashRing,
+                                       readLoadBalanceHashRing)
 import HStream.Common.Server.MetaData (TaskAllocation (..))
 import HStream.Common.Types (fromInternalServerNodeWithKey)
 import qualified HStream.Exception as HE
@@ -30,7 +30,7 @@ import qualified HStream.Server.HStreamApi as A
 
 lookupNode :: LoadBalanceHashRing -> Text -> Maybe Text -> IO A.ServerNode
 lookupNode loadBalanceHashRing key advertisedListenersKey = do
-  (_, hashRing) <- readTVarIO loadBalanceHashRing
+  (_, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
   theNode <- getResNode hashRing key advertisedListenersKey
   return theNode
 
@@ -42,62 +42,43 @@ lookupNodePersist
   -> Text
   -> Maybe Text
   -> IO A.ServerNode
-lookupNodePersist metaHandle_ gossipContext_ loadBalanceHashRing_
-                  key_ metaId_ advertisedListenersKey_ =
-  -- FIXME: This is only a mitigation for the case that the node has not
-  --        known the full cluster info. Reinvestigate it!!!
-  --        And as you see, a hard-coded constant...
-  go metaHandle_ gossipContext_ loadBalanceHashRing_ key_ metaId_ advertisedListenersKey_ 5
-  where
-    -- TODO: Currerntly, 'leftRetries' only works before a re-allocation. It can be also
-    --       used on other cases such as encountering an exception.
-    go metaHandle gossipContext loadBalanceHashRing
-       key metaId advertisedListenersKey leftRetries = do
-      -- FIXME: it will insert the results of lookup no matter the resource exists
-      -- or not
-      M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
+lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+                  key metaId advertisedListenersKey = do
+  -- FIXME: it will insert the results of lookup no matter the resource exists
+  -- or not
+  M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
+    Nothing -> do
+      (epoch, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
+      theNode <- getResNode hashRing key advertisedListenersKey
+      try (M.insertMeta @TaskAllocation
+             metaId
+             (TaskAllocation epoch (A.serverNodeId theNode))
+             metaHandle) >>= \case
+        Left (e :: SomeException) -> do
+          -- TODO: add a retry limit here
+          Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
+                     <> ", retry..."
+          lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+                            key metaId advertisedListenersKey
+        Right () -> return theNode
+    Just (TaskAllocation epoch nodeId, version) -> do
+      serverList <- getMemberList gossipContext >>=
+        fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
+      case find ((nodeId == ) . A.serverNodeId) serverList of
+        Just theNode -> return theNode
         Nothing -> do
-          (epoch, hashRing) <- readTVarIO loadBalanceHashRing
-          theNode <- getResNode hashRing key advertisedListenersKey
-          try (M.insertMeta @TaskAllocation
-                 metaId
-                 (TaskAllocation epoch (A.serverNodeId theNode))
-                 metaHandle) >>= \case
+          (epoch', hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
+          theNode' <- getResNode hashRing key advertisedListenersKey
+          try (M.updateMeta @TaskAllocation metaId
+                 (TaskAllocation epoch' (A.serverNodeId theNode'))
+                 (Just version) metaHandle) >>= \case
            Left (e :: SomeException) -> do
              -- TODO: add a retry limit here
              Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
                         <> ", retry..."
              lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                                key metaId advertisedListenersKey
-            Right () -> return theNode
-        Just (TaskAllocation epoch nodeId, version) -> do
-          serverList <- getMemberList gossipContext >>=
-            fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
-          case find ((nodeId == ) . A.serverNodeId) serverList of
-            Just theNode -> return theNode
-            Nothing -> do
-              if leftRetries > 0
-                then do
-                  Log.info $ "<lookupNodePersist> on <key=" <> Log.buildString' key <> ", metaId=" <>
-                    Log.buildString' metaId <> ">: found on Node=" <> Log.buildString' nodeId <>
-                    ", but not sure if it's really dead. Left " <> Log.buildString' leftRetries <>
-                    " retries before re-allocate it..."
-                  threadDelay (1 * 1000 * 1000)
-                  go metaHandle gossipContext loadBalanceHashRing
-                     key metaId advertisedListenersKey (leftRetries - 1)
-                else do
-                  (epoch', hashRing) <- readTVarIO loadBalanceHashRing
-                  theNode' <- getResNode hashRing key advertisedListenersKey
-                  try (M.updateMeta @TaskAllocation metaId
-                         (TaskAllocation epoch' (A.serverNodeId theNode'))
-                         (Just version) metaHandle) >>= \case
-                    Left (e :: SomeException) -> do
-                      -- TODO: add a retry limit here
-                      Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
-                                 <> ", retry..."
-                      lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-                                        key metaId advertisedListenersKey
-                    Right () -> return theNode'
+            Right () -> return theNode'
 
 data KafkaResource
   = KafkaResTopic Text
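
On the Lookup.hs side, the deleted threadDelay import and leftRetries counter mean lookupNodePersist no longer sleeps and retries before re-allocating a resource whose recorded node has left the member list: since readLoadBalanceHashRing already blocks until the ring is marked usable, it can re-allocate immediately and let the versioned updateMeta call surface any races. A rough standalone model of that decision flow (hypothetical names; an IORef Map stands in for the meta store):

-- Allocation-flow sketch: reuse a persisted allocation while its node is
-- alive, otherwise re-allocate at once instead of sleeping and retrying.
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import qualified Data.Map.Strict as Map

type Key       = String
type NodeId    = Int
-- Stand-in for the persisted TaskAllocation table keyed by metaId.
type MetaStore = IORef (Map.Map Key NodeId)

lookupPersist :: MetaStore -> [NodeId] -> (Key -> NodeId) -> Key -> IO NodeId
lookupPersist store alive allocate key = do
  existing <- Map.lookup key <$> readIORef store
  case existing of
    Just nodeId | nodeId `elem` alive -> pure nodeId   -- allocation still valid
    _ -> do                                            -- missing or dead: re-allocate now
      let newNode = allocate key
      atomicModifyIORef' store (\m -> (Map.insert key newNode m, ()))
      pure newNode

main :: IO ()
main = do
  store <- newIORef Map.empty
  n1 <- lookupPersist store [1, 2, 3] (const 2) "stream-a"  -- allocates node 2
  n2 <- lookupPersist store [1, 2, 3] (const 3) "stream-a"  -- reuses node 2
  print (n1, n2)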
