Skip to content

Commit

Permalink
cli(hadmin-store): support update replicate-across attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Sep 18, 2023
1 parent 5cba616 commit b597fcd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
11 changes: 11 additions & 0 deletions hstream-admin/store/HStream/Admin/Store/Command/Logs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import Control.Monad (forM_, guard, unless, when,
(>=>))
import Data.Bits (shiftR, (.&.))
import Data.Char (toUpper)
import qualified Data.List as L
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust, isNothing)
import Foreign.ForeignPtr (withForeignPtr)
Expand Down Expand Up @@ -53,12 +54,15 @@ runLogsUpdate conf UpdateLogsOpts{..} = do
case res of
Right loggroup -> S.logGroupGetAttrs loggroup
Left (_ :: S.NOTFOUND) -> S.logDirectoryGetAttrs =<< S.getLogDirectory client updatePath
putStrLn $ "updateReplicateAcross: " <> show updateReplicateAcross
let attrs' =
attrs { logReplicationFactor = maybe logReplicationFactor S.defAttr1 updateReplicationFactor
, logSyncedCopies = maybe logSyncedCopies S.defAttr1 updateSyncedCopies
, logBacklogDuration = maybe logBacklogDuration (S.defAttr1 . Just) updateBacklogDuration
, logReplicateAcross = updateRepAcross updateReplicateAcross logReplicateAcross
, logAttrsExtras = updateExtras `Map.union` logAttrsExtras
}
putStrLn $ "new attrs: " <> show attrs'
attrsPtr <- S.pokeLogAttributes attrs'
withForeignPtr attrsPtr $ S.ldWriteAttributes client updatePath
case res of
Expand All @@ -68,6 +72,12 @@ runLogsUpdate conf UpdateLogsOpts{..} = do
Left (e :: S.SomeHStoreException) ->
putStrLn . formatWith [red] $ "Cannot update attributes for "
<> show updatePath <> " for reason: " <> show e
where
updateRepAcross :: S.ScopeReplicationFactors -> S.Attribute S.ScopeReplicationFactors -> S.Attribute S.ScopeReplicationFactors
updateRepAcross r1 originAttr =
case S.attrValue originAttr of
Just r2 -> S.defAttr1 $ L.unionBy (\(k1,_) (k2,_) -> k1 == k2) r1 r2
Nothing -> S.defAttr1 r1

runLogsInfo :: HeaderConfig AdminAPI -> S.C_LogID -> IO ()
runLogsInfo conf logid = do
Expand Down Expand Up @@ -245,6 +255,7 @@ printLogAttributes level LogAttributes{..} = do
emit $ _SHOW_ATTR(logReplicationFactor)
emit $ _SHOW_ATTR(logSyncedCopies)
emit $ _SHOW_ATTR(logBacklogDuration)
emit $ _SHOW_ATTR(logReplicateAcross)
forM_ (Map.toList logAttrsExtras) $ \(k, v) ->
emit $ unpack k <> ": " <> unpack v
#undef _SHOW_ATTR
Expand Down
34 changes: 34 additions & 0 deletions hstream-admin/store/HStream/Admin/Store/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ data UpdateLogsOpts = UpdateLogsOpts
, updateReplicationFactor :: Maybe Int
, updateSyncedCopies :: Maybe Int
, updateBacklogDuration :: Maybe Int
, updateReplicateAcross :: [(S.NodeLocationScope, Int)]
, updateExtras :: Map.Map CBytes CBytes
} deriving (Show)

Expand Down Expand Up @@ -424,6 +425,11 @@ updateLogsOptsParser = UpdateLogsOpts
<> help ( "Duration that a record can exist in the log before it expires and"
<> "gets deleted (in senconds). Valid value must be at least 1 second.")
))
<*> (many (option parseLogReplicateAcross
( long "replicate-across"
<> metavar "SCOPE:REPLICATE"
<> help "Cross-domain replication. Valid scopes: [node|rack|row|cluster|region|root]"
)))
<*> (Map.fromList <$> many (option parseLogExtraAttr
( long "extra-attributes"
<> metavar "KEY:VALUE"
Expand Down Expand Up @@ -480,6 +486,34 @@ logIDParser = option auto ( long "id"
<> help "the log ID to query"
)

parseLogReplicateAcross :: ReadM (S.NodeLocationScope, Int)
parseLogReplicateAcross = eitherReader $ parse . V.packASCII
where
parse :: Bytes -> Either String (S.NodeLocationScope, Int)
parse bs =
case P.parse' parser bs of
Left er -> Left $ "cannot parse value: " <> show er
Right i -> Right i
parser = do
P.skipSpaces
n <- P.takeTill (== c2w ':')
P.char8 ':'
s <- P.int
P.skipSpaces
return (bytesToNodeLocationScope n, s)

bytesToNodeLocationScope :: Bytes -> S.NodeLocationScope
bytesToNodeLocationScope bs =
case fromBytes bs of
"node" -> S.NodeLocationScope_NODE
"rack" -> S.NodeLocationScope_RACK
"row" -> S.NodeLocationScope_ROW
"cluster" -> S.NodeLocationScope_CLUSTER
"data-center" -> S.NodeLocationScope_DATA_CENTER
"region" -> S.NodeLocationScope_REGION
"root" -> S.NodeLocationScope_ROOT
_ -> S.NodeLocationScope_INVALID

parseLogExtraAttr :: ReadM (CBytes, CBytes)
parseLogExtraAttr = eitherReader $ parse . V.packASCII
where
Expand Down

0 comments on commit b597fcd

Please sign in to comment.