Skip to content

Commit

Permalink
kafka: fix merge configs
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Apr 28, 2024
1 parent 128261e commit 80fdd6e
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 18 deletions.
21 changes: 13 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ module HStream.Kafka.Server.Config
, advertisedListenersToPB
, StorageOptions (..)
, ExperimentalFeature (..)

, KafkaBrokerConfigs
, mergeBrokerConfigs
, mkKafkaBrokerConfigs
) where

import Control.Exception (throwIO)
import qualified Data.Text as Text
import Data.Yaml (ParseException (..),
decodeFileThrow,
parseEither)
import System.Directory (makeAbsolute)
import Control.Exception (throwIO)
import qualified Data.Text as Text
import Data.Yaml (ParseException (..),
decodeFileThrow,
parseEither)
import System.Directory (makeAbsolute)

import HStream.Common.Types (getHStreamVersion)
import HStream.Common.Types (getHStreamVersion)
import HStream.Kafka.Server.Config.FromCli
import HStream.Kafka.Server.Config.FromJson
import HStream.Kafka.Server.Config.KafkaConfig
import HStream.Kafka.Server.Config.Types
import qualified HStream.Server.HStreamApi as A
import qualified HStream.Server.HStreamApi as A


runServerConfig :: [String] -> (ServerOpts -> IO ()) -> IO ()
Expand Down
9 changes: 1 addition & 8 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ module HStream.Kafka.Server.Config.FromCli
import qualified Data.Attoparsec.Text as AP
import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
Expand Down Expand Up @@ -301,7 +300,7 @@ experimentalFeatureParser = option parseExperimentalFeature $
long "experimental" <> metavar "ExperimentalFeature"

brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs
brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList
brokerConfigsParser = KC.mkKafkaBrokerConfigs . Map.fromList
<$> O.many
( O.option propertyReader
( O.long "prop"
Expand All @@ -315,12 +314,6 @@ brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList
let (k, v) = second tail $ span (/= '=') kv
in Right (T.pack k, T.pack v)

toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs
toKafkaBrokerConfigs mp =
case KC.mkConfigs (mp Map.!?) of
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

-------------------------------------------------------------------------------

parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a
Expand Down
14 changes: 12 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import qualified Data.Aeson.Key as Y
import qualified Data.Aeson.Text as Y
import Data.Int (Int32)
import Data.List (intercalate)
import qualified Data.Map as Map
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Read as T
Expand Down Expand Up @@ -177,6 +178,12 @@ allBrokerConfigs = V.fromList . Map.elems . dumpConfigs
mergeBrokerConfigs :: KafkaBrokerConfigs -> KafkaBrokerConfigs -> KafkaBrokerConfigs
mergeBrokerConfigs = mergeConfigs

mkKafkaBrokerConfigs :: Map T.Text T.Text -> KafkaBrokerConfigs
mkKafkaBrokerConfigs mp =
case mkConfigs (mp Map.!?) of
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

---------------------------------------------------------------------------
-- Config Helpers
---------------------------------------------------------------------------
Expand All @@ -187,6 +194,9 @@ class KafkaConfigs a where
mkConfigs :: Lookup -> Either T.Text a
dumpConfigs :: a -> ConfigMap
defaultConfigs :: a
-- mergeConfig will use the second config to update the first one.
-- value in the second will overwrite the first one.
-- if some value is not be set in the second config, the first one will be used.
mergeConfigs :: a -> a -> a

default mkConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => Lookup -> Either T.Text a
Expand All @@ -213,7 +223,7 @@ instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where
Just textValue -> fromText @c textValue
gdumpConfigs (G.K1 x) = (Map.singleton (name x) (KafkaConfigInstance x))
gdefaultConfigs = G.K1 (defaultConfig @c)
gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue x then y else x)
gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue y then x else y)

instance GKafkaConfigs f => GKafkaConfigs (G.M1 i c f) where
gmkConfigs lk = G.M1 <$> (gmkConfigs lk)
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ test-suite hstream-kafka-test
HStream.Kafka.Common.AuthorizerSpec
HStream.Kafka.Common.OffsetManagerSpec
HStream.Kafka.Common.TestUtils
HStream.Kafka.Common.ConfigSpec

hs-source-dirs: tests
build-depends:
Expand Down
35 changes: 35 additions & 0 deletions hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{-# OPTIONS_GHC -Wno-orphans #-}

module HStream.Kafka.Common.ConfigSpec where

import qualified Data.Map.Strict as M
import HStream.Kafka.Server.Config
import Test.Hspec

spec :: Spec
spec = describe "KafkaConfigTest" $ do

it "mergeConfigs" $ do
let kc1 = mkKafkaBrokerConfigs $ M.fromList
[
("auto.create.topics.enable", "false"),
("num.partitions", "2"),
("offsets.topic.replication.factor", "1")
]
let kc2 = mkKafkaBrokerConfigs $ M.fromList
[
("num.partitions", "3"),
("default.replication.factor", "2"),
("offsets.topic.replication.factor", "2")
]
let expected = mkKafkaBrokerConfigs $ M.fromList
[
-- values not set by kc2 will be set by kc1
("auto.create.topics.enable", "false"),
-- values set by kc2 will overrid values set by kc1
("num.partitions", "3"),
("default.replication.factor", "2"),
("offsets.topic.replication.factor", "2")
]
let kc = mergeBrokerConfigs kc1 kc2
kc `shouldBe` expected

0 comments on commit 80fdd6e

Please sign in to comment.