From 577dd183e62b71f79617347f75b1c40bca5be072 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Mon, 20 May 2024 13:53:32 +0800 Subject: [PATCH] Experimental SpareOffset 4(end): argument to enable (#1817) Argument `--experimental sparse-offset` is added to enable the feature of spare offset. --- .../HStream/Kafka/Server/Config/Types.hs | 6 +- .../HStream/Kafka/Server/Handler.hsc | 59 ++++++++++++++++++- hstream/app/lib/KafkaServer.hs | 30 +++++++--- 3 files changed, 83 insertions(+), 12 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs index 20844d4ae..6e4c68324 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs @@ -234,9 +234,11 @@ data StorageOptions = StorageOptions data ExperimentalFeature = ExperimentalCppServer + | ExperimentalSparseOffset deriving (Show, Eq) parseExperimentalFeature :: O.ReadM ExperimentalFeature parseExperimentalFeature = O.eitherReader $ \case - "cpp" -> Right ExperimentalCppServer - x -> Left $ "cannot parse experimental feature: " <> x + "cpp" -> Right ExperimentalCppServer + "sparse-offset" -> Right ExperimentalSparseOffset + x -> Left $ "cannot parse experimental feature: " <> x diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index d9bca1f1a..a71747ce4 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -3,6 +3,7 @@ module HStream.Kafka.Server.Handler ( handlers + , sparseOffsetHandlers , unAuthedHandlers ) where @@ -13,10 +14,13 @@ import HStream.Kafka.Server.Handler.Group import HStream.Kafka.Server.Handler.Offset import HStream.Kafka.Server.Handler.Produce import HStream.Kafka.Server.Handler.Security +import HStream.Kafka.Server.Handler.SparseOffset.Consume +import HStream.Kafka.Server.Handler.SparseOffset.Offset +import HStream.Kafka.Server.Handler.SparseOffset.Produce import HStream.Kafka.Server.Handler.Topic -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified Kafka.Protocol.Message as K -import qualified Kafka.Protocol.Service as K +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K ------------------------------------------------------------------------------- @@ -93,6 +97,11 @@ import qualified Kafka.Protocol.Service as K #cv_handler CreateAcls, 0, 0 #cv_handler DeleteAcls, 0, 0 +-- SparseOffset +#cv_handler ListOffsets, 0, 2, SparseOffset +#cv_handler Produce, 0, 7, SparseOffset +#cv_handler Fetch, 0, 6, SparseOffset + handlers :: ServerContext -> [K.ServiceHandler] handlers sc = [ #mk_handler ApiVersions, 0, 3 @@ -142,3 +151,47 @@ unAuthedHandlers sc = [ #mk_handler ApiVersions, 0, 3 , #mk_handler SaslHandshake, 0, 1 ] + +sparseOffsetHandlers :: ServerContext -> [K.ServiceHandler] +sparseOffsetHandlers sc = + [ #mk_handler ApiVersions, 0, 3 + , #mk_handler ListOffsets, 0, 2, SparseOffset + , #mk_handler Metadata, 0, 5 + -- Write + , #mk_handler Produce, 0, 7, SparseOffset + , #mk_handler InitProducerId, 0, 0 + -- Read + , #mk_handler Fetch, 0, 6, SparseOffset + + , #mk_handler FindCoordinator, 0, 1 + + , #mk_handler CreateTopics, 0, 2 + , #mk_handler DeleteTopics, 0, 1 + , #mk_handler CreatePartitions, 0, 1 + + -- Group + , #mk_handler JoinGroup, 0, 2 + , #mk_handler SyncGroup, 0, 1 + , #mk_handler LeaveGroup, 0, 1 + , #mk_handler Heartbeat, 0, 1 + , #mk_handler ListGroups, 0, 1 + , #mk_handler DescribeGroups, 0, 1 + + , #mk_handler OffsetCommit, 0, 3 + , #mk_handler OffsetFetch, 0, 3 + + -- configs + , #mk_handler DescribeConfigs, 0, 0 + + -- Sasl + , #mk_handler SaslHandshake, 0, 1, AfterAuth + , #mk_handler SaslAuthenticate, 0, 0 + + -- For hstream + , #mk_handler HadminCommand, 0, 0 + + -- ACL + , #mk_handler DescribeAcls, 0, 0 + , #mk_handler CreateAcls, 0, 0 + , #mk_handler DeleteAcls, 0, 0 + ] diff --git a/hstream/app/lib/KafkaServer.hs b/hstream/app/lib/KafkaServer.hs index 5d926fd61..732de1a98 100644 --- a/hstream/app/lib/KafkaServer.hs +++ b/hstream/app/lib/KafkaServer.hs @@ -129,7 +129,8 @@ app config@ServerOpts{..} = do -- Experimental features let usingCppServer = ExperimentalCppServer `elem` experimentalFeatures - Async.withAsync (serve serverContext netOpts usingCppServer) $ \a -> do + usingSparseOffset = ExperimentalSparseOffset `elem` experimentalFeatures + Async.withAsync (serve serverContext netOpts usingCppServer usingSparseOffset) $ \a -> do -- start gossip a1 <- startGossip _serverHost gossipContext Async.link2Only (const True) a a1 @@ -140,6 +141,7 @@ app config@ServerOpts{..} = do _serverAdvertisedListeners _listenersSecurityProtocolMap usingCppServer + usingSparseOffset forM_ as (Async.link2Only (const True) a) -- wait the default server waitGossipBoot gossipContext @@ -153,8 +155,10 @@ app config@ServerOpts{..} = do serve :: ServerContext -> K.ServerOptions -> Bool -- ^ ExperimentalFeature: ExperimentalCppServer + -> Bool + -- ^ ExperimentalFeature: ExperimentalSparseOffset -> IO () -serve sc@ServerContext{..} netOpts usingCppServer = do +serve sc@ServerContext{..} netOpts usingCppServer usingSparseOffset = do Log.i "************************" hPutStrLn stderr banner Log.i "************************" @@ -190,10 +194,15 @@ serve sc@ServerContext{..} netOpts usingCppServer = do Log.info $ "Starting" <> if isJust (K.serverSslOptions netOpts') then " secure " else " insecure " <> "kafka server..." + handlers <- if usingSparseOffset + then do + Log.warning "Using a experimental feature: SparseOffset" + pure K.sparseOffsetHandlers + else pure K.handlers if usingCppServer then do Log.warning "Using a still-in-development c++ kafka server!" - K.runCppServer netOpts' sc K.handlers - else K.runHsServer netOpts' sc K.unAuthedHandlers K.handlers + K.runCppServer netOpts' sc handlers + else K.runHsServer netOpts' sc K.unAuthedHandlers handlers where exceptionHandlers = [ Handler $ \(_ :: HE.RQLiteRowNotFound) -> return () @@ -208,10 +217,12 @@ serveListeners -> ListenersSecurityProtocolMap -> Bool -- ^ ExperimentalFeature: ExperimentalCppServer + -> Bool + -- ^ ExperimentalFeature: ExperimentalSparseOffset -> IO [Async.Async ()] serveListeners sc netOpts securityMap listeners listenerSecurityMap - usingCppServer + usingCppServer usingSparseOffset = do let listeners' = [(k, v) | (k, vs) <- Map.toList listeners, v <- Set.toList vs] forM listeners' $ \(key, I.Listener{..}) -> Async.async $ do @@ -240,10 +251,15 @@ serveListeners sc netOpts <> Log.build key <> ":" <> Log.build listenerAddress <> ":" <> Log.build listenerPort + handlers <- if usingSparseOffset + then do + Log.warning "Using a experimental feature: SparseOffset" + pure K.sparseOffsetHandlers + else pure K.handlers if usingCppServer then do Log.warning "Using a still-in-development c++ kafka server!" - K.runCppServer netOpts' sc' K.handlers - else K.runHsServer netOpts' sc' K.unAuthedHandlers K.handlers + K.runCppServer netOpts' sc' handlers + else K.runHsServer netOpts' sc' K.unAuthedHandlers handlers -------------------------------------------------------------------------------