From a67cb5efce77e11653737830783de2b1ff770c22 Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Mon, 29 Apr 2024 11:30:36 +0800 Subject: [PATCH 1/2] hstream-store: add 'scdEnabled', 'localScdEnabled' and 'stickyCopySets' LogAttributes --- .../store/HStream/Admin/Store/Command/Logs.hs | 3 ++ .../Store/Internal/LogDevice/LogAttributes.hs | 47 +++++++++++++++---- .../cbits/logdevice/hs_log_attributes.cpp | 14 ++++-- .../test/HStream/Store/LogDeviceSpec.hs | 14 +++++- 4 files changed, 64 insertions(+), 14 deletions(-) diff --git a/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs b/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs index 7ea2698c5..5babb17a7 100644 --- a/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs +++ b/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs @@ -255,6 +255,9 @@ printLogAttributes level LogAttributes{..} = do emit $ _SHOW_ATTR(logSyncedCopies) emit $ _SHOW_ATTR(logBacklogDuration) emit $ _SHOW_ATTR(logReplicateAcross) + emit $ _SHOW_ATTR(logScdEnabled) + emit $ _SHOW_ATTR(logLocalScdEnabled) + emit $ _SHOW_ATTR(logStickyCopySets) forM_ (Map.toList logAttrsExtras) $ \(k, v) -> emit $ Just $ unpack k <> ": " <> unpack v #undef _SHOW_ATTR diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs index f71c62fd7..e3d81e562 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs @@ -95,15 +95,15 @@ data LogAttributes = LogAttributes -- -- ^ Maximum amount of time to artificially delay delivery of newly written -- -- records (increases delivery latency but improves server and client -- -- performance), in milliseconds. - -- , logScdEnabled :: Attribute Bool - -- -- ^ Indicate whether or not the Single Copy Delivery optimization should be - -- -- used. 
- -- , logLocalScdEnabled :: Attribute Bool - -- -- ^ Indicate whether or not to use Local Single Copy Delivery. This is - -- -- ignored if scdEnabled_ is false. - -- , logStickyCopySets :: Attribute Bool - -- -- ^ True if copysets on this log should be "sticky". See docblock in - -- -- StickyCopySetManager.h + , logScdEnabled :: Attribute Bool + -- ^ Indicate whether or not the Single Copy Delivery optimization should be + -- used. + , logLocalScdEnabled :: Attribute Bool + -- ^ Indicate whether or not to use Local Single Copy Delivery. This is + -- ignored if 'logScdEnabled' is false. + , logStickyCopySets :: Attribute Bool + -- ^ True if copysets on this log should be "sticky". See docblock in + -- StickyCopySetManager.h -- , logMutablePerEpochLogMetadataEnabled ::Attribute Bool -- -- ^ If true, write mutable per-epoch metadata along with every data record. -- , logSequencerAffinity :: Attribute (Maybe CBytes) @@ -149,6 +149,9 @@ pokeLogAttributes LogAttributes{..} = withAllocMaybePrim id _ARG(logSyncReplicationScope) withPrimListPairUnsafe _MAYBE_LIST_PAIR_ARG(logReplicateAcross) withAllocMaybePrim2 fromIntegral _MAYBE_ARG(logBacklogDuration) + withAllocMaybePrim bool2cbool _ARG(logScdEnabled) + withAllocMaybePrim bool2cbool _ARG(logLocalScdEnabled) + withAllocMaybePrim bool2cbool _ARG(logStickyCopySets) withHsCBytesMapUnsafe logAttrsExtras $ \l ks vs -> do #define _ARG_TO(name) name##' (attrInherited name) #define _MAYBE_ARG_TO(name) name##_flag name##' (attrInherited name) @@ -160,6 +163,9 @@ pokeLogAttributes LogAttributes{..} = _ARG_TO(logSyncReplicationScope) _MAYBE_LIST_PAIR_TO(logReplicateAcross) _MAYBE_ARG_TO(logBacklogDuration) + _ARG_TO(logScdEnabled) + _ARG_TO(logLocalScdEnabled) + _ARG_TO(logStickyCopySets) l ks vs newForeignPtr free_log_attributes_fun i #undef _ARG @@ -183,7 +189,10 @@ peekLogAttributes ptr = do , ( logSyncReplicationScope , ( logReplicateAcross , ( logBacklogDuration - , _))))))) <- + , ( logScdEnabled + , ( logLocalScdEnabled + , (
logStickyCopySets + , _)))))))))) <- runPeek id $ \_ARG(replicationFactor) -> runPeek id $ \_ARG(syncedCopies) -> runPeek id $ \_ARG(maxWritesInFlight) -> @@ -191,6 +200,9 @@ peekLogAttributes ptr = do runPeek NodeLocationScope $ \_ARG(syncReplicationScope) -> runPeekMaybeListPair replicateAcross_size $ \_MAYBE_LIST_PAIR(replicateAcross) -> runPeekMaybe id $ \_MAYBE_ARG(backlogDuration) -> + runPeek cbool2bool $ \_ARG(scdEnabled) -> + runPeek cbool2bool $ \_ARG(localScdEnabled) -> + runPeek cbool2bool $ \_ARG(stickyCopySets) -> peek_log_attributes ptr _ARG(replicationFactor) @@ -200,6 +212,9 @@ peekLogAttributes ptr = do _ARG(syncReplicationScope) _MAYBE_LIST_PAIR(replicateAcross) _MAYBE_ARG(backlogDuration) + _ARG(scdEnabled) + _ARG(localScdEnabled) + _ARG(stickyCopySets) logAttrsExtras <- peekLogAttributesExtras ptr return LogAttributes{..} #undef _ARG @@ -244,6 +259,12 @@ foreign import ccall unsafe "hs_logdevice.h poke_log_attributes" -- ^ logReplicateAcross -> Bool -> Ptr CInt -> Bool -- ^ logBacklogDuration + -> Ptr CBool -> Bool + -- ^ logScdEnabled + -> Ptr CBool -> Bool + -- ^ logLocalScdEnabled + -> Ptr CBool -> Bool + -- ^ logStickyCopySets -> Int -> BAArray# Word8 -> BAArray# Word8 -- ^ extras -> IO (Ptr LogDeviceLogAttributes) @@ -265,6 +286,12 @@ foreign import ccall unsafe "hs_logdevice.h peek_log_attributes" -- ^ logReplicateAcross -> MBA# CBool -> MBA# CBool -> MBA# Int -> MBA# CBool -- ^ logBacklogDuration + -> MBA# CBool -> MBA# CBool -> MBA# CBool + -- ^ logScdEnabled + -> MBA# CBool -> MBA# CBool -> MBA# CBool + -- ^ logLocalScdEnabled + -> MBA# CBool -> MBA# CBool -> MBA# CBool + -- ^ logStickyCopySets -> IO () foreign import ccall unsafe "hs_logdevice.h free_log_attributes" diff --git a/hstream-store/cbits/logdevice/hs_log_attributes.cpp b/hstream-store/cbits/logdevice/hs_log_attributes.cpp index 762a13d4e..bfe2ffd5d 100644 --- a/hstream-store/cbits/logdevice/hs_log_attributes.cpp +++ b/hstream-store/cbits/logdevice/hs_log_attributes.cpp @@ 
-17,7 +17,8 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies), syncReplicationScope), _LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope, HsInt), - _MAYBE_ARG(int, backlogDuration), + _MAYBE_ARG(int, backlogDuration), _ARG(bool, scdEnabled), + _ARG(bool, localScdEnabled), _ARG(bool, stickyCopySets), // HsInt extras_len, StgArrBytes** keys, StgArrBytes** vals) { #undef _ARG @@ -52,6 +53,9 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies), attrs = attrs.with_replicateAcross(rs); } ADD_MAYBE_ATTR(backlogDuration, std::chrono::seconds, std::chrono::seconds) + ADD_ATTR(scdEnabled) + ADD_ATTR(localScdEnabled) + ADD_ATTR(stickyCopySets) #undef ADD_ATTR #undef ADD_MAYBE_ATTR @@ -79,7 +83,8 @@ void peek_log_attributes( ARG(facebook::logdevice::NodeLocationScope, syncReplicationScope), ARG_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope, HsInt), - ARG_MAYBE(HsInt, backlogDuration)) + ARG_MAYBE(HsInt, backlogDuration), ARG(bool, scdEnabled), + ARG(bool, localScdEnabled), ARG(bool, stickyCopySets)) #undef ARG #undef ARG_MAYBE #undef ARG_LIST_PAIR @@ -104,7 +109,7 @@ void peek_log_attributes( *name##_inh = attrs->name().isInherited(); \ if (name##_len > 0 && attrs->name().hasValue()) { \ auto& val = attrs->name().value(); \ - for (int i = 0; i < name##_len; i++) { \ + for (int i = 0; i < name##_len; i++) { \ name##_keys[i] = val[i].first; \ name##_vals[i] = val[i].second; \ } \ @@ -117,6 +122,9 @@ void peek_log_attributes( PEEK_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope, HsInt); PEEK_MAYBE(backlogDuration, .count()); + PEEK(scdEnabled); + PEEK(localScdEnabled); + PEEK(stickyCopySets); #undef PEEK #undef PEEK_MAYBE #undef PEEK_LIST_PAIR diff --git a/hstream-store/test/HStream/Store/LogDeviceSpec.hs b/hstream-store/test/HStream/Store/LogDeviceSpec.hs index a91ed2fd1..77a5ecdd7 100644 --- a/hstream-store/test/HStream/Store/LogDeviceSpec.hs +++ 
b/hstream-store/test/HStream/Store/LogDeviceSpec.hs @@ -18,6 +18,9 @@ logdirSpec :: Spec logdirSpec = describe "LogDirectory" $ do let attrs = S.def{ I.logReplicationFactor = I.defAttr1 1 , I.logBacklogDuration = I.defAttr1 (Just 60) + , I.logScdEnabled = I.defAttr1 False + , I.logLocalScdEnabled = I.defAttr1 True + , I.logStickyCopySets = I.defAttr1 False , I.logAttrsExtras = Map.fromList [("A", "B")] } @@ -84,6 +87,9 @@ logdirSpec = describe "LogDirectory" $ do I.logReplicationFactor attrs' `shouldBe` I.Attribute (Just 1) True I.logBacklogDuration attrs' `shouldBe` I.Attribute (Just (Just 60)) True Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B" + I.logScdEnabled attrs' `shouldBe` I.Attribute (Just False) True + I.logLocalScdEnabled attrs' `shouldBe` I.Attribute (Just True) True + I.logStickyCopySets attrs' `shouldBe` I.Attribute (Just False) True I.syncLogsConfigVersion client =<< I.removeLogDirectory client dirname True loggroupAround' :: SpecWith (CBytes, S.C_LogID) -> Spec @@ -92,9 +98,12 @@ loggroupAround' = , I.logBacklogDuration = I.defAttr1 (Just 60) , I.logSingleWriter = I.defAttr1 True , I.logSyncReplicationScope = I.defAttr1 S.NodeLocationScope_DATA_CENTER + , I.logScdEnabled = I.defAttr1 False + , I.logLocalScdEnabled = I.defAttr1 True + , I.logStickyCopySets = I.defAttr1 False , I.logAttrsExtras = Map.fromList [("A", "B")] } - logid = 104 + logid = 105 logname = "LogDeviceSpec_LogGroupSpec" in loggroupAround logid logname attrs @@ -106,6 +115,9 @@ loggroupSpec = describe "LogGroup" $ loggroupAround' $ parallel $ do I.logReplicationFactor attrs' `shouldBe` I.defAttr1 1 I.logBacklogDuration attrs' `shouldBe` I.defAttr1 (Just 60) I.logSingleWriter attrs' `shouldBe` I.defAttr1 True + I.logScdEnabled attrs' `shouldBe` I.defAttr1 False + I.logLocalScdEnabled attrs' `shouldBe` I.defAttr1 True + I.logStickyCopySets attrs' `shouldBe` I.defAttr1 False I.logSyncReplicationScope attrs' `shouldBe` I.defAttr1 S.NodeLocationScope_DATA_CENTER Map.lookup 
"A" (I.logAttrsExtras attrs') `shouldBe` Just "B" From 33c37c1573525811ecb8585a559e285f69fcf19b Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Fri, 17 May 2024 11:42:32 +0800 Subject: [PATCH 2/2] kafka: support new log attrs for kafka server --- conf/hstream.yaml | 3 +++ hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs | 3 +++ hstream-kafka/HStream/Kafka/Server/Config/Types.hs | 3 +++ hstream-kafka/HStream/Kafka/Server/Core/Topic.hs | 5 +++++ 4 files changed, 14 insertions(+) diff --git a/conf/hstream.yaml b/conf/hstream.yaml index 4bfe06f94..ad8387959 100644 --- a/conf/hstream.yaml +++ b/conf/hstream.yaml @@ -306,6 +306,9 @@ kafka: # fetch-mode: 1 # TODO: Currently, only mode 1 is supported # fetch-reader-timeout: 50 # 50ms, default timeout of each read, 0 means nonblocking # fetch-maxlen: 1000 # default max size of each read + # scd-enabled: false # enable Single Copy Delivery mode, default is false + # local-scd-enabled: false # enable Local Single Copy Delivery, ignored if scd-enabled is false, default is false + # sticky-copysets: false # enable sticky copysets, default is false # Configuration for HStream Store # The configuration for hstore is **Optional**. When the values are not provided, diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs index 26cb87e96..6f16b18d8 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs @@ -108,6 +108,9 @@ parseJSONToOptions CliOptions{..} obj = do storageCfg <- nodeCfgObj .:? "storage" .!= mempty fetchReaderTimeout <- storageCfg .:? "fetch-reader-timeout" .!= 50 fetchMaxLen <- storageCfg .:? "fetch-maxlen" .!= 1000 + scdEnabled <- storageCfg .:? "scd-enabled" .!= False + localScdEnabled <- storageCfg .:? "local-scd-enabled" .!= False + stickyCopysets <- storageCfg .:?
"sticky-copysets" .!= False let _storage = StorageOptions{..} -- SASL config diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs index 9bb0bff52..20844d4ae 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs @@ -227,6 +227,9 @@ parseMetaStoreAddr t = data StorageOptions = StorageOptions { fetchReaderTimeout :: Int , fetchMaxLen :: Int + , scdEnabled :: Bool + , localScdEnabled :: Bool + , stickyCopysets :: Bool } deriving (Show, Eq) data ExperimentalFeature diff --git a/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs index 782de6c5a..2a3a49036 100644 --- a/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs @@ -21,6 +21,8 @@ import GHC.Stack (HasCallStack) import qualified HStream.Base.Time as BaseTime import qualified HStream.Kafka.Server.Config.KafkaConfig as KC +import HStream.Kafka.Server.Config.Types (ServerOpts (..), + StorageOptions (..)) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log import qualified HStream.Store as S @@ -53,6 +55,9 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do attrs = S.def { S.logReplicationFactor = S.defAttr1 replica , S.logAttrsExtras = extraAttr , S.logBacklogDuration = S.defAttr1 (getBacklogDuration topicConfigs) + , S.logScdEnabled = S.defAttr1 serverOpts._storage.scdEnabled + , S.logLocalScdEnabled = S.defAttr1 serverOpts._storage.localScdEnabled + , S.logStickyCopySets = S.defAttr1 serverOpts._storage.stickyCopysets } try (S.createStream scLDClient streamId attrs) >>= \case Left (e :: SomeException)