diff --git a/.stylish-haskell.yaml b/.stylish-haskell.yaml index 42703ca8e..57a780ba0 100644 --- a/.stylish-haskell.yaml +++ b/.stylish-haskell.yaml @@ -272,6 +272,8 @@ language_extensions: - TemplateHaskell - TypeApplications - QuasiQuotes + - CPP + - PatternSynonyms # Attempt to find the cabal file in ancestors of the current directory, and # parse options (currently only language extensions) from that. diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index 30bce055c..d21bb4427 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -1,12 +1,12 @@ -{-# LANGUAGE OverloadedRecordDot #-} -{-# LANGUAGE PatternSynonyms #-} +#ifndef HSTREAM_SPARSE_OFFSET +{-# LANGUAGE PatternSynonyms #-} module HStream.Kafka.Server.Handler.Offset ( handleOffsetCommit , handleOffsetFetch , handleListOffsets - ) -where + ) where +#endif import qualified Control.Exception as E import Data.Int (Int64) @@ -17,9 +17,7 @@ import HStream.Kafka.Common.Acl import HStream.Kafka.Common.Authorizer.Class import qualified HStream.Kafka.Common.KafkaException as K import qualified HStream.Kafka.Common.Metrics as Metrics -import HStream.Kafka.Common.OffsetManager (getLatestOffset, - getOffsetByTimestamp, - getOldestOffset) +import qualified HStream.Kafka.Common.OffsetManager as K import HStream.Kafka.Common.Resource import HStream.Kafka.Common.Utils (forKaArray, forKaArrayM) import qualified HStream.Kafka.Group.Group as G @@ -42,11 +40,20 @@ pattern EarliestTimestamp = (-2) -- FIXME: This function does not handle any ErrorCodeException. -- Modify the following 'listOffsetTopicPartitions' to fix it. -handleListOffsets :: ServerContext - -> K.RequestContext - -> K.ListOffsetsRequest - -> IO K.ListOffsetsResponse +#ifndef HSTREAM_SPARSE_OFFSET +handleListOffsets +#else +handleListOffsetsSparseOffset +#endif + :: ServerContext + -> K.RequestContext + -> K.ListOffsetsRequest + -> IO K.ListOffsetsResponse +#ifndef HSTREAM_SPARSE_OFFSET handleListOffsets sc reqCtx req = do +#else +handleListOffsetsSparseOffset sc reqCtx req = do +#endif topicResps <- forKaArrayM req.topics $ \listOffsetsTopic -> do -- [ACL] check [DESCRIBE TOPIC] for each topic simpleAuthorize (toAuthorizableReqCtx reqCtx) sc.authorizer Res_TOPIC listOffsetsTopic.name AclOp_DESCRIBE >>= \case @@ -109,18 +116,31 @@ handleListOffsets sc reqCtx req = do , name = listOffsetsTopic.name } +#ifndef HSTREAM_SPARSE_OFFSET -- NOTE: The last offset of a partition is the offset of the upcoming -- message, i.e. the offset of the last available message + 1. getOffset logid LatestTimestamp = - maybe 0 (+ 1) <$> getLatestOffset sc.scOffsetManager logid + maybe 0 (+ 1) <$> K.getLatestOffset sc.scOffsetManager logid + getOffset logid EarliestTimestamp = + fromMaybe 0 <$> K.getOldestOffset sc.scOffsetManager logid + -- Return the earliest offset whose timestamp is greater than or equal to + -- the given timestamp. + -- + -- TODO: actually, this is not supported currently. + getOffset logid timestamp = + fromMaybe (-1) <$> K.getOffsetByTimestamp sc.scOffsetManager logid timestamp +#else + getOffset logid LatestTimestamp = + maybe 0 K.calNextSparseOffset <$> K.getLatestHeadSparseOffset sc.scOffsetManager logid getOffset logid EarliestTimestamp = - fromMaybe 0 <$> getOldestOffset sc.scOffsetManager logid + fromMaybe 0 <$> K.getOldestSparseOffset sc.scOffsetManager logid -- Return the earliest offset whose timestamp is greater than or equal to -- the given timestamp. -- -- TODO: actually, this is not supported currently. getOffset logid timestamp = - fromMaybe (-1) <$> getOffsetByTimestamp sc.scOffsetManager logid timestamp + fromMaybe (-1) <$> K.getSparseOffsetByTimestamp sc.scOffsetManager logid timestamp +#endif -------------------- -- 8: OffsetCommit diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs new file mode 100644 index 000000000..70fa8f18a --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs @@ -0,0 +1,10 @@ +{-# LANGUAGE PatternSynonyms #-} +{-# OPTIONS_GHC -Wno-unused-top-binds #-} + +module HStream.Kafka.Server.Handler.SparseOffset.Offset + ( handleListOffsetsSparseOffset + ) where + +#define HSTREAM_SPARSE_OFFSET +#include "HStream/Kafka/Server/Handler/Offset.hs" +#undef HSTREAM_SPARSE_OFFSET diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 31eef9df9..43882b375 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -173,6 +173,7 @@ library HStream.Kafka.Server.Handler.Group HStream.Kafka.Server.Handler.Offset HStream.Kafka.Server.Handler.Produce + HStream.Kafka.Server.Handler.SparseOffset.Offset HStream.Kafka.Server.Handler.Topic HStream.Kafka.Server.Security.SASL @@ -193,7 +194,7 @@ library extra-libraries: rdkafka++ include-dirs: - include /usr/local/include external/asio/asio/include include + . include /usr/local/include external/asio/asio/include extra-lib-dirs: /usr/local/lib hs-source-dirs: . @@ -245,6 +246,7 @@ library default-language: GHC2021 default-extensions: + CPP DerivingStrategies LambdaCase MultiWayIf @@ -262,9 +264,9 @@ test-suite hstream-kafka-test HStream.Kafka.Common.AclEntrySpec HStream.Kafka.Common.AclSpec HStream.Kafka.Common.AuthorizerSpec + HStream.Kafka.Common.ConfigSpec HStream.Kafka.Common.OffsetManagerSpec HStream.Kafka.Common.TestUtils - HStream.Kafka.Common.ConfigSpec hs-source-dirs: tests build-depends: