Skip to content

Commit

Permalink
fix(connector): add fixed-connector-image options (#1752)
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored Jan 31, 2024
1 parent 61a0cc6 commit f861289
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 6 deletions.
6 changes: 6 additions & 0 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ hserver:
# so the tasks-network should be the network that can connect to HStreamDB and external systems.
tasks-network: host
extra-docker-args: "-m 4g"

# when restarting a connector, whether use fixed connector image(if images of config file was updated),
# true -> read image from metastore
# false -> read image from config file
fixed-connector-image: true

# source images
source-images:
mysql: hstreamdb/source-mysql:latest
Expand Down
4 changes: 4 additions & 0 deletions hstream-io/HStream/IO/Meta.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ updateConfig :: MetaHandle -> T.Text -> J.Object -> IO ()
updateConfig h taskId cfg = do
updateMetaWith taskId (\(Just tm) -> tm {taskInfoMeta=tm.taskInfoMeta{connectorConfig = cfg}}) Nothing h

updateTaskConfig :: MetaHandle -> T.Text -> TaskConfig -> IO ()
updateTaskConfig h taskId cfg = do
updateMetaWith taskId (\(Just tm) -> tm {taskInfoMeta=tm.taskInfoMeta{taskConfig = cfg}}) Nothing h

mapKvKey :: T.Text -> T.Text -> T.Text
mapKvKey taskId key = taskId <> "_" <> key

Expand Down
11 changes: 6 additions & 5 deletions hstream-io/HStream/IO/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ data IOTaskStatus
deriving (Show, Eq, Generic, J.FromJSON, J.ToJSON)

data IOOptions = IOOptions
{ optTasksNetwork :: T.Text
, optTasksPath :: T.Text
, optSourceImages :: HM.HashMap T.Text T.Text
, optSinkImages :: HM.HashMap T.Text T.Text
, optExtraDockerArgs :: T.Text
{ optTasksNetwork :: T.Text
, optTasksPath :: T.Text
, optSourceImages :: HM.HashMap T.Text T.Text
, optSinkImages :: HM.HashMap T.Text T.Text
, optExtraDockerArgs :: T.Text
, optFixedConnectorImage :: Bool
} deriving (Show, Eq)

type TaskProcess = TP.Process IO.Handle IO.Handle ()
Expand Down
10 changes: 9 additions & 1 deletion hstream-io/HStream/IO/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,15 @@ recoverTask worker@Worker{..} name = do
Nothing -> throwIO $ HE.ConnectorNotFound name
Just (taskId, TaskMeta{taskInfoMeta=taskInfo@TaskInfo{..}}) -> do
let newConnCfg = J.insert "hstream" (J.toJSON hsConfig) connectorConfig
createIOTaskFromTaskInfo worker taskId taskInfo{connectorConfig=newConnCfg} options True False False
newImage = if options.optFixedConnectorImage
then taskConfig.tcImage
else makeImage taskType taskTarget options
newTaskConfig = taskConfig{tcImage=newImage}
when (newImage /= taskConfig.tcImage) $ do
Log.info $ "connector:" <> Log.build name <> " image changed, "
<> Log.build taskConfig.tcImage <> " -> " <> Log.build newImage
M.updateTaskConfig workerHandle taskId newTaskConfig
createIOTaskFromTaskInfo worker taskId taskInfo{connectorConfig=newConnCfg, taskConfig=newTaskConfig} options True False False

-- update config and restart
alterConnectorConfig :: Worker -> T.Text -> T.Text -> IO ()
Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ parseJSONToOptions CliOptions{..} obj = do
nodeSourceImages <- nodeIOCfg .:? "source-images" .!= HM.empty
nodeSinkImages <- nodeIOCfg .:? "sink-images" .!= HM.empty
optExtraDockerArgs <- nodeIOCfg .:? "extra-docker-args" .!= ""
optFixedConnectorImage <- nodeIOCfg .:? "fixed-connector-image" .!= True
(optSourceImages, optSinkImages) <- foldrM
(\img (ss, sk) -> do
-- "source mysql IMAGE" -> ("source" "mysq" "IMAGE")
Expand Down

0 comments on commit f861289

Please sign in to comment.