Skip to content

Commit

Permalink
hadmin-server: add connector resource type for meta command
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jun 3, 2024
1 parent b1820d8 commit 608e204
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 13 deletions.
4 changes: 2 additions & 2 deletions hstream-admin/server/HStream/Admin/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,13 @@ metaCmdParser = O.hsubparser
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help ("The category of the resource, currently support: "
<> "[subscription|query-info|view-info|qv-relation]")))
<> "[subscription|query-info|view-info|qv-relation|connectors|connector-infos]")))
(O.progDesc "List all metadata of specific resource"))
<> O.command "get" (O.info (MetaCmdGet <$> O.strOption ( O.long "resource"
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help ("The category of the resource, currently support: "
<> "[subscription|query-info|query-status|view-info|qv-relation]"))
<> "[subscription|query-info|query-status|view-info|qv-relation|connector|connector-info]"))
<*> O.strOption ( O.long "id"
<> O.short 'i'
<> O.metavar "RESOURCE_ID"
Expand Down
31 changes: 20 additions & 11 deletions hstream/src/HStream/Server/Handler/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import qualified HStream.Server.Core.View as HC
#endif
import HStream.Common.Server.MetaData (TaskAllocation,
renderTaskAllocationsToTable)
import HStream.IO.Types (TaskIdMeta, TaskMeta)
import HStream.Server.Exception (catchDefaultEx,
defaultExceptionHandle)
import qualified HStream.Server.HStreamApi as API
Expand All @@ -69,6 +70,8 @@ import HStream.Server.MetaData (QVRelation, QueryInfo,
renderQVRelationToTable,
renderQueryInfosToTable,
renderQueryStatusToTable,
renderTaskIdMetaMapToTable,
renderTaskMetaMapToTable,
renderViewInfosToTable)
import HStream.Server.Types
import qualified HStream.Stats as Stats
Expand Down Expand Up @@ -244,19 +247,25 @@ getResType resType =
runMeta :: ServerContext -> AT.MetaCommand -> IO Text
runMeta ServerContext{..} (AT.MetaCmdList resType) = do
case resType of
"subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle
"query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle
"view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle
"qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|view-info|qv-relateion]"
"subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle
"query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle
"view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle
"qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle
"connectors" -> pure <$> AT.tableResponse . renderTaskIdMetaMapToTable =<< M.getAllMeta @TaskIdMeta metaHandle
"connector-infos" -> pure <$> AT.tableResponse . renderTaskMetaMapToTable =<< M.getAllMeta @TaskMeta metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try "
<> "[subscription|query-info|view-info|qv-relateion|connectors|connector-infos]"
runMeta ServerContext{..} (AT.MetaCmdGet resType rId) = do
case resType of
"subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle
"query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle
"query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle
"view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle
"qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|query-status|view-info|qv-relateion]"
"subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle
"query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle
"query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle
"view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle
"qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle
"connector" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskIdMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskIdMeta rId metaHandle
"connector-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskMeta rId metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try "
<> "[subscription|query-info|query-status|view-info|qv-relateion|connector|connector-info]"
runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do
let headers = ["Meta Type" :: Text, "Connection Info"]
rows = case _metaStore of
Expand Down
30 changes: 30 additions & 0 deletions hstream/src/HStream/Server/MetaData/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ module HStream.Server.MetaData.Types
, renderQueryStatusToTable
, renderViewInfosToTable
, renderQVRelationToTable
, renderTaskMetaMapToTable
, renderTaskIdMetaMapToTable

#ifdef HStreamEnableSchema
, hstreamColumnCatalogToColumnCatalog
Expand All @@ -55,8 +57,11 @@ import Data.Int (Int64)
import qualified Data.IntMap as IntMap
import Data.IORef
import qualified Data.List as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Time.Clock.System (SystemTime (MkSystemTime),
getSystemTime)
import qualified Data.Vector as V
Expand All @@ -67,6 +72,9 @@ import GHC.Stack

import HStream.Common.Server.MetaData (rootPath)
import HStream.Common.ZookeeperClient (ZookeeperClient)
import HStream.IO.Types (TaskIdMeta (..),
TaskInfo (..),
TaskMeta (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (FHandle, HasPath (..),
MetaHandle,
Expand All @@ -79,6 +87,7 @@ import qualified HStream.Server.HStreamApi as API
import HStream.Server.MetaData.Exception
import HStream.Server.Types (SubscriptionWrap (..))
import qualified HStream.Store as S
import HStream.ThirdParty.Protobuf (toRFC3339)
import HStream.Utils
#ifdef HStreamUseV2Engine
import DiffFlow.Types
Expand Down Expand Up @@ -157,6 +166,27 @@ renderQVRelationToTable relations =
rows = map (\QVRelation{..} -> [qvRelationQueryName, qvRelationViewName]) relations
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

renderTaskMetaMapToTable :: Map Text TaskMeta -> Aeson.Value
renderTaskMetaMapToTable mp =
let headers = [ "Connector Name" :: Text
, "Task Id" :: Text
, "Task Type" :: Text
, "Created Time" :: Text
, "State" :: Text
]
rows = map getMetaInfo $ M.toList mp
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]
where
getMetaInfo (taskId, TaskMeta{taskInfoMeta=TaskInfo{..}, ..}) =
let createTime = TL.toStrict . toRFC3339 $ taskCreatedTime
in [taskName, taskId, T.pack . show $ taskType, createTime, T.pack . show $ taskStateMeta]

renderTaskIdMetaMapToTable :: Map Text TaskIdMeta -> Aeson.Value
renderTaskIdMetaMapToTable mp =
let headers = ["Connector Name" :: Text , "Task Id"]
rows = map (\(name, TaskIdMeta{..}) -> [name, taskIdMeta]) $ M.toList mp
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

type SourceStreams = [Text]
type SinkStream = Text
type RelatedStreams = (SourceStreams, SinkStream)
Expand Down

0 comments on commit 608e204

Please sign in to comment.