From 1f351c825c9e6e2996a3fff9762fe5acb1d3d371 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Mon, 27 May 2024 11:15:03 +0800 Subject: [PATCH] Kafka: a standalone storage package (#1821) --- .../HStream/Kafka/Common/FetchManager.hs | 2 +- .../HStream/Kafka/Common/OffsetManager.hs | 3 +- hstream-kafka/HStream/Kafka/Common/Read.hs | 3 +- .../HStream/Kafka/Common/RecordFormat.hs | 2 +- .../HStream/Kafka/Group/GroupCoordinator.hs | 2 +- .../HStream/Kafka/Group/GroupOffsetManager.hs | 17 ++- .../HStream/Kafka/Group/OffsetsStore.hs | 2 +- .../HStream/Kafka/Server/Config/FromCli.hs | 7 +- .../HStream/Kafka/Server/Config/FromJson.hs | 4 +- .../Kafka/Server/Config/KafkaConfigManager.hs | 2 +- .../HStream/Kafka/Server/Config/Types.hs | 15 ++- .../HStream/Kafka/Server/Core/Topic.hs | 2 +- .../HStream/Kafka/Server/Handler/Basic.hs | 12 +-- .../HStream/Kafka/Server/Handler/Config.hs | 0 .../HStream/Kafka/Server/Handler/Consume.hs | 2 +- .../HStream/Kafka/Server/Handler/Offset.hs | 2 +- .../HStream/Kafka/Server/Handler/Produce.hs | 2 +- .../HStream/Kafka/Server/Handler/Topic.hs | 2 +- hstream-kafka/HStream/Kafka/Server/Types.hs | 2 +- hstream-kafka/hstream-kafka.cabal | 29 ++++- hstream-kafka/storage/Kafka/Storage.hs | 7 ++ .../storage/Kafka/Storage/Logdevice.hs | 100 ++++++++++++++++++ 22 files changed, 172 insertions(+), 47 deletions(-) delete mode 100644 hstream-kafka/HStream/Kafka/Server/Handler/Config.hs create mode 100644 hstream-kafka/storage/Kafka/Storage.hs create mode 100644 hstream-kafka/storage/Kafka/Storage/Logdevice.hs diff --git a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs index 2db49daac..48fbedfed 100644 --- a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs +++ b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs @@ -19,7 +19,7 @@ import Foreign.ForeignPtr (newForeignPtr_) import Foreign.Ptr (nullPtr) import qualified HStream.Kafka.Common.RecordFormat as K -import qualified HStream.Store as S +import qualified Kafka.Storage as S data FetchLogContext = FetchLogContext { expectedOffset :: Int64 diff --git a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs index b5f2ce744..f6583d97c 100644 --- a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs @@ -36,8 +36,7 @@ import GHC.Stack (HasCallStack) import HStream.Kafka.Common.Read import HStream.Kafka.Common.RecordFormat -import qualified HStream.Store as S -import qualified HStream.Store.Internal.LogDevice as S +import qualified Kafka.Storage as S ------------------------------------------------------------------------------- diff --git a/hstream-kafka/HStream/Kafka/Common/Read.hs b/hstream-kafka/HStream/Kafka/Common/Read.hs index 8ff03d033..f80422d6d 100644 --- a/hstream-kafka/HStream/Kafka/Common/Read.hs +++ b/hstream-kafka/HStream/Kafka/Common/Read.hs @@ -12,9 +12,8 @@ import GHC.Stack (HasCallStack) import HStream.Kafka.Common.RecordFormat import qualified HStream.Logger as Log -import qualified HStream.Store as S -import qualified HStream.Store.Internal.LogDevice as S import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Storage as S readOneRecord :: HasCallStack diff --git a/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs b/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs index 967fd7ac7..8d28792aa 100644 --- a/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs +++ b/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs @@ -14,8 +14,8 @@ import Data.Int import GHC.Generics (Generic) import qualified HStream.Logger as Log -import qualified HStream.Store as S import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Storage as S -- | Record is the smallest unit of data in HStream Kafka. -- diff --git a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs index 400df2a9c..d23a9ba8a 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs @@ -22,8 +22,8 @@ import qualified HStream.Kafka.Group.Group as G import qualified HStream.Kafka.Group.GroupOffsetManager as GOM import qualified HStream.Logger as Log import qualified HStream.MetaStore.Types as Meta -import HStream.Store (LDClient) import qualified Kafka.Protocol.Error as K +import Kafka.Storage (LDClient) data GroupCoordinator = GroupCoordinator { groups :: C.MVar (Utils.HashTable T.Text Group) diff --git a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs index a59b48180..77e258f40 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs @@ -20,7 +20,7 @@ import Data.IORef (IORef, modifyIORef', import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) import Data.Set (Set) -import qualified Data.Set as S +import qualified Data.Set as Set import qualified Data.Text as T import qualified Data.Vector as V import Data.Word (Word64) @@ -32,8 +32,6 @@ import qualified HStream.Kafka.Common.Metrics as M import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..), mkCkpOffsetStorage) import qualified HStream.Logger as Log -import qualified HStream.Store as LD -import qualified HStream.Store as S import qualified Kafka.Protocol as K import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray)) import qualified Kafka.Protocol.Error as K @@ -41,6 +39,7 @@ import Kafka.Protocol.Message (OffsetCommitRequestPartiti OffsetCommitResponsePartition (..), OffsetFetchResponsePartition (..), OffsetFetchResponseTopic (..)) +import qualified Kafka.Storage as S -- NOTE: All operations on the GroupMetadataManager are not concurrency-safe, -- and the caller needs to ensure concurrency-safety on its own. @@ -73,7 +72,7 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do start <- getTime Monotonic tpOffsets <- Map.map fromIntegral <$> loadOffsets offsetStorage groupName let totalPartitions = length tpOffsets - logIds = S.fromList $ Map.keys tpOffsets + logIds = Set.fromList $ Map.keys tpOffsets topicNumRef <- newIORef 0 tps <- getTopicPartitions logIds [] topicNumRef totalTopicNum <- readIORef topicNumRef @@ -95,10 +94,10 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do where getTopicPartitions :: Set S.C_LogID -> [[(TopicPartition, S.C_LogID)]] -> IORef Int -> IO ([(TopicPartition, S.C_LogID)]) getTopicPartitions lgs res topicNum - | S.null lgs = return $ concat res + | Set.null lgs = return $ concat res | otherwise = do - let lgId = S.elemAt 0 lgs - LD.logIdHasGroup ldClient lgId >>= \case + let lgId = Set.elemAt 0 lgs + S.logIdHasGroup ldClient lgId >>= \case True -> do (streamId, _) <- S.getStreamIdFromLogId ldClient lgId modifyIORef' topicNum (+1) @@ -107,11 +106,11 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do tpWithLogId = zipWith (\(_, logId) idx -> (mkTopicPartition topicName idx, logId)) partitions ([0..]) res' = tpWithLogId : res -- remove partition ids from lgs because they all have same streamId - lgs' = lgs S.\\ S.fromList (map snd partitions) + lgs' = lgs Set.\\ Set.fromList (map snd partitions) getTopicPartitions lgs' res' topicNum False -> do Log.warning $ "get log group from log id failed, skip this log id:" <> Log.build lgId - getTopicPartitions (S.delete lgId lgs) res topicNum + getTopicPartitions (Set.delete lgId lgs) res topicNum storeOffsets :: GroupOffsetManager diff --git a/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs index ae51bfdaa..6bf25406b 100644 --- a/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs +++ b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs @@ -14,8 +14,8 @@ import HStream.Base.Timer (CompactedWorker, startCompactedWorker, stopCompactedWorker, triggerCompactedWorker) import qualified HStream.Logger as Log -import qualified HStream.Store as S import HStream.Utils (textToCBytes) +import qualified Kafka.Storage as S type LogID = Word64 type LSN = Word64 diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs index 22ad45770..029702a45 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs @@ -35,8 +35,7 @@ import Z.Data.CBytes (CBytes) import HStream.Kafka.Server.Config.Types import qualified HStream.Logger as Log -import HStream.Store (Compression (..)) -import HStream.Store.Logger (LDLogLevel (..)) +import qualified Kafka.Storage as S ------------------------------------------------------------------------------- @@ -249,7 +248,7 @@ seedNodesParser = strOption <> metavar "ADDRESS" <> help "host:port pairs of seed nodes, separated by commas (,)" -storeCompressionParser :: O.Parser Compression +storeCompressionParser :: O.Parser S.Compression storeCompressionParser = option auto $ long "store-compression" <> metavar "none | lz4 | lz4hc" @@ -271,7 +270,7 @@ logFlushImmediatelyParser = O.switch $ long "log-flush-immediately" <> help "Flush immediately after logging, this may help debugging" -ldLogLevelParser :: O.Parser LDLogLevel +ldLogLevelParser :: O.Parser S.LDLogLevel ldLogLevelParser = option auto $ long "store-log-level" <> metavar "[critical|error|warning|notify|info|debug|spew]" diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs index 6f16b18d8..9243fa52e 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs @@ -20,7 +20,7 @@ import Text.Read (readEither) import qualified HStream.Kafka.Server.Config.KafkaConfig as KC import HStream.Kafka.Server.Config.Types -import HStream.Store (Compression (..)) +import qualified Kafka.Storage as S ------------------------------------------------------------------------------- @@ -61,7 +61,7 @@ parseJSONToOptions CliOptions{..} obj = do let !_serverGossipAddress = fromMaybe _advertisedAddress (cliServerGossipAddress <|> nodeGossipAddress) let !_metaStore = fromMaybe nodeMetaStore cliMetaStore - let !_compression = fromMaybe CompressionNone cliStoreCompression + let !_compression = fromMaybe S.CompressionNone cliStoreCompression let !_serverLogLevel = fromMaybe (readWithErrLog "log-level" nodeLogLevel) cliServerLogLevel let !_serverLogWithColor = nodeLogWithColor || cliServerLogWithColor diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs index 3bddb2d8c..19c6cfef4 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs @@ -13,10 +13,10 @@ import qualified Data.Vector as V import qualified HStream.Kafka.Common.Utils as K import qualified HStream.Kafka.Server.Config.KafkaConfig as KC import HStream.Kafka.Server.Handler.Topic (validateTopicName) -import qualified HStream.Store as S import qualified HStream.Utils as Utils import qualified Kafka.Protocol as K import qualified Kafka.Protocol.Error as K +import qualified Kafka.Storage as S data KafkaConfigManager = KafkaConfigManager diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs index 6e4c68324..5fd1a5634 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs @@ -36,17 +36,16 @@ import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Vector as V import Data.Word -import HStream.Gossip (GossipOpts (..), - defaultGossipOpts) import qualified Options.Applicative as O import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) +import HStream.Gossip (GossipOpts (..), + defaultGossipOpts) import qualified HStream.Kafka.Server.Config.KafkaConfig as KC import qualified HStream.Logger as Log import qualified HStream.Server.HStreamInternal as SAI -import HStream.Store (Compression (..)) -import HStream.Store.Logger (LDLogLevel) +import qualified Kafka.Storage as S ------------------------------------------------------------------------------- @@ -83,8 +82,8 @@ data ServerOpts = ServerOpts -- Store Options , _storage :: !StorageOptions - , _compression :: !Compression - , _ldLogLevel :: !LDLogLevel + , _compression :: !S.Compression + , _ldLogLevel :: !S.LDLogLevel , _ldConfigPath :: !CBytes , experimentalFeatures :: ![ExperimentalFeature] @@ -132,9 +131,9 @@ data CliOptions = CliOptions -- * Store config , cliStoreConfigPath :: !CBytes - , cliLdLogLevel :: !(Maybe LDLogLevel) + , cliLdLogLevel :: !(Maybe S.LDLogLevel) -- ** Internal Store options - , cliStoreCompression :: !(Maybe Compression) + , cliStoreCompression :: !(Maybe S.Compression) -- SASL Authentication , cliEnableSaslAuth :: !Bool diff --git a/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs index 2a3a49036..115cfcf31 100644 --- a/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Core/Topic.hs @@ -25,10 +25,10 @@ import HStream.Kafka.Server.Config.Types (ServerOpts (..), StorageOptions (..)) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log -import qualified HStream.Store as S import qualified HStream.Utils as Utils import qualified Kafka.Protocol as K import qualified Kafka.Protocol.Error as K +import qualified Kafka.Storage as S createTopic :: ServerContext diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index 184680096..fb131ed84 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -20,7 +20,7 @@ import Data.Int (Int32) import qualified Data.List as L import qualified Data.Map as Map import Data.Maybe (fromJust) -import qualified Data.Set as S +import qualified Data.Set as Set import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Vector as V @@ -41,12 +41,12 @@ import qualified HStream.Kafka.Server.Handler.Topic as K import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log import qualified HStream.Server.HStreamApi as A -import qualified HStream.Store as S import qualified HStream.Utils as Utils import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K +import qualified Kafka.Storage as S -------------------- -- 18: ApiVersions @@ -107,11 +107,11 @@ handleMetadata ctx reqCtx req = do -- Note: authorize **DESCRIBE** for existed topics; -- authorize **DESCRIBE** and **CREATE** for -- unexisted topics. - let topicNames = S.fromList . V.toList $ + let topicNames = Set.fromList . V.toList $ V.map (\K.MetadataRequestTopic{..} -> name) v - allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . S.streamName) - let needCreate = S.toList $ topicNames S.\\ allStreamNames - alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames + allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> Set.fromList . L.map (Utils.cBytesToText . S.streamName) + let needCreate = Set.toList $ topicNames Set.\\ allStreamNames + alreadyExist = V.fromList . Set.toList $ topicNames `Set.intersection` allStreamNames kafkaBrokerConfigs = ctx.kafkaBrokerConfigs createResp <- diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Config.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Config.hs deleted file mode 100644 index e69de29bb..000000000 diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 89cccc277..fffc19adc 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -36,12 +36,12 @@ import HStream.Kafka.Server.Config (ServerOpts (..), import qualified HStream.Kafka.Server.Config.KafkaConfig as KC import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log -import qualified HStream.Store as S import qualified HStream.Utils as U import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K +import qualified Kafka.Storage as S ------------------------------------------------------------------------------- diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index d21bb4427..b1f4819c9 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -24,10 +24,10 @@ import qualified HStream.Kafka.Group.Group as G import qualified HStream.Kafka.Group.GroupCoordinator as GC import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log -import qualified HStream.Store as S import qualified Kafka.Protocol as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Service as K +import qualified Kafka.Storage as S -------------------- -- 2: ListOffsets diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index 08d0a4c79..aa7a03ded 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -25,12 +25,12 @@ import qualified HStream.Kafka.Common.RecordFormat as K import HStream.Kafka.Common.Resource import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log -import qualified HStream.Store as S import qualified HStream.Utils as U import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K +import qualified Kafka.Storage as S -- acks: (FIXME: Currently we only support -1) -- 0: The server will not send any response(this is the only case where the diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs index 2026f3dc2..88b390c21 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs @@ -32,13 +32,13 @@ import qualified HStream.Kafka.Common.Utils as Utils import qualified HStream.Kafka.Server.Core.Topic as Core import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log -import qualified HStream.Store as S import Kafka.Protocol (NullableString) import qualified Kafka.Protocol.Encoding as K import Kafka.Protocol.Error (ErrorCode) import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K +import qualified Kafka.Storage as S -------------------- -- 19: CreateTopics diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index 32b2dd0fa..f417ab5c6 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -27,7 +27,7 @@ import qualified HStream.Kafka.Server.Config.KafkaConfig as KC import HStream.MetaStore.Types (MetaHandle (..)) import HStream.Stats (newServerStatsHolder) import qualified HStream.Stats as Stats -import qualified HStream.Store as S +import qualified Kafka.Storage as S data ServerContext = ServerContext { serverID :: !Word32 diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index dd577aaa0..c8c910c84 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -122,6 +122,28 @@ benchmark kafka-protocol-bench-encoding default-language: Haskell2010 ghc-options: -threaded -rtsopts -with-rtsopts=-N +library kafka-storage + import: shared-properties + exposed-modules: Kafka.Storage + other-modules: Kafka.Storage.Logdevice + hs-source-dirs: storage + build-tool-depends: hpp:hpp >=0.6 && <0.7 + build-depends: + , base >=4.11 && <5 + , bytestring + , deepseq + , hstream-common-base + , hstream-store + , text + , vector + + default-language: GHC2021 + default-extensions: + DerivingStrategies + LambdaCase + OverloadedStrings + RecordWildCards + library import: shared-properties exposed-modules: @@ -225,7 +247,7 @@ library , hstream-common-stats , hstream-gossip , hstream-kafka:kafka-protocol - , hstream-store + , hstream-kafka:kafka-storage , mtl , network , optparse-applicative @@ -271,7 +293,7 @@ test-suite hstream-kafka-test hs-source-dirs: tests build-depends: , aeson - , base >=4.11 && <5 + , base >=4.11 && <5 , bytestring , containers , hspec @@ -279,7 +301,8 @@ test-suite hstream-kafka-test , hstream-common , hstream-common-base , hstream-common-server - , hstream-kafka:{hstream-kafka, kafka-protocol} + , hstream-kafka + , hstream-kafka:kafka-protocol , hstream-store , http-client , text diff --git a/hstream-kafka/storage/Kafka/Storage.hs b/hstream-kafka/storage/Kafka/Storage.hs new file mode 100644 index 000000000..36fc8f079 --- /dev/null +++ b/hstream-kafka/storage/Kafka/Storage.hs @@ -0,0 +1,7 @@ +{-# LANGUAGE PatternSynonyms #-} + +module Kafka.Storage + ( module Kafka.Storage.Logdevice + ) where + +import Kafka.Storage.Logdevice diff --git a/hstream-kafka/storage/Kafka/Storage/Logdevice.hs b/hstream-kafka/storage/Kafka/Storage/Logdevice.hs new file mode 100644 index 000000000..2c4a791f9 --- /dev/null +++ b/hstream-kafka/storage/Kafka/Storage/Logdevice.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE PatternSynonyms #-} + +module Kafka.Storage.Logdevice + ( -- * Client + LDClient + , newLDClient + , setClientSetting + , LDLogLevel + , trimLastBefore + + -- * Topic + , StreamId (streamName) + , StreamType (StreamTypeTopic) + , C_LogID + , createStream + , createStreamPartition + , listStreamPartitions + , listStreamPartitionsOrderedByName + , findStreams + , doesStreamExist + , isLogEmpty + , transToTopicStreamName + , getStreamIdFromLogId + , showStreamName + , logIdHasGroup + -- ** LSN + , LSN + , pattern LSN_MIN + , pattern LSN_MAX + , pattern LSN_INVALID + , getTailLSN + , findKey + , findTime + , FindKeyAccuracy (FindKeyStrict) + , pattern KeyTypeFindKey + -- ** Attributes + , removeStream + , getLogHeadAttrsTrimPoint + , getLogHeadAttrs + , Attribute (attrValue) + , LogAttributes (..) + , getStreamLogAttrs + , def + , defAttr1 + , getLogTailAttrsLSN + , getLogTailAttrs + , getStreamExtraAttrs + + -- * Records + -- ** Data record + , DataRecord (..) + , DataRecordAttr (..) + -- ** Gap record + , GapRecord (..) + , GapType + , pattern GapTypeUnknown + , pattern GapTypeBridge + , pattern GapTypeHole + , pattern GapTypeDataloss + , pattern GapTypeTrim + , pattern GapTypeAccess + , pattern GapTypeNotInConfig + , pattern GapTypeFilteredOut + , pattern GapTypeMax + + -- * Append + , appendCompressedBS + , Compression (..) + , AppendCompletion (..) + + -- * Reader + , LDReader + , newLDReader + , readerSetTimeout + , readerSetWaitOnlyWhenNoData + , readerStartReading + , readerRead + , readerReadSome + , readerReadAllowGap + , readerIsReading + , readerStopReading + + -- * Checkpoint store + , LDCheckpointStore + , initOffsetCheckpointDir + , allocOffsetCheckpointId + , newRSMBasedCheckpointStore + , ckpStoreUpdateMultiLSN + , ckpStoreGetAllCheckpoints' + , freeOffsetCheckpointId + + -- * Exception + , NOTFOUND (..) + , EXISTS (..) + ) where + + +import HStream.Store +import HStream.Store.Internal.LogDevice +import HStream.Store.Logger