diff --git a/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs b/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs index e35d49f64..ab35b38b1 100644 --- a/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs +++ b/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs @@ -11,6 +11,7 @@ import Control.Monad (forM_, guard, unless, when, (>=>)) import Data.Bits (shiftR, (.&.)) import Data.Char (toUpper) +import qualified Data.List as L import qualified Data.Map.Strict as Map import Data.Maybe (fromJust, isJust, isNothing) import Foreign.ForeignPtr (withForeignPtr) @@ -53,12 +54,15 @@ runLogsUpdate conf UpdateLogsOpts{..} = do case res of Right loggroup -> S.logGroupGetAttrs loggroup Left (_ :: S.NOTFOUND) -> S.logDirectoryGetAttrs =<< S.getLogDirectory client updatePath + putStrLn $ "updateReplicateAcross: " <> show updateReplicateAcross let attrs' = attrs { logReplicationFactor = maybe logReplicationFactor S.defAttr1 updateReplicationFactor , logSyncedCopies = maybe logSyncedCopies S.defAttr1 updateSyncedCopies , logBacklogDuration = maybe logBacklogDuration (S.defAttr1 . Just) updateBacklogDuration + , logReplicateAcross = updateRepAcross updateReplicateAcross logReplicateAcross , logAttrsExtras = updateExtras `Map.union` logAttrsExtras } + putStrLn $ "new attrs: " <> show attrs' attrsPtr <- S.pokeLogAttributes attrs' withForeignPtr attrsPtr $ S.ldWriteAttributes client updatePath case res of @@ -68,6 +72,12 @@ runLogsUpdate conf UpdateLogsOpts{..} = do Left (e :: S.SomeHStoreException) -> putStrLn . formatWith [red] $ "Cannot update attributes for " <> show updatePath <> " for reason: " <> show e + where + updateRepAcross :: S.ScopeReplicationFactors -> S.Attribute S.ScopeReplicationFactors -> S.Attribute S.ScopeReplicationFactors + updateRepAcross r1 originAttr = + case S.attrValue originAttr of + Just r2 -> S.defAttr1 $ L.unionBy (\(k1,_) (k2,_) -> k1 == k2) r1 r2 + Nothing -> S.defAttr1 r1 runLogsInfo :: HeaderConfig AdminAPI -> S.C_LogID -> IO () runLogsInfo conf logid = do @@ -245,6 +255,7 @@ printLogAttributes level LogAttributes{..} = do emit $ _SHOW_ATTR(logReplicationFactor) emit $ _SHOW_ATTR(logSyncedCopies) emit $ _SHOW_ATTR(logBacklogDuration) + emit $ _SHOW_ATTR(logReplicateAcross) forM_ (Map.toList logAttrsExtras) $ \(k, v) -> emit $ unpack k <> ": " <> unpack v #undef _SHOW_ATTR diff --git a/hstream-admin/store/HStream/Admin/Store/Types.hs b/hstream-admin/store/HStream/Admin/Store/Types.hs index abeb7b0dc..d9b297351 100644 --- a/hstream-admin/store/HStream/Admin/Store/Types.hs +++ b/hstream-admin/store/HStream/Admin/Store/Types.hs @@ -393,6 +393,7 @@ data UpdateLogsOpts = UpdateLogsOpts , updateReplicationFactor :: Maybe Int , updateSyncedCopies :: Maybe Int , updateBacklogDuration :: Maybe Int + , updateReplicateAcross :: [(S.NodeLocationScope, Int)] , updateExtras :: Map.Map CBytes CBytes } deriving (Show) @@ -424,6 +425,11 @@ updateLogsOptsParser = UpdateLogsOpts <> help ( "Duration that a record can exist in the log before it expires and" <> "gets deleted (in senconds). Valid value must be at least 1 second.") )) + <*> (many (option parseLogReplicateAcross + ( long "replicate-across" + <> metavar "SCOPE:REPLICATE" + <> help "Cross-domain replication. Valid scopes: [node|rack|row|cluster|region|root]" + ))) <*> (Map.fromList <$> many (option parseLogExtraAttr ( long "extra-attributes" <> metavar "KEY:VALUE" @@ -480,6 +486,34 @@ logIDParser = option auto ( long "id" <> help "the log ID to query" ) +parseLogReplicateAcross :: ReadM (S.NodeLocationScope, Int) +parseLogReplicateAcross = eitherReader $ parse . V.packASCII + where + parse :: Bytes -> Either String (S.NodeLocationScope, Int) + parse bs = + case P.parse' parser bs of + Left er -> Left $ "cannot parse value: " <> show er + Right i -> Right i + parser = do + P.skipSpaces + n <- P.takeTill (== c2w ':') + P.char8 ':' + s <- P.int + P.skipSpaces + return (bytesToNodeLocationScope n, s) + + bytesToNodeLocationScope :: Bytes -> S.NodeLocationScope + bytesToNodeLocationScope bs = + case fromBytes bs of + "node" -> S.NodeLocationScope_NODE + "rack" -> S.NodeLocationScope_RACK + "row" -> S.NodeLocationScope_ROW + "cluster" -> S.NodeLocationScope_CLUSTER + "data-center" -> S.NodeLocationScope_DATA_CENTER + "region" -> S.NodeLocationScope_REGION + "root" -> S.NodeLocationScope_ROOT + _ -> S.NodeLocationScope_INVALID + parseLogExtraAttr :: ReadM (CBytes, CBytes) parseLogExtraAttr = eitherReader $ parse . V.packASCII where