Skip to content

Commit

Permalink
hstream-store: add 'scdEnabled', 'localScdEnabled' and 'stickyCopySet…
Browse files Browse the repository at this point in the history
…s' LogAttributes
  • Loading branch information
YangKian authored and 4eUeP committed May 17, 2024
1 parent 16f7cbf commit 4c6a13c
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
3 changes: 3 additions & 0 deletions hstream-admin/store/HStream/Admin/Store/Command/Logs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ printLogAttributes level LogAttributes{..} = do
emit $ _SHOW_ATTR(logSyncedCopies)
emit $ _SHOW_ATTR(logBacklogDuration)
emit $ _SHOW_ATTR(logReplicateAcross)
emit $ _SHOW_ATTR(logScdEnabled)
emit $ _SHOW_ATTR(logLocalScdEnabled)
emit $ _SHOW_ATTR(logStickyCopySets)
forM_ (Map.toList logAttrsExtras) $ \(k, v) ->
emit $ Just $ unpack k <> ": " <> unpack v
#undef _SHOW_ATTR
Expand Down
47 changes: 37 additions & 10 deletions hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ data LogAttributes = LogAttributes
-- -- ^ Maximum amount of time to artificially delay delivery of newly written
-- -- records (increases delivery latency but improves server and client
-- -- performance), in milliseconds.
-- , logScdEnabled :: Attribute Bool
-- -- ^ Indicate whether or not the Single Copy Delivery optimization should be
-- -- used.
-- , logLocalScdEnabled :: Attribute Bool
-- -- ^ Indicate whether or not to use Local Single Copy Delivery. This is
-- -- ignored if scdEnabled_ is false.
-- , logStickyCopySets :: Attribute Bool
-- -- ^ True if copysets on this log should be "sticky". See docblock in
-- -- StickyCopySetManager.h
, logScdEnabled :: Attribute Bool
-- ^ Indicate whether or not the Single Copy Delivery optimization should be
-- used.
, logLocalScdEnabled :: Attribute Bool
-- ^ Indicate whether or not to use Local Single Copy Delivery. This is
-- ignored if scdEnabled_ is false.
, logStickyCopySets :: Attribute Bool
-- ^ True if copysets on this log should be "sticky". See docblock in
-- StickyCopySetManager.h
-- , logMutablePerEpochLogMetadataEnabled ::Attribute Bool
-- -- ^ If true, write mutable per-epoch metadata along with every data record.
-- , logSequencerAffinity :: Attribute (Maybe CBytes)
Expand Down Expand Up @@ -149,6 +149,9 @@ pokeLogAttributes LogAttributes{..} =
withAllocMaybePrim id _ARG(logSyncReplicationScope)
withPrimListPairUnsafe _MAYBE_LIST_PAIR_ARG(logReplicateAcross)
withAllocMaybePrim2 fromIntegral _MAYBE_ARG(logBacklogDuration)
withAllocMaybePrim bool2cbool _ARG(logScdEnabled)
withAllocMaybePrim bool2cbool _ARG(logLocalScdEnabled)
withAllocMaybePrim bool2cbool _ARG(logStickyCopySets)
withHsCBytesMapUnsafe logAttrsExtras $ \l ks vs -> do
#define _ARG_TO(name) name##' (attrInherited name)
#define _MAYBE_ARG_TO(name) name##_flag name##' (attrInherited name)
Expand All @@ -160,6 +163,9 @@ pokeLogAttributes LogAttributes{..} =
_ARG_TO(logSyncReplicationScope)
_MAYBE_LIST_PAIR_TO(logReplicateAcross)
_MAYBE_ARG_TO(logBacklogDuration)
_ARG_TO(logScdEnabled)
_ARG_TO(logLocalScdEnabled)
_ARG_TO(logStickyCopySets)
l ks vs
newForeignPtr free_log_attributes_fun i
#undef _ARG
Expand All @@ -183,14 +189,20 @@ peekLogAttributes ptr = do
, ( logSyncReplicationScope
, ( logReplicateAcross
, ( logBacklogDuration
, _))))))) <-
, ( logScdEnabled
, ( logLocalScdEnabled
, ( logStickyCopySets
, _)))))))))) <-
runPeek id $ \_ARG(replicationFactor) ->
runPeek id $ \_ARG(syncedCopies) ->
runPeek id $ \_ARG(maxWritesInFlight) ->
runPeek cbool2bool $ \_ARG(singleWriter) ->
runPeek NodeLocationScope $ \_ARG(syncReplicationScope) ->
runPeekMaybeListPair replicateAcross_size $ \_MAYBE_LIST_PAIR(replicateAcross) ->
runPeekMaybe id $ \_MAYBE_ARG(backlogDuration) ->
runPeek cbool2bool $ \_ARG(scdEnabled) ->
runPeek cbool2bool $ \_ARG(localScdEnabled) ->
runPeek cbool2bool $ \_ARG(stickyCopySets) ->
peek_log_attributes
ptr
_ARG(replicationFactor)
Expand All @@ -200,6 +212,9 @@ peekLogAttributes ptr = do
_ARG(syncReplicationScope)
_MAYBE_LIST_PAIR(replicateAcross)
_MAYBE_ARG(backlogDuration)
_ARG(scdEnabled)
_ARG(localScdEnabled)
_ARG(stickyCopySets)
logAttrsExtras <- peekLogAttributesExtras ptr
return LogAttributes{..}
#undef _ARG
Expand Down Expand Up @@ -244,6 +259,12 @@ foreign import ccall unsafe "hs_logdevice.h poke_log_attributes"
-- ^ logReplicateAcross
-> Bool -> Ptr CInt -> Bool
-- ^ logBacklogDuration
-> Ptr CBool -> Bool
-- ^ logScdEnabled
-> Ptr CBool -> Bool
-- ^ logLocalScdEnabled
-> Ptr CBool -> Bool
-- ^ logStickyCopySets
-> Int -> BAArray# Word8 -> BAArray# Word8
-- ^ extras
-> IO (Ptr LogDeviceLogAttributes)
Expand All @@ -265,6 +286,12 @@ foreign import ccall unsafe "hs_logdevice.h peek_log_attributes"
-- ^ logReplicateAcross
-> MBA# CBool -> MBA# CBool -> MBA# Int -> MBA# CBool
-- ^ logBacklogDuration
-> MBA# CBool -> MBA# CBool -> MBA# CBool
-- ^ logScdEnabled
-> MBA# CBool -> MBA# CBool -> MBA# CBool
-- ^ logLocalScdEnabled
-> MBA# CBool -> MBA# CBool -> MBA# CBool
-- ^ logStickyCopySets
-> IO ()

foreign import ccall unsafe "hs_logdevice.h free_log_attributes"
Expand Down
14 changes: 11 additions & 3 deletions hstream-store/cbits/logdevice/hs_log_attributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies),
syncReplicationScope),
_LIST_PAIR(replicateAcross,
facebook::logdevice::NodeLocationScope, HsInt),
_MAYBE_ARG(int, backlogDuration),
_MAYBE_ARG(int, backlogDuration), _ARG(bool, scdEnabled),
_ARG(bool, localScdEnabled), _ARG(bool, stickyCopySets),
//
HsInt extras_len, StgArrBytes** keys, StgArrBytes** vals) {
#undef _ARG
Expand Down Expand Up @@ -52,6 +53,9 @@ poke_log_attributes(_ARG(int, replicationFactor), _ARG(int, syncedCopies),
attrs = attrs.with_replicateAcross(rs);
}
ADD_MAYBE_ATTR(backlogDuration, std::chrono::seconds, std::chrono::seconds)
ADD_ATTR(scdEnabled)
ADD_ATTR(localScdEnabled)
ADD_ATTR(stickyCopySets)
#undef ADD_ATTR
#undef ADD_MAYBE_ATTR

Expand Down Expand Up @@ -79,7 +83,8 @@ void peek_log_attributes(
ARG(facebook::logdevice::NodeLocationScope, syncReplicationScope),
ARG_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope,
HsInt),
ARG_MAYBE(HsInt, backlogDuration))
ARG_MAYBE(HsInt, backlogDuration), ARG(bool, scdEnabled),
ARG(bool, localScdEnabled), ARG(bool, stickyCopySets))
#undef ARG
#undef ARG_MAYBE
#undef ARG_LIST_PAIR
Expand All @@ -104,7 +109,7 @@ void peek_log_attributes(
*name##_inh = attrs->name().isInherited(); \
if (name##_len > 0 && attrs->name().hasValue()) { \
auto& val = attrs->name().value(); \
for (int i = 0; i < name##_len; i++) { \
for (int i = 0; i < name##_len; i++) { \
name##_keys[i] = val[i].first; \
name##_vals[i] = val[i].second; \
} \
Expand All @@ -117,6 +122,9 @@ void peek_log_attributes(
PEEK_LIST_PAIR(replicateAcross, facebook::logdevice::NodeLocationScope,
HsInt);
PEEK_MAYBE(backlogDuration, .count());
PEEK(scdEnabled);
PEEK(localScdEnabled);
PEEK(stickyCopySets);
#undef PEEK
#undef PEEK_MAYBE
#undef PEEK_LIST_PAIR
Expand Down
14 changes: 13 additions & 1 deletion hstream-store/test/HStream/Store/LogDeviceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ logdirSpec :: Spec
logdirSpec = describe "LogDirectory" $ do
let attrs = S.def{ I.logReplicationFactor = I.defAttr1 1
, I.logBacklogDuration = I.defAttr1 (Just 60)
, I.logScdEnabled = I.defAttr1 False
, I.logLocalScdEnabled = I.defAttr1 True
, I.logStickyCopySets = I.defAttr1 False
, I.logAttrsExtras = Map.fromList [("A", "B")]
}

Expand Down Expand Up @@ -84,6 +87,9 @@ logdirSpec = describe "LogDirectory" $ do
I.logReplicationFactor attrs' `shouldBe` I.Attribute (Just 1) True
I.logBacklogDuration attrs' `shouldBe` I.Attribute (Just (Just 60)) True
Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B"
I.logScdEnabled attrs' `shouldBe` I.Attribute (Just False) True
I.logLocalScdEnabled attrs' `shouldBe` I.Attribute (Just True) True
I.logStickyCopySets attrs' `shouldBe` I.Attribute (Just False) True
I.syncLogsConfigVersion client =<< I.removeLogDirectory client dirname True

loggroupAround' :: SpecWith (CBytes, S.C_LogID) -> Spec
Expand All @@ -92,9 +98,12 @@ loggroupAround' =
, I.logBacklogDuration = I.defAttr1 (Just 60)
, I.logSingleWriter = I.defAttr1 True
, I.logSyncReplicationScope = I.defAttr1 S.NodeLocationScope_DATA_CENTER
, I.logScdEnabled = I.defAttr1 False
, I.logLocalScdEnabled = I.defAttr1 True
, I.logStickyCopySets = I.defAttr1 False
, I.logAttrsExtras = Map.fromList [("A", "B")]
}
logid = 104
logid = 105
logname = "LogDeviceSpec_LogGroupSpec"
in loggroupAround logid logname attrs

Expand All @@ -106,6 +115,9 @@ loggroupSpec = describe "LogGroup" $ loggroupAround' $ parallel $ do
I.logReplicationFactor attrs' `shouldBe` I.defAttr1 1
I.logBacklogDuration attrs' `shouldBe` I.defAttr1 (Just 60)
I.logSingleWriter attrs' `shouldBe` I.defAttr1 True
I.logScdEnabled attrs' `shouldBe` I.defAttr1 False
I.logLocalScdEnabled attrs' `shouldBe` I.defAttr1 True
I.logStickyCopySets attrs' `shouldBe` I.defAttr1 False
I.logSyncReplicationScope attrs' `shouldBe` I.defAttr1 S.NodeLocationScope_DATA_CENTER
Map.lookup "A" (I.logAttrsExtras attrs') `shouldBe` Just "B"

Expand Down

0 comments on commit 4c6a13c

Please sign in to comment.