Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hadmin: remove the "node" field from the result of the listConnector command #1825

Merged
merged 4 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ module HStream.Common.Server.Lookup
) where

import Control.Concurrent.STM
import Control.Exception (SomeException (..), throwIO,
try)
import Control.Exception (SomeException (..), try)
import Data.List (find)
import Data.Text (Text)
import qualified Data.Vector as V
Expand All @@ -22,7 +21,6 @@ import HStream.Common.Server.HashRing (LoadBalanceHashRing,
readLoadBalanceHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..))
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip (GossipContext, getMemberList)
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
Expand All @@ -31,8 +29,7 @@ import qualified HStream.Server.HStreamApi as A
lookupNode :: LoadBalanceHashRing -> Text -> Maybe Text -> IO A.ServerNode
lookupNode loadBalanceHashRing key advertisedListenersKey = do
(_, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
theNode <- getResNode hashRing key advertisedListenersKey
return theNode
getResNode hashRing key advertisedListenersKey

lookupNodePersist
:: M.MetaHandle
Expand Down
5 changes: 3 additions & 2 deletions hstream-io/HStream/IO/Meta.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import qualified Data.Text as T
import GHC.Stack (HasCallStack)

import qualified Data.Aeson as J
import qualified Data.Map.Strict as M
import HStream.IO.Types
import qualified HStream.IO.Types as Types
import HStream.MetaStore.Types (MetaHandle, MetaStore (..))
Expand All @@ -23,8 +24,8 @@ createIOTaskMeta h taskName taskId taskInfo = do
insertMeta taskName (TaskIdMeta taskId) h

listIOTaskMeta :: MetaHandle -> IO [API.Connector]
listIOTaskMeta h = do
map convertTaskMeta . filter (\TaskMeta{..} -> taskStateMeta /= DELETED) <$> listMeta @TaskMeta h
listIOTaskMeta h =
map convertTaskMeta . filter (\(_, TaskMeta{..}) -> taskStateMeta /= DELETED) . M.toList <$> getAllMeta @TaskMeta h

getIOTaskMeta :: MetaHandle -> T.Text -> IO (Maybe TaskMeta)
getIOTaskMeta h tid = getMeta tid h
Expand Down
5 changes: 3 additions & 2 deletions hstream-io/HStream/IO/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,14 @@ instance HasPath TaskIdMeta FHandle where
instance HasPath TaskKvMeta FHandle where
myRootPath = "ioTaskKvs"

convertTaskMeta :: TaskMeta -> API.Connector
convertTaskMeta TaskMeta {..} =
convertTaskMeta :: (T.Text, TaskMeta) -> API.Connector
convertTaskMeta (taskId, TaskMeta {..}) =
def { API.connectorName = taskName taskInfoMeta
, API.connectorType = ioTaskTypeToText . taskType $ taskInfoMeta
, API.connectorTarget = taskTarget taskInfoMeta
, API.connectorCreationTime = Just . taskCreatedTime $ taskInfoMeta
, API.connectorStatus = ioTaskStatusToText taskStateMeta
, API.connectorTaskId = taskId
, API.connectorConfig = TL.toStrict . J.encodeToLazyText . J.lookup "connector" $ connectorConfig taskInfoMeta
}

Expand Down
5 changes: 2 additions & 3 deletions hstream-io/HStream/IO/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ createIOTask worker@Worker{..} name typ target cfg = do
createIOTaskFromTaskInfo
:: HasCallStack
=> Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}
createIOTaskFromTaskInfo Worker{..} taskId taskInfo@TaskInfo {..}
ioOptions cleanIfExists createMetaData enableCheck = do
M.getIOTaskFromName workerHandle taskName >>= \case
Nothing -> pure ()
Expand Down Expand Up @@ -132,9 +132,8 @@ showIOTask_ worker@Worker{..} name = do
Nothing -> throwIO $ HE.ConnectorNotFound name
Just c -> do
dockerStatus <- getDockerStatus task
let connector = convertTaskMeta c
let connector = convertTaskMeta (taskId, c)
return $ connector { API.connectorOffsets = taskOffsets
, API.connectorTaskId = taskId
, API.connectorNode = fromMaybe "" (getServerNode connectorConfig)
, API.connectorConfig = getConnectorConfig connectorConfig
, API.connectorImage = tcImage taskConfig
Expand Down
3 changes: 1 addition & 2 deletions hstream/src/HStream/Server/Handler/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,11 @@ runQuery sc AT.QueryCmdList = do
runConnector :: ServerContext -> AT.ConnectorCommand -> IO Text
runConnector ServerContext{..} AT.ConnectorCmdList = do
connectors <- HC.listIOTasks scIOWorker
let headers = ["Connector Name" :: Text, "Type", "Target", "Node", "Task ID", "Status", "CreatedTime"]
let headers = ["Connector Name" :: Text, "Type", "Target", "Task ID", "Status", "CreatedTime"]
rows <- forM connectors $ \API.Connector{..} -> do
return [ connectorName
, connectorType
, connectorTarget
, connectorNode
, connectorTaskId
, connectorStatus
, maybe "unknown" (Text.pack . show . timestampToMsTimestamp) connectorCreationTime
Expand Down
Loading