From 4c6a13cc9967568c3e94c52c2e9dbc5b5ea4870b Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Mon, 29 Apr 2024 11:30:36 +0800 Subject: [PATCH] hstream-store: add 'scdEnabled', 'localScdEnabled' and 'stickyCopySets' LogAttributes --- .../store/HStream/Admin/Store/Command/Logs.hs | 3 ++ .../Store/Internal/LogDevice/LogAttributes.hs | 47 +++++++++++++++---- .../cbits/logdevice/hs_log_attributes.cpp | 14 ++++-- .../test/HStream/Store/LogDeviceSpec.hs | 14 +++++- 4 files changed, 64 insertions(+), 14 deletions(-) diff --git a/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs b/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs index 7ea2698c5..5babb17a7 100644 --- a/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs +++ b/hstream-admin/store/HStream/Admin/Store/Command/Logs.hs @@ -255,6 +255,9 @@ printLogAttributes level LogAttributes{..} = do emit $ _SHOW_ATTR(logSyncedCopies) emit $ _SHOW_ATTR(logBacklogDuration) emit $ _SHOW_ATTR(logReplicateAcross) + emit $ _SHOW_ATTR(logScdEnabled) + emit $ _SHOW_ATTR(logLocalScdEnabled) + emit $ _SHOW_ATTR(logStickyCopySets) forM_ (Map.toList logAttrsExtras) $ \(k, v) -> emit $ Just $ unpack k <> ": " <> unpack v #undef _SHOW_ATTR diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs index f71c62fd7..e3d81e562 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs @@ -95,15 +95,15 @@ data LogAttributes = LogAttributes -- -- ^ Maximum amount of time to artificially delay delivery of newly written -- -- records (increases delivery latency but improves server and client -- -- performance), in milliseconds. - -- , logScdEnabled :: Attribute Bool - -- -- ^ Indicate whether or not the Single Copy Delivery optimization should be - -- -- used. - -- , logLocalScdEnabled :: Attribute Bool - -- -- ^ Indicate whether or not to use Local Single Copy Delivery. This is - -- -- ignored if scdEnabled_ is false. - -- , logStickyCopySets :: Attribute Bool - -- -- ^ True if copysets on this log should be "sticky". See docblock in - -- -- StickyCopySetManager.h + , logScdEnabled :: Attribute Bool + -- ^ Indicate whether or not the Single Copy Delivery optimization should be + -- used. + , logLocalScdEnabled :: Attribute Bool + -- ^ Indicate whether or not to use Local Single Copy Delivery. This is + -- ignored if scdEnabled_ is false. + , logStickyCopySets :: Attribute Bool + -- ^ True if copysets on this log should be "sticky". See docblock in + -- StickyCopySetManager.h -- , logMutablePerEpochLogMetadataEnabled ::Attribute Bool -- -- ^ If true, write mutable per-epoch metadata along with every data record. -- , logSequencerAffinity :: Attribute (Maybe CBytes) @@ -149,6 +149,9 @@ pokeLogAttributes LogAttributes{..} = withAllocMaybePrim id _ARG(logSyncReplicationScope) withPrimListPairUnsafe _MAYBE_LIST_PAIR_ARG(logReplicateAcross) withAllocMaybePrim2 fromIntegral _MAYBE_ARG(logBacklogDuration) + withAllocMaybePrim bool2cbool _ARG(logScdEnabled) + withAllocMaybePrim bool2cbool _ARG(logLocalScdEnabled) + withAllocMaybePrim bool2cbool _ARG(logStickyCopySets) withHsCBytesMapUnsafe logAttrsExtras $ \l ks vs -> do #define _ARG_TO(name) name##' (attrInherited name) #define _MAYBE_ARG_TO(name) name##_flag name##' (attrInherited name) @@ -160,6 +163,9 @@ pokeLogAttributes LogAttributes{..} = _ARG_TO(logSyncReplicationScope) _MAYBE_LIST_PAIR_TO(logReplicateAcross) _MAYBE_ARG_TO(logBacklogDuration) + _ARG_TO(logScdEnabled) + _ARG_TO(logLocalScdEnabled) + _ARG_TO(logStickyCopySets) l ks vs newForeignPtr free_log_attributes_fun i #undef _ARG @@ -183,7 +189,10 @@ peekLogAttributes ptr = do , ( logSyncReplicationScope , ( logReplicateAcross , ( logBacklogDuration - , _))))))) <- + , ( logScdEnabled + , ( logLocalScdEnabled + , ( logStickyCopySets + , _)))))))))) <- runPeek id $ \_ARG(replicationFactor) -> runPeek id $ \_ARG(syncedCopies) -> runPeek id $ \_ARG(maxWritesInFlight) -> @@ -191,6 +200,9 @@ peekLogAttributes ptr = do runPeek NodeLocationScope $ \_ARG(syncReplicationScope) -> runPeekMaybeListPair replicateAcross_size $ \_MAYBE_LIST_PAIR(replicateAcross) -> runPeekMaybe id $ \_MAYBE_ARG(backlogDuration) -> + runPeek cbool2bool $ \_ARG(scdEnabled) -> + runPeek cbool2bool $ \_ARG(localScdEnabled) -> + runPeek cbool2bool $ \_ARG(stickyCopySets) -> peek_log_attributes ptr _ARG(replicationFactor) @@ -200,6 +212,9 @@ peekLogAttributes ptr = do _ARG(syncReplicationScope) _MAYBE_LIST_PAIR(replicateAcross) _MAYBE_ARG(backlogDuration) + _ARG(scdEnabled) + _ARG(localScdEnabled) + _ARG(stickyCopySets) logAttrsExtras <- peekLogAttributesExtras ptr return LogAttributes{..} #undef _ARG @@ -244,6 +259,12 @@ foreign import ccall unsafe "hs_logdevice.h poke_log_attributes" -- ^ logReplicateAcross -> Bool -> Ptr CInt -> Bool -- ^ logBacklogDuration + -> Ptr CBool -> Bool + -- ^ logScdEnabled + -> Ptr CBool -> Bool + -- ^ logLocalScdEnabled + -> Ptr CBool -> Bool + -- ^ logStickyCopySets -> Int -> BAArray# Word8 -> BAArray# Word8 -- ^ extras -> IO (Ptr LogDeviceLogAttributes) @@ -265,6 +286,12 @@ foreign import ccall unsafe "hs_logdevice.h peek_log_attributes" -- ^ logReplicateAcross -> MBA# CBool -> MBA# CBool -> MBA# Int -> MBA# CBool -- ^ logBacklogDuration + -> MBA# CBool -> MBA# CBool -> MBA# CBool + -- ^ logScdEnabled + -> MBA# CBool -> MBA# CBool -> MBA# CBool + -- ^ logLocalScdEnabled + -> MBA# CBool -> MBA# CBool -> MBA# CBool + -- ^ logStickyCopySets -> IO () foreign import ccall unsafe "hs_logdevice.h free_log_attributes" diff --git a/hstream-store/cbits/logdevice/hs_log_attributes.cpp b/hstream-store/cbits/logdevice/hs_log_attributes.cpp index 762a13d4e..bfe2ffd5d 100644 --- a/hstream-store/cbits/logdevice/hs_log_attributes.cpp +++ b/hstream-store/cbits/logdevice/hs_log_attributes.cpp @@ -17,7 +17,8 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies), syncReplicationScope), _LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope, HsInt), - _MAYBE_ARG(int, backlogDuration), + _MAYBE_ARG(int, backlogDuration), _ARG(bool, scdEnabled), + _ARG(bool, localScdEnabled), _ARG(bool, stickyCopySets), // HsInt extras_len, StgArrBytes** keys, StgArrBytes** vals) { #undef _ARG @@ -52,6 +53,9 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies), attrs = attrs.with_replicateAcross(rs); } ADD_MAYBE_ATTR(backlogDuration, std::chrono::seconds, std::chrono::seconds) + ADD_ATTR(scdEnabled) + ADD_ATTR(localScdEnabled) + ADD_ATTR(stickyCopySets) #undef ADD_ATTR #undef ADD_MAYBE_ATTR @@ -79,7 +83,8 @@ void peek_log_attributes( ARG(facebook::logdevice::NodeLocationScope, syncReplicationScope), ARG_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope, HsInt), - ARG_MAYBE(HsInt, backlogDuration)) + ARG_MAYBE(HsInt, backlogDuration), ARG(bool, scdEnabled), + ARG(bool, localScdEnabled), ARG(bool, stickyCopySets)) #undef ARG #undef ARG_MAYBE #undef ARG_LIST_PAIR @@ -104,7 +109,7 @@ void peek_log_attributes( *name##_inh = attrs->name().isInherited(); \ if (name##_len > 0 && attrs->name().hasValue()) { \ auto& val = attrs->name().value(); \ - for (int i = 0; i < name##_len; i++) { \ + for (int i = 0; i < name##_len; i++) { \ name##_keys[i] = val[i].first; \ name##_vals[i] = val[i].second; \ } \ @@ -117,6 +122,9 @@ void peek_log_attributes( PEEK_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope, HsInt); PEEK_MAYBE(backlogDuration, .count()); + PEEK(scdEnabled); + PEEK(localScdEnabled); + PEEK(stickyCopySets); #undef PEEK #undef PEEK_MAYBE #undef PEEK_LIST_PAIR diff --git a/hstream-store/test/HStream/Store/LogDeviceSpec.hs b/hstream-store/test/HStream/Store/LogDeviceSpec.hs index a91ed2fd1..77a5ecdd7 100644 --- a/hstream-store/test/HStream/Store/LogDeviceSpec.hs +++ b/hstream-store/test/HStream/Store/LogDeviceSpec.hs @@ -18,6 +18,9 @@ logdirSpec :: Spec logdirSpec = describe "LogDirectory" $ do let attrs = S.def{ I.logReplicationFactor = I.defAttr1 1 , I.logBacklogDuration = I.defAttr1 (Just 60) + , I.logScdEnabled = I.defAttr1 False + , I.logLocalScdEnabled = I.defAttr1 True + , I.logStickyCopySets = I.defAttr1 False , I.logAttrsExtras = Map.fromList [("A", "B")] } @@ -84,6 +87,9 @@ logdirSpec = describe "LogDirectory" $ do I.logReplicationFactor attrs' `shouldBe` I.Attribute (Just 1) True I.logBacklogDuration attrs' `shouldBe` I.Attribute (Just (Just 60)) True Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B" + I.logScdEnabled attrs' `shouldBe` I.Attribute (Just False) True + I.logLocalScdEnabled attrs' `shouldBe` I.Attribute (Just True) True + I.logStickyCopySets attrs' `shouldBe` I.Attribute (Just False) True I.syncLogsConfigVersion client =<< I.removeLogDirectory client dirname True loggroupAround' :: SpecWith (CBytes, S.C_LogID) -> Spec @@ -92,9 +98,12 @@ loggroupAround' = , I.logBacklogDuration = I.defAttr1 (Just 60) , I.logSingleWriter = I.defAttr1 True , I.logSyncReplicationScope = I.defAttr1 S.NodeLocationScope_DATA_CENTER + , I.logScdEnabled = I.defAttr1 False + , I.logLocalScdEnabled = I.defAttr1 True + , I.logStickyCopySets = I.defAttr1 False , I.logAttrsExtras = Map.fromList [("A", "B")] } - logid = 104 + logid = 105 logname = "LogDeviceSpec_LogGroupSpec" in loggroupAround logid logname attrs @@ -106,6 +115,9 @@ loggroupSpec = describe "LogGroup" $ loggroupAround' $ parallel $ do I.logReplicationFactor attrs' `shouldBe` I.defAttr1 1 I.logBacklogDuration attrs' `shouldBe` I.defAttr1 (Just 60) I.logSingleWriter attrs' `shouldBe` I.defAttr1 True + I.logScdEnabled attrs' `shouldBe` I.defAttr1 False + I.logLocalScdEnabled attrs' `shouldBe` I.defAttr1 True + I.logStickyCopySets attrs' `shouldBe` I.defAttr1 False I.logSyncReplicationScope attrs' `shouldBe` I.defAttr1 S.NodeLocationScope_DATA_CENTER Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B"