Skip to content

Commit

Permalink
hadmin-server: add metaClean subcommand to support clean dirty connec…
Browse files Browse the repository at this point in the history
…tor meta
  • Loading branch information
YangKian committed Jun 3, 2024
1 parent 608e204 commit 86f49e7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
13 changes: 12 additions & 1 deletion hstream-admin/server/HStream/Admin/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ data MetaCommand
= MetaCmdList Text
| MetaCmdGet Text Text
| MetaCmdTask MetaTaskCommand
| MetaCmdClean MetaCleanCommand
| MetaCmdInfo
deriving (Show)

Expand All @@ -296,7 +297,17 @@ metaCmdParser = O.hsubparser
<> O.help "The Id of the resource"))
(O.progDesc "Get metadata of specific resource"))
<> O.command "info" (O.info (pure MetaCmdInfo) (O.progDesc "Get meta info"))
) O.<|> MetaCmdTask <$> metaTaskCmdParser
)
O.<|> MetaCmdTask <$> metaTaskCmdParser
O.<|> MetaCmdClean <$> metaCleanCmdParser

data MetaCleanCommand = CleanConnectors
deriving (Show)

metaCleanCmdParser :: O.Parser MetaCleanCommand
metaCleanCmdParser = O.hsubparser
( O.command "clean-connectors" (O.info (pure CleanConnectors) (O.progDesc "Clean up the taskMeta of connectors in deleted state."))
)

data MetaTaskCommand
= MetaTaskGet Text Text
Expand Down
20 changes: 17 additions & 3 deletions hstream/src/HStream/Server/Handler/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ module HStream.Server.Handler.Admin

import Control.Concurrent (readMVar, tryReadMVar)
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Monad (forM, void)
import Control.Exception (catch, throw)
import Control.Monad (forM, void, when)
import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Text as Aeson
import qualified Data.HashMap.Strict as HM
import qualified Data.List as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as ST
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Lazy as TL
Expand All @@ -36,7 +38,6 @@ import Proto3.Suite (Enumerated (Enumerated),
import qualified Z.Data.CBytes as CB
import Z.Data.CBytes (CBytes)

import Control.Exception (throw)
import qualified HStream.Admin.Server.Types as AT
import HStream.Base (rmTrailingZeros)
import qualified HStream.Exception as HE
Expand All @@ -61,7 +62,7 @@ import qualified HStream.Server.Core.View as HC
#endif
import HStream.Common.Server.MetaData (TaskAllocation,
renderTaskAllocationsToTable)
import HStream.IO.Types (TaskIdMeta, TaskMeta)
import HStream.IO.Types (TaskIdMeta (..), TaskMeta)
import HStream.Server.Exception (catchDefaultEx,
defaultExceptionHandle)
import qualified HStream.Server.HStreamApi as API
Expand All @@ -81,6 +82,7 @@ import HStream.Utils (Interval (..), cBytesToText,
returnResp, showNodeStatus,
structToJsonObject,
timestampToMsTimestamp)
import ZooKeeper.Exception (ZNONODE)

-------------------------------------------------------------------------------
-- All command line data types are defined in 'HStream.Admin.Types'
Expand Down Expand Up @@ -275,12 +277,24 @@ runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do
content = Aeson.object ["headers" .= headers, "rows" .= rows]
return $ AT.tableResponse content
runMeta sc (AT.MetaCmdTask taskCmd) = runMetaTask sc taskCmd
runMeta sc (AT.MetaCmdClean cmd) = runMetaCleanTask sc cmd

runMetaTask :: ServerContext -> AT.MetaTaskCommand -> IO Text
runMetaTask ServerContext{..} (AT.MetaTaskGet resType rId) = do
let metaId = mkAllocationKey (getResType resType) rId
pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskAllocationsToTable . L.singleton) =<< M.getMeta @TaskAllocation metaId metaHandle

runMetaCleanTask :: ServerContext -> AT.MetaCleanCommand -> IO Text
runMetaCleanTask ServerContext{..} AT.CleanConnectors = do
activIds <- ST.fromList . map taskIdMeta <$> M.listMeta @TaskIdMeta metaHandle
allTaskMetas <- M.getAllMeta @TaskMeta metaHandle
void $ Map.traverseWithKey (\k _ -> removeMeta k activIds) allTaskMetas
return $ AT.plainResponse "OK"
where
removeMeta key ids = do
when (ST.notMember key ids) $
catch (M.deleteMeta @TaskMeta key Nothing metaHandle) $ \(_ :: ZNONODE) -> return ()

-------------------------------------------------------------------------------
-- Admin Stream Command

Expand Down

0 comments on commit 86f49e7

Please sign in to comment.