Skip to content

Commit

Permalink
Experimental SpareOffset 4(end): argument to enable (#1817)
Browse files Browse the repository at this point in the history
Argument `--experimental sparse-offset` is added to enable the feature of spare offset.
  • Loading branch information
4eUeP authored May 20, 2024
1 parent 090ae86 commit 577dd18
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 12 deletions.
6 changes: 4 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ data StorageOptions = StorageOptions

data ExperimentalFeature
= ExperimentalCppServer
| ExperimentalSparseOffset
deriving (Show, Eq)

parseExperimentalFeature :: O.ReadM ExperimentalFeature
parseExperimentalFeature = O.eitherReader $ \case
"cpp" -> Right ExperimentalCppServer
x -> Left $ "cannot parse experimental feature: " <> x
"cpp" -> Right ExperimentalCppServer
"sparse-offset" -> Right ExperimentalSparseOffset
x -> Left $ "cannot parse experimental feature: " <> x
59 changes: 56 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module HStream.Kafka.Server.Handler
( handlers
, sparseOffsetHandlers
, unAuthedHandlers
) where

Expand All @@ -13,10 +14,13 @@ import HStream.Kafka.Server.Handler.Group
import HStream.Kafka.Server.Handler.Offset
import HStream.Kafka.Server.Handler.Produce
import HStream.Kafka.Server.Handler.Security
import HStream.Kafka.Server.Handler.SparseOffset.Consume
import HStream.Kafka.Server.Handler.SparseOffset.Offset
import HStream.Kafka.Server.Handler.SparseOffset.Produce
import HStream.Kafka.Server.Handler.Topic
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -93,6 +97,11 @@ import qualified Kafka.Protocol.Service as K
#cv_handler CreateAcls, 0, 0
#cv_handler DeleteAcls, 0, 0

-- SparseOffset
#cv_handler ListOffsets, 0, 2, SparseOffset
#cv_handler Produce, 0, 7, SparseOffset
#cv_handler Fetch, 0, 6, SparseOffset

handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
Expand Down Expand Up @@ -142,3 +151,47 @@ unAuthedHandlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler SaslHandshake, 0, 1
]

sparseOffsetHandlers :: ServerContext -> [K.ServiceHandler]
sparseOffsetHandlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler ListOffsets, 0, 2, SparseOffset
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 7, SparseOffset
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 6, SparseOffset

, #mk_handler FindCoordinator, 0, 1

, #mk_handler CreateTopics, 0, 2
, #mk_handler DeleteTopics, 0, 1
, #mk_handler CreatePartitions, 0, 1

-- Group
, #mk_handler JoinGroup, 0, 2
, #mk_handler SyncGroup, 0, 1
, #mk_handler LeaveGroup, 0, 1
, #mk_handler Heartbeat, 0, 1
, #mk_handler ListGroups, 0, 1
, #mk_handler DescribeGroups, 0, 1

, #mk_handler OffsetCommit, 0, 3
, #mk_handler OffsetFetch, 0, 3

-- configs
, #mk_handler DescribeConfigs, 0, 0

-- Sasl
, #mk_handler SaslHandshake, 0, 1, AfterAuth
, #mk_handler SaslAuthenticate, 0, 0

-- For hstream
, #mk_handler HadminCommand, 0, 0

-- ACL
, #mk_handler DescribeAcls, 0, 0
, #mk_handler CreateAcls, 0, 0
, #mk_handler DeleteAcls, 0, 0
]
30 changes: 23 additions & 7 deletions hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ app config@ServerOpts{..} = do

-- Experimental features
let usingCppServer = ExperimentalCppServer `elem` experimentalFeatures
Async.withAsync (serve serverContext netOpts usingCppServer) $ \a -> do
usingSparseOffset = ExperimentalSparseOffset `elem` experimentalFeatures
Async.withAsync (serve serverContext netOpts usingCppServer usingSparseOffset) $ \a -> do
-- start gossip
a1 <- startGossip _serverHost gossipContext
Async.link2Only (const True) a a1
Expand All @@ -140,6 +141,7 @@ app config@ServerOpts{..} = do
_serverAdvertisedListeners
_listenersSecurityProtocolMap
usingCppServer
usingSparseOffset
forM_ as (Async.link2Only (const True) a)
-- wait the default server
waitGossipBoot gossipContext
Expand All @@ -153,8 +155,10 @@ app config@ServerOpts{..} = do
serve :: ServerContext -> K.ServerOptions
-> Bool
-- ^ ExperimentalFeature: ExperimentalCppServer
-> Bool
-- ^ ExperimentalFeature: ExperimentalSparseOffset
-> IO ()
serve sc@ServerContext{..} netOpts usingCppServer = do
serve sc@ServerContext{..} netOpts usingCppServer usingSparseOffset = do
Log.i "************************"
hPutStrLn stderr banner
Log.i "************************"
Expand Down Expand Up @@ -190,10 +194,15 @@ serve sc@ServerContext{..} netOpts usingCppServer = do
Log.info $ "Starting"
<> if isJust (K.serverSslOptions netOpts') then " secure " else " insecure "
<> "kafka server..."
handlers <- if usingSparseOffset
then do
Log.warning "Using a experimental feature: SparseOffset"
pure K.sparseOffsetHandlers
else pure K.handlers
if usingCppServer
then do Log.warning "Using a still-in-development c++ kafka server!"
K.runCppServer netOpts' sc K.handlers
else K.runHsServer netOpts' sc K.unAuthedHandlers K.handlers
K.runCppServer netOpts' sc handlers
else K.runHsServer netOpts' sc K.unAuthedHandlers handlers
where
exceptionHandlers =
[ Handler $ \(_ :: HE.RQLiteRowNotFound) -> return ()
Expand All @@ -208,10 +217,12 @@ serveListeners
-> ListenersSecurityProtocolMap
-> Bool
-- ^ ExperimentalFeature: ExperimentalCppServer
-> Bool
-- ^ ExperimentalFeature: ExperimentalSparseOffset
-> IO [Async.Async ()]
serveListeners sc netOpts
securityMap listeners listenerSecurityMap
usingCppServer
usingCppServer usingSparseOffset
= do
let listeners' = [(k, v) | (k, vs) <- Map.toList listeners, v <- Set.toList vs]
forM listeners' $ \(key, I.Listener{..}) -> Async.async $ do
Expand Down Expand Up @@ -240,10 +251,15 @@ serveListeners sc netOpts
<> Log.build key <> ":"
<> Log.build listenerAddress <> ":"
<> Log.build listenerPort
handlers <- if usingSparseOffset
then do
Log.warning "Using a experimental feature: SparseOffset"
pure K.sparseOffsetHandlers
else pure K.handlers
if usingCppServer
then do Log.warning "Using a still-in-development c++ kafka server!"
K.runCppServer netOpts' sc' K.handlers
else K.runHsServer netOpts' sc' K.unAuthedHandlers K.handlers
K.runCppServer netOpts' sc' handlers
else K.runHsServer netOpts' sc' K.unAuthedHandlers handlers

-------------------------------------------------------------------------------

Expand Down

0 comments on commit 577dd18

Please sign in to comment.