diff --git a/hstream-admin/server/HStream/Admin/Server/Types.hs b/hstream-admin/server/HStream/Admin/Server/Types.hs index 7a861b189..ecb1e2ab4 100644 --- a/hstream-admin/server/HStream/Admin/Server/Types.hs +++ b/hstream-admin/server/HStream/Admin/Server/Types.hs @@ -274,6 +274,7 @@ data MetaCommand = MetaCmdList Text | MetaCmdGet Text Text | MetaCmdTask MetaTaskCommand + | MetaCmdClean MetaCleanCommand | MetaCmdInfo deriving (Show) @@ -283,20 +284,22 @@ metaCmdParser = O.hsubparser <> O.short 'r' <> O.metavar "RESOURCE_CATEGORY" <> O.help ("The category of the resource, currently support: " - <> "[subscription|query-info|view-info|qv-relation]"))) + <> "[subscription|query-info|view-info|qv-relation|connectors|connector-infos]"))) (O.progDesc "List all metadata of specific resource")) <> O.command "get" (O.info (MetaCmdGet <$> O.strOption ( O.long "resource" <> O.short 'r' <> O.metavar "RESOURCE_CATEGORY" <> O.help ("The category of the resource, currently support: " - <> "[subscription|query-info|query-status|view-info|qv-relation]")) + <> "[subscription|query-info|query-status|view-info|qv-relation|connector|connector-info]")) <*> O.strOption ( O.long "id" <> O.short 'i' <> O.metavar "RESOURCE_ID" <> O.help "The Id of the resource")) (O.progDesc "Get metadata of specific resource")) <> O.command "info" (O.info (pure MetaCmdInfo) (O.progDesc "Get meta info")) - ) O.<|> MetaCmdTask <$> metaTaskCmdParser + ) + O.<|> MetaCmdTask <$> metaTaskCmdParser + O.<|> MetaCmdClean <$> metaCleanCmdParser data MetaTaskCommand = MetaTaskGet Text Text @@ -307,7 +310,8 @@ metaTaskCmdParser = O.hsubparser ( O.command "get-task" (O.info (MetaTaskGet <$> O.strOption ( O.long "resource" <> O.short 'r' <> O.metavar "RESOURCE_CATEGORY" - <> O.help "The category of the resource") + <> O.help ("The category of the resource, currently support: " + <> "[stream|subscription|query|view|connector|shard|shard-reader]")) <*> O.strOption ( O.long "id" <> O.short 'i' <> O.metavar "RESOURCE_ID" @@ -315,6 +319,14 @@ metaTaskCmdParser = O.hsubparser (O.progDesc "Get task allocation metadata of specific resource")) ) +data MetaCleanCommand = CleanConnectors + deriving (Show) + +metaCleanCmdParser :: O.Parser MetaCleanCommand +metaCleanCmdParser = O.hsubparser + ( O.command "clean-connectors" (O.info (pure CleanConnectors) (O.progDesc "Clean up the taskMeta of connectors in deleted state.")) + ) + ------------------------------------------------------------------------------- -- TODO: auto generate from commom/stats/include/*.inc diff --git a/hstream/src/HStream/Server/Handler/Admin.hs b/hstream/src/HStream/Server/Handler/Admin.hs index 3bee010fa..ccf63e0ab 100644 --- a/hstream/src/HStream/Server/Handler/Admin.hs +++ b/hstream/src/HStream/Server/Handler/Admin.hs @@ -12,7 +12,8 @@ module HStream.Server.Handler.Admin import Control.Concurrent (readMVar, tryReadMVar) import Control.Concurrent.STM.TVar (readTVarIO) -import Control.Monad (forM, void) +import Control.Exception (catch, throw) +import Control.Monad (forM, void, when) import Data.Aeson ((.=)) import qualified Data.Aeson as Aeson import qualified Data.Aeson.Text as Aeson @@ -20,6 +21,7 @@ import qualified Data.HashMap.Strict as HM import qualified Data.List as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import qualified Data.Set as ST import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Text.Lazy as TL @@ -36,7 +38,6 @@ import Proto3.Suite (Enumerated (Enumerated), import qualified Z.Data.CBytes as CB import Z.Data.CBytes (CBytes) -import Control.Exception (throw) import qualified HStream.Admin.Server.Types as AT import HStream.Base (rmTrailingZeros) import qualified HStream.Exception as HE @@ -61,6 +62,7 @@ import qualified HStream.Server.Core.View as HC #endif import HStream.Common.Server.MetaData (TaskAllocation, renderTaskAllocationsToTable) +import HStream.IO.Types (TaskIdMeta (..), TaskMeta) import HStream.Server.Exception (catchDefaultEx, defaultExceptionHandle) import qualified HStream.Server.HStreamApi as API @@ -69,6 +71,8 @@ import HStream.Server.MetaData (QVRelation, QueryInfo, renderQVRelationToTable, renderQueryInfosToTable, renderQueryStatusToTable, + renderTaskIdMetaMapToTable, + renderTaskMetaMapToTable, renderViewInfosToTable) import HStream.Server.Types import qualified HStream.Stats as Stats @@ -78,6 +82,7 @@ import HStream.Utils (Interval (..), cBytesToText, returnResp, showNodeStatus, structToJsonObject, timestampToMsTimestamp) +import ZooKeeper.Exception (ZNONODE) ------------------------------------------------------------------------------- -- All command line data types are defined in 'HStream.Admin.Types' @@ -244,19 +249,25 @@ getResType resType = runMeta :: ServerContext -> AT.MetaCommand -> IO Text runMeta ServerContext{..} (AT.MetaCmdList resType) = do case resType of - "subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle - "query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle - "view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle - "qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle - _ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|view-info|qv-relateion]" + "subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle + "query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle + "view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle + "qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle + "connectors" -> pure <$> AT.tableResponse . renderTaskIdMetaMapToTable =<< M.getAllMeta @TaskIdMeta metaHandle + "connector-infos" -> pure <$> AT.tableResponse . renderTaskMetaMapToTable =<< M.getAllMeta @TaskMeta metaHandle + _ -> return $ AT.errorResponse "invalid resource type, try " + <> "[subscription|query-info|view-info|qv-relateion|connectors|connector-infos]" runMeta ServerContext{..} (AT.MetaCmdGet resType rId) = do case resType of - "subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle - "query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle - "query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle - "view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle - "qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle - _ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|query-status|view-info|qv-relateion]" + "subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle + "query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle + "query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle + "view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle + "qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle + "connector" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskIdMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskIdMeta rId metaHandle + "connector-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskMeta rId metaHandle + _ -> return $ AT.errorResponse "invalid resource type, try " + <> "[subscription|query-info|query-status|view-info|qv-relateion|connector|connector-info]" runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do let headers = ["Meta Type" :: Text, "Connection Info"] rows = case _metaStore of @@ -266,12 +277,24 @@ runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do content = Aeson.object ["headers" .= headers, "rows" .= rows] return $ AT.tableResponse content runMeta sc (AT.MetaCmdTask taskCmd) = runMetaTask sc taskCmd +runMeta sc (AT.MetaCmdClean cmd) = runMetaCleanTask sc cmd runMetaTask :: ServerContext -> AT.MetaTaskCommand -> IO Text runMetaTask ServerContext{..} (AT.MetaTaskGet resType rId) = do let metaId = mkAllocationKey (getResType resType) rId pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskAllocationsToTable . L.singleton) =<< M.getMeta @TaskAllocation metaId metaHandle +runMetaCleanTask :: ServerContext -> AT.MetaCleanCommand -> IO Text +runMetaCleanTask ServerContext{..} AT.CleanConnectors = do + activIds <- ST.fromList . map taskIdMeta <$> M.listMeta @TaskIdMeta metaHandle + allTaskMetas <- M.getAllMeta @TaskMeta metaHandle + void $ Map.traverseWithKey (\k _ -> removeMeta k activIds) allTaskMetas + return $ AT.plainResponse "OK" + where + removeMeta key ids = do + when (ST.notMember key ids) $ + catch (M.deleteMeta @TaskMeta key Nothing metaHandle) $ \(_ :: ZNONODE) -> return () + ------------------------------------------------------------------------------- -- Admin Stream Command diff --git a/hstream/src/HStream/Server/MetaData/Types.hs b/hstream/src/HStream/Server/MetaData/Types.hs index 0818e4b80..28cb678b7 100644 --- a/hstream/src/HStream/Server/MetaData/Types.hs +++ b/hstream/src/HStream/Server/MetaData/Types.hs @@ -32,6 +32,8 @@ module HStream.Server.MetaData.Types , renderQueryStatusToTable , renderViewInfosToTable , renderQVRelationToTable + , renderTaskMetaMapToTable + , renderTaskIdMetaMapToTable #ifdef HStreamEnableSchema , hstreamColumnCatalogToColumnCatalog @@ -55,8 +57,11 @@ import Data.Int (Int64) import qualified Data.IntMap as IntMap import Data.IORef import qualified Data.List as L +import Data.Map (Map) +import qualified Data.Map.Strict as M import Data.Text (Text) import qualified Data.Text as T +import qualified Data.Text.Lazy as TL import Data.Time.Clock.System (SystemTime (MkSystemTime), getSystemTime) import qualified Data.Vector as V @@ -67,6 +72,9 @@ import GHC.Stack import HStream.Common.Server.MetaData (rootPath) import HStream.Common.ZookeeperClient (ZookeeperClient) +import HStream.IO.Types (TaskIdMeta (..), + TaskInfo (..), + TaskMeta (..)) import qualified HStream.Logger as Log import HStream.MetaStore.Types (FHandle, HasPath (..), MetaHandle, @@ -79,6 +87,7 @@ import qualified HStream.Server.HStreamApi as API import HStream.Server.MetaData.Exception import HStream.Server.Types (SubscriptionWrap (..)) import qualified HStream.Store as S +import HStream.ThirdParty.Protobuf (toRFC3339) import HStream.Utils #ifdef HStreamUseV2Engine import DiffFlow.Types @@ -157,6 +166,27 @@ renderQVRelationToTable relations = rows = map (\QVRelation{..} -> [qvRelationQueryName, qvRelationViewName]) relations in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows] +renderTaskMetaMapToTable :: Map Text TaskMeta -> Aeson.Value +renderTaskMetaMapToTable mp = + let headers = [ "Connector Name" :: Text + , "Task Id" :: Text + , "Task Type" :: Text + , "Created Time" :: Text + , "State" :: Text + ] + rows = map getMetaInfo $ M.toList mp + in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows] + where + getMetaInfo (taskId, TaskMeta{taskInfoMeta=TaskInfo{..}, ..}) = + let createTime = TL.toStrict . toRFC3339 $ taskCreatedTime + in [taskName, taskId, T.pack . show $ taskType, createTime, T.pack . show $ taskStateMeta] + +renderTaskIdMetaMapToTable :: Map Text TaskIdMeta -> Aeson.Value +renderTaskIdMetaMapToTable mp = + let headers = ["Connector Name" :: Text , "Task Id"] + rows = map (\(name, TaskIdMeta{..}) -> [name, taskIdMeta]) $ M.toList mp + in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows] + type SourceStreams = [Text] type SinkStream = Text type RelatedStreams = (SourceStreams, SinkStream)