diff --git a/conf/hstream.yaml b/conf/hstream.yaml index 4bfe06f94..ad8387959 100644 --- a/conf/hstream.yaml +++ b/conf/hstream.yaml @@ -306,6 +306,9 @@ kafka: # fetch-mode: 1 # TODO: Currently, only mode 1 is supported # fetch-reader-timeout: 50 # 50ms, default timeout of each read, 0 means nonblocking # fetch-maxlen: 1000 # default max size of each read + # scd-enabled: false # enable Single Copy Delivery mode, default is false + # local-scd-enabled: false + # sticky-copysets: false # enable sticky copyset, default is false # Configuration for HStream Store # The configuration for hstore is **Optional**. When the values are not provided, diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs index 26cb87e96..6f16b18d8 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs @@ -108,6 +108,9 @@ parseJSONToOptions CliOptions{..} obj = do storageCfg <- nodeCfgObj .:? "storage" .!= mempty fetchReaderTimeout <- storageCfg .:? "fetch-reader-timeout" .!= 50 fetchMaxLen <- storageCfg .:? "fetch-maxlen" .!= 1000 + scdEnabled <- storageCfg .:? "scd-enabled" .!= False + localScdEnabled <- storageCfg .:? "local-scd-enabled" .!= False + stickyCopysets <- storageCfg .:? "sticky-copysets" .!= False let _storage = StorageOptions{..} -- SASL config diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs index 9bb0bff52..20844d4ae 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs @@ -227,6 +227,9 @@ parseMetaStoreAddr t = data StorageOptions = StorageOptions { fetchReaderTimeout :: Int , fetchMaxLen :: Int + , scdEnabled :: Bool + , localScdEnabled :: Bool + , stickyCopysets :: Bool } deriving (Show, Eq) data ExperimentalFeature diff --git a/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs index 782de6c5a..2a3a49036 100644 --- a/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs @@ -21,6 +21,8 @@ import GHC.Stack (HasCallStack) import qualified HStream.Base.Time as BaseTime import qualified HStream.Kafka.Server.Config.KafkaConfig as KC +import HStream.Kafka.Server.Config.Types (ServerOpts (..), + StorageOptions (..)) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log import qualified HStream.Store as S @@ -53,6 +55,9 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do attrs = S.def { S.logReplicationFactor = S.defAttr1 replica , S.logAttrsExtras = extraAttr , S.logBacklogDuration = S.defAttr1 (getBacklogDuration topicConfigs) + , S.logScdEnabled = S.defAttr1 serverOpts._storage.scdEnabled + , S.logLocalScdEnabled = S.defAttr1 serverOpts._storage.localScdEnabled + , S.logStickyCopySets = S.defAttr1 serverOpts._storage.stickyCopysets } try (S.createStream scLDClient streamId attrs) >>= \case Left (e :: SomeException)