Skip to content

Commit

Permalink
connector: avoid creating connectors with the same name
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed May 28, 2024
1 parent cbde395 commit 2630c2d
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions hstream-io/HStream/IO/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ createIOTaskFromTaskInfo
=> Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}
ioOptions cleanIfExists createMetaData enableCheck = do
getIOTask worker taskName >>= \case
M.getIOTaskFromName workerHandle taskName >>= \case
Nothing -> pure ()
Just _ -> do
if cleanIfExists
Expand All @@ -106,7 +106,7 @@ createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}

when createMetaData $ M.createIOTaskMeta workerHandle taskName taskId taskInfo
C.modifyMVar_ ioTasksM $ \ioTasks -> do
-- FIXME: already check ioTask exist in `getIOTask worker` step, no need check again
-- FIXME: already check ioTask exist in `getIOTaskFromName` step, no need check again
case HM.lookup taskName ioTasks of
Just _ -> throwIO $ HE.ConnectorExists taskName
Nothing -> do
Expand Down Expand Up @@ -220,9 +220,21 @@ updateConnectorConfig worker name config = do
<> ", new config:" <> Log.buildString' newConnCfg
return True

-- WARNING: This function uses only cache in memory, which can be
-- outdated, especially under complex cluster circumstances.
-- Please be very careful when using this function, e.g. check
-- if a task already exists before creating it.
-- And remember there are <name -> task id -> task meta> mappings
-- in meta store, and the latter is never cleaned up!!!
getIOTask :: Worker -> T.Text -> IO (Maybe IOTask)
getIOTask Worker{..} name = HM.lookup name <$> C.readMVar ioTasksM

-- WARNING: This function uses only cache in memory, which can be
-- outdated, especially under complex cluster circumstances.
-- Please be very careful when using this function, e.g. check
-- if a task already exists before creating it.
-- And remember there are <name -> task id -> task meta> mappings
-- in meta store, and the latter is never cleaned up!!!
getIOTask_ :: Worker -> T.Text -> IO IOTask
getIOTask_ Worker{..} name = do
ioTasks <- C.readMVar ioTasksM
Expand Down

0 comments on commit 2630c2d

Please sign in to comment.