diff --git a/hstream-kafka/HStream/Kafka/Server/Config.hs b/hstream-kafka/HStream/Kafka/Server/Config.hs index 85dbedd64..758a3d237 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config.hs @@ -15,20 +15,25 @@ module HStream.Kafka.Server.Config , advertisedListenersToPB , StorageOptions (..) , ExperimentalFeature (..) + + , KafkaBrokerConfigs + , mergeBrokerConfigs + , mkKafkaBrokerConfigs ) where -import Control.Exception (throwIO) -import qualified Data.Text as Text -import Data.Yaml (ParseException (..), - decodeFileThrow, - parseEither) -import System.Directory (makeAbsolute) +import Control.Exception (throwIO) +import qualified Data.Text as Text +import Data.Yaml (ParseException (..), + decodeFileThrow, + parseEither) +import System.Directory (makeAbsolute) -import HStream.Common.Types (getHStreamVersion) +import HStream.Common.Types (getHStreamVersion) import HStream.Kafka.Server.Config.FromCli import HStream.Kafka.Server.Config.FromJson +import HStream.Kafka.Server.Config.KafkaConfig import HStream.Kafka.Server.Config.Types -import qualified HStream.Server.HStreamApi as A +import qualified HStream.Server.HStreamApi as A runServerConfig :: [String] -> (ServerOpts -> IO ()) -> IO () diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs index 7bd4f05c6..a76c97732 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs @@ -17,7 +17,6 @@ module HStream.Kafka.Server.Config.FromCli import qualified Data.Attoparsec.Text as AP import Data.Bifunctor (second) import Data.ByteString (ByteString) -import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.Set as Set import Data.Text (Text) @@ -301,7 +300,7 @@ experimentalFeatureParser = option parseExperimentalFeature $ long "experimental" <> metavar "ExperimentalFeature" brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs -brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList +brokerConfigsParser = KC.mkKafkaBrokerConfigs . Map.fromList <$> O.many ( O.option propertyReader ( O.long "prop" @@ -315,12 +314,6 @@ brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList let (k, v) = second tail $ span (/= '=') kv in Right (T.pack k, T.pack v) - toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs - toKafkaBrokerConfigs mp = - case KC.mkConfigs (mp Map.!?) of - Left msg -> errorWithoutStackTrace (T.unpack msg) - Right v -> v - ------------------------------------------------------------------------------- parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs index 1555632cb..7ed0974bc 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs @@ -9,7 +9,8 @@ import qualified Data.Aeson.Key as Y import qualified Data.Aeson.Text as Y import Data.Int (Int32) import Data.List (intercalate) -import qualified Data.Map as Map +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map import qualified Data.Text as T import qualified Data.Text.Lazy as TL import qualified Data.Text.Read as T @@ -177,6 +178,12 @@ allBrokerConfigs = V.fromList . Map.elems . dumpConfigs mergeBrokerConfigs :: KafkaBrokerConfigs -> KafkaBrokerConfigs -> KafkaBrokerConfigs mergeBrokerConfigs = mergeConfigs +mkKafkaBrokerConfigs :: Map T.Text T.Text -> KafkaBrokerConfigs +mkKafkaBrokerConfigs mp = + case mkConfigs (mp Map.!?) of + Left msg -> errorWithoutStackTrace (T.unpack msg) + Right v -> v + --------------------------------------------------------------------------- -- Config Helpers --------------------------------------------------------------------------- @@ -187,6 +194,9 @@ class KafkaConfigs a where mkConfigs :: Lookup -> Either T.Text a dumpConfigs :: a -> ConfigMap defaultConfigs :: a + -- mergeConfig will use the second config to update the first one. + -- value in the second will overwrite the first one. + -- if some value is not be set in the second config, the first one will be used. mergeConfigs :: a -> a -> a default mkConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => Lookup -> Either T.Text a @@ -213,7 +223,7 @@ instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where Just textValue -> fromText @c textValue gdumpConfigs (G.K1 x) = (Map.singleton (name x) (KafkaConfigInstance x)) gdefaultConfigs = G.K1 (defaultConfig @c) - gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue x then y else x) + gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue y then x else y) instance GKafkaConfigs f => GKafkaConfigs (G.M1 i c f) where gmkConfigs lk = G.M1 <$> (gmkConfigs lk) diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 21596a8b0..31eef9df9 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -264,6 +264,7 @@ test-suite hstream-kafka-test HStream.Kafka.Common.AuthorizerSpec HStream.Kafka.Common.OffsetManagerSpec HStream.Kafka.Common.TestUtils + HStream.Kafka.Common.ConfigSpec hs-source-dirs: tests build-depends: diff --git a/hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs b/hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs new file mode 100644 index 000000000..2cc3aa93e --- /dev/null +++ b/hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs @@ -0,0 +1,35 @@ +{-# OPTIONS_GHC -Wno-orphans #-} + +module HStream.Kafka.Common.ConfigSpec where + +import qualified Data.Map.Strict as M +import HStream.Kafka.Server.Config +import Test.Hspec + +spec :: Spec +spec = describe "KafkaConfigTest" $ do + + it "mergeConfigs" $ do + let kc1 = mkKafkaBrokerConfigs $ M.fromList + [ + ("auto.create.topics.enable", "false"), + ("num.partitions", "2"), + ("offsets.topic.replication.factor", "1") + ] + let kc2 = mkKafkaBrokerConfigs $ M.fromList + [ + ("num.partitions", "3"), + ("default.replication.factor", "2"), + ("offsets.topic.replication.factor", "2") + ] + let expected = mkKafkaBrokerConfigs $ M.fromList + [ + -- values not set by kc2 will be set by kc1 + ("auto.create.topics.enable", "false"), + -- values set by kc2 will overrid values set by kc1 + ("num.partitions", "3"), + ("default.replication.factor", "2"), + ("offsets.topic.replication.factor", "2") + ] + let kc = mergeBrokerConfigs kc1 kc2 + kc `shouldBe` expected