diff --git a/common/server/HStream/Common/Server/Lookup.hs b/common/server/HStream/Common/Server/Lookup.hs
index f8e77d0f1..54036da7a 100644
--- a/common/server/HStream/Common/Server/Lookup.hs
+++ b/common/server/HStream/Common/Server/Lookup.hs
@@ -10,6 +10,7 @@ module HStream.Common.Server.Lookup
   , kafkaResourceMetaId
   ) where
 
+import Control.Concurrent (threadDelay)
 import Control.Concurrent.STM
 import Control.Exception (SomeException (..), throwIO,
                           try)
@@ -41,47 +42,62 @@ lookupNodePersist
   -> Text
   -> Maybe Text
   -> IO A.ServerNode
-lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-  key metaId advertisedListenersKey = do
-  -- FIXME: it will insert the results of lookup no matter the resource exists
-  -- or not
-  M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
-    Nothing -> do
-      (epoch, hashRing) <- readTVarIO loadBalanceHashRing
-      theNode <- getResNode hashRing key advertisedListenersKey
-      try (M.insertMeta @TaskAllocation
-             metaId
-             (TaskAllocation epoch (A.serverNodeId theNode))
-             metaHandle) >>= \case
-        Left (e :: SomeException) -> do
-          -- TODO: add a retry limit here
-          Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
-                     <> ", retry..."
-          lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-                            key metaId advertisedListenersKey
-        Right () -> return theNode
-    Just (TaskAllocation epoch nodeId, version) -> do
-      serverList <- getMemberList gossipContext >>=
-        fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
-      case find ((nodeId == ) . A.serverNodeId) serverList of
-        Just theNode -> return theNode
+lookupNodePersist metaHandle_ gossipContext_ loadBalanceHashRing_
+  key_ metaId_ advertisedListenersKey_ =
+  -- FIXME: This is only a mitigation for the case that the node has not
+  -- known the full cluster info. Reinvestigate it!!!
+  -- And as you see, a hard-coded constant...
+  go metaHandle_ gossipContext_ loadBalanceHashRing_ key_ metaId_ advertisedListenersKey_ 5
+  where
+    -- TODO: Currently, 'leftRetries' only works before a re-allocation. It can
+    -- also be used in other cases, such as encountering an exception.
+    go metaHandle gossipContext loadBalanceHashRing
+       key metaId advertisedListenersKey leftRetries = do
+      -- FIXME: it will insert the results of lookup no matter the resource exists
+      -- or not
+      M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
         Nothing -> do
-          (epoch', hashRing) <- atomically $ do
-            (epoch', hashRing) <- readTVar loadBalanceHashRing
-            if epoch' > epoch
-              then pure (epoch', hashRing)
-              else retry
-          theNode' <- getResNode hashRing key advertisedListenersKey
-          try (M.updateMeta @TaskAllocation metaId
-                 (TaskAllocation epoch' (A.serverNodeId theNode'))
-                 (Just version) metaHandle) >>= \case
+          (epoch, hashRing) <- readTVarIO loadBalanceHashRing
+          theNode <- getResNode hashRing key advertisedListenersKey
+          try (M.insertMeta @TaskAllocation
+                 metaId
+                 (TaskAllocation epoch (A.serverNodeId theNode))
+                 metaHandle) >>= \case
             Left (e :: SomeException) -> do
              -- TODO: add a retry limit here
              Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
                         <> ", retry..."
             lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                               key metaId advertisedListenersKey
-            Right () -> return theNode'
+            Right () -> return theNode
+        Just (TaskAllocation epoch nodeId, version) -> do
+          serverList <- getMemberList gossipContext >>=
+            fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
+          case find ((nodeId == ) . A.serverNodeId) serverList of
+            Just theNode -> return theNode
+            Nothing -> do
+              if leftRetries > 0
+                then do
+                  Log.info $ "lookupNodePersist on <key=" <> Log.buildString' key <> ", metaId=" <>
+                    Log.buildString' metaId <> ">: found on Node=" <> Log.buildString' nodeId <>
+                    ", but not sure if it's really dead. Left " <> Log.buildString' leftRetries <>
+                    " retries before re-allocating it..."
+                  threadDelay (1 * 1000 * 1000)
+                  go metaHandle gossipContext loadBalanceHashRing
+                     key metaId advertisedListenersKey (leftRetries - 1)
+                else do
+                  (epoch', hashRing) <- readTVarIO loadBalanceHashRing
+                  theNode' <- getResNode hashRing key advertisedListenersKey
+                  try (M.updateMeta @TaskAllocation metaId
+                         (TaskAllocation epoch' (A.serverNodeId theNode'))
+                         (Just version) metaHandle) >>= \case
+                    Left (e :: SomeException) -> do
+                      -- TODO: add a retry limit here
+                      Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
+                                 <> ", retry..."
+                      lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+                                        key metaId advertisedListenersKey
+                    Right () -> return theNode'
 
 data KafkaResource =
     KafkaResTopic Text
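The `go` loop above implements a bounded wait-and-recheck before re-allocation: if the allocated node is missing from the member list, it sleeps one second and re-checks, and only re-allocates after five misses. A minimal, self-contained sketch of that pattern follows; `checkBeforeRealloc` and the toy `main` are hypothetical names, not HStream APIs.

    {-# LANGUAGE LambdaCase #-}
    module Main where

    import Control.Concurrent (threadDelay)

    -- Re-run 'check' up to 'n' times, sleeping one second between attempts,
    -- and fall back to 'realloc' only after every check has come up empty.
    checkBeforeRealloc :: Int -> IO (Maybe a) -> IO a -> IO a
    checkBeforeRealloc n check realloc
      | n <= 0    = realloc
      | otherwise = check >>= \case
          Just found -> pure found          -- the "dead" node showed up after all
          Nothing    -> do
            threadDelay (1 * 1000 * 1000)   -- the same 1s pause as the patch
            checkBeforeRealloc (n - 1) check realloc

    main :: IO ()
    main = do
      -- Toy run: the member list never contains the node, so after five
      -- failed checks we "re-allocate" to a fresh node id.
      nodeId <- checkBeforeRealloc 5 (pure Nothing) (pure (42 :: Int))
      print nodeId

As the TODO above notes, `leftRetries` only bounds this re-allocation path; the exception branches still recurse into `lookupNodePersist` without a limit.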
diff --git a/hstream-io/HStream/IO/Worker.hs b/hstream-io/HStream/IO/Worker.hs
index dee23dc41..514e3ebc9 100644
--- a/hstream-io/HStream/IO/Worker.hs
+++ b/hstream-io/HStream/IO/Worker.hs
@@ -91,7 +91,7 @@ createIOTaskFromTaskInfo
   => Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
 createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} ioOptions cleanIfExists createMetaData enableCheck = do
-  getIOTask worker taskName >>= \case
+  M.getIOTaskFromName workerHandle taskName >>= \case
     Nothing -> pure ()
     Just _ -> do
       if cleanIfExists
@@ -106,7 +106,7 @@ createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}
   when createMetaData $ M.createIOTaskMeta workerHandle taskName taskId taskInfo
   C.modifyMVar_ ioTasksM $ \ioTasks -> do
-    -- FIXME: already check ioTask exist in `getIOTask worker` step, no need check again
+    -- FIXME: the ioTask's existence was already checked in the `getIOTaskFromName` step, no need to check it again
     case HM.lookup taskName ioTasks of
       Just _ -> throwIO $ HE.ConnectorExists taskName
       Nothing -> do
@@ -220,9 +220,21 @@ updateConnectorConfig worker name config = do
            <> ", new config:" <> Log.buildString' newConnCfg
   return True
 
+-- WARNING: This function only uses the in-memory cache, which can be
+-- outdated, especially under complex cluster circumstances.
+-- Please be very careful when using this function, e.g. when checking
+-- whether a task already exists before creating it.
+-- And remember there are <task name -> task id> and <task id -> task meta>
+-- mappings in the meta store, and the latter is never cleaned up!!!
 getIOTask :: Worker -> T.Text -> IO (Maybe IOTask)
 getIOTask Worker{..} name = HM.lookup name <$> C.readMVar ioTasksM
 
+-- WARNING: This function only uses the in-memory cache, which can be
+-- outdated, especially under complex cluster circumstances.
+-- Please be very careful when using this function, e.g. when checking
+-- whether a task already exists before creating it.
+-- And remember there are <task name -> task id> and <task id -> task meta>
+-- mappings in the meta store, and the latter is never cleaned up!!!
 getIOTask_ :: Worker -> T.Text -> IO IOTask
 getIOTask_ Worker{..} name = do
   ioTasks <- C.readMVar ioTasksM
diff --git a/hstream/app/lib/KafkaServer.hs b/hstream/app/lib/KafkaServer.hs
index 732de1a98..6fc7a5173 100644
--- a/hstream/app/lib/KafkaServer.hs
+++ b/hstream/app/lib/KafkaServer.hs
@@ -176,7 +176,14 @@ serve sc@ServerContext{..} netOpts usingCppServer usingSparseOffset = do
       -- FIXME: Why need to call deleteAll here?
       -- Also in CI, getRqResult(common/hstream/HStream/MetaStore/RqliteUtils.hs) may throw a RQLiteUnspecifiedErr
       -- because the affected rows are more than 1, why that's invalid ?
-      deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers
+      -- FIXME: The following line is very delicate and can cause weird problems.
+      -- It was intended to re-allocate tasks after a server restart, but that
+      -- should be done BEFORE any node serves any client request or internal
+      -- task. However, the current `serverOnStarted` is not guaranteed to be
+      -- called before the node starts serving external requests.
+      -- TODO: I am not 100% confident this is correct, so it should be
+      -- carefully investigated and tested.
+      -- deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers
       Log.info "starting task detector"
       TM.runTaskDetector $ TM.TaskDetector
         {
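The WARNING added to `getIOTask`/`getIOTask_` is also why `createIOTaskFromTaskInfo` now consults `M.getIOTaskFromName` instead: the node-local `ioTasksM` map can miss a task that was created through another node, while the meta store is authoritative. A small sketch of the discrepancy, using stand-in types rather than the real `Worker` or meta-store APIs:

    {-# LANGUAGE OverloadedStrings #-}
    module Main where

    import           Control.Concurrent.MVar
    import qualified Data.HashMap.Strict     as HM
    import           Data.Text               (Text)

    -- 'cache' stands in for a node-local 'ioTasksM' MVar; 'store' stands in
    -- for the cluster-wide meta store. Only the store is authoritative.
    existsInCache :: MVar (HM.HashMap Text ()) -> Text -> IO Bool
    existsInCache cache name = HM.member name <$> readMVar cache

    existsInStore :: HM.HashMap Text () -> Text -> IO Bool
    existsInStore store name = pure (HM.member name store)

    main :: IO ()
    main = do
      cache <- newMVar HM.empty                    -- this node never saw the task
      let store = HM.fromList [("sink-task", ())]  -- but the cluster has it
      inCache <- existsInCache cache "sink-task"
      inStore <- existsInStore store "sink-task"
      -- Prints (False,True): trusting the cache would allow a duplicate create.
      print (inCache, inStore)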
diff --git a/hstream/app/server.hs b/hstream/app/server.hs
index fb1d33971..ebdec4d4d 100644
--- a/hstream/app/server.hs
+++ b/hstream/app/server.hs
@@ -205,7 +205,14 @@ serve sc@ServerContext{..} rpcOpts enableStreamV2 = do
     Gossip -> return ()
     _ -> do
       getProtoTimestamp >>= \x -> upsertMeta @Proto.Timestamp clusterStartTimeId x metaHandle
-      handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle
+      -- FIXME: The following line is very delicate and can cause weird problems.
+      -- It was intended to re-allocate tasks after a server restart, but that
+      -- should be done BEFORE any node serves any client request or internal
+      -- task. However, the current `serverOnStarted` is not guaranteed to be
+      -- called before the node starts serving external requests.
+      -- TODO: I am not 100% confident this is correct, so it should be
+      -- carefully investigated and tested.
+      -- handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle
       -- recover tasks
       Log.info "recovering local io tasks"
       Cluster.recoverLocalTasks sc scIOWorker
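Both hunks above comment out the startup-time deletion of `TaskAllocation` metadata because `serverOnStarted` is not guaranteed to run before the node starts serving. The ordering the FIXMEs call for, cleanup strictly before the accept loop, looks like this in miniature; `startServer`, `cleanup`, and `serveLoop` are hypothetical names, not the HStream startup API:

    module Main where

    -- Run the re-allocation cleanup strictly before the serving loop starts,
    -- instead of in an on-started callback that may race with early requests.
    startServer :: IO () -> IO () -> IO ()
    startServer cleanup serveLoop = do
      cleanup     -- e.g. clearing stale task allocations, done up front
      serveLoop   -- only now begin accepting client requests

    main :: IO ()
    main = startServer
      (putStrLn "cleanup: stale task allocations cleared")
      (putStrLn "serving: accepting requests")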