Skip to content

Commit

Permalink
kafka: add naïve ListOffset handler (#1619)
Browse files Browse the repository at this point in the history
* move streamId converter to HStream.store
*  naïve ListOffset handler
  • Loading branch information
YangKian authored Sep 25, 2023
1 parent 7edacd7 commit 4999329
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 81 deletions.
11 changes: 8 additions & 3 deletions hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module HStream.Kafka.Common.OffsetManager
, cleanOffsetCache
, getOldestOffset
, getLatestOffset
, getOffsetByTimestamp
) where

import Control.Concurrent
Expand Down Expand Up @@ -88,9 +89,13 @@ getLatestOffset OffsetManager{..} logid = do
else do tailLsn <- S.getTailLSN store logid
Just . offset <$> readOneRecord reader logid tailLsn tailLsn

-- TODO
-- getOffsetByTime :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO Int64
-- getOffsetByTime OffsetManager{..} logid timestamp = undefined
getOffsetByTimestamp :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO (Maybe Int64)
getOffsetByTimestamp OffsetManager{..} logid timestamp = do
isEmpty <- S.isLogEmpty store logid
if isEmpty
then pure Nothing
else do lsn <- S.findTime store logid timestamp S.FindKeyStrict
Just . offset <$> readOneRecord reader logid lsn lsn

-------------------------------------------------------------------------------

Expand Down
39 changes: 18 additions & 21 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,26 @@ module HStream.Kafka.Server.Handler.Basic
, handleMetadataV1
) where

import Control.Concurrent.STM (readTVarIO)
import Control.Exception
import Control.Monad
import Data.Functor ((<&>))
import Data.Int (Int32)
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Vector as V
import Data.Functor ((<&>))
import Data.Int (Int32)
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Vector as V

import HStream.Common.ConsistentHashing (getResNode)
import HStream.Common.Server.Lookup (KafkaResource (..),
lookupKafkaPersist)
import qualified HStream.Gossip as Gossip
import HStream.Kafka.Server.Types (ServerContext (..),
transToStreamName)
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as A
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import HStream.Common.Server.Lookup (KafkaResource (..),
lookupKafkaPersist)
import qualified HStream.Gossip as Gossip
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as A
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

--------------------
-- 18: ApiVersions
Expand Down Expand Up @@ -124,7 +121,7 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do

getRespTopic :: Text -> IO K.MetadataResponseTopicV1
getRespTopic topicName = do
let streamId = transToStreamName topicName
let streamId = S.transToTopicStreamName topicName
shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrdered scLDClient streamId)
case shards_e of
-- FIXME: We catch all exceptions here. Is this proper?
Expand Down
6 changes: 2 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import qualified Data.ByteString as BS
import Data.Int
import qualified Data.List as L
import Data.Maybe
import Data.Time
import qualified Data.Vector as V

import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Types (ServerContext (..),
transToStreamName)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
Expand Down Expand Up @@ -68,7 +66,7 @@ readSingleTopic
readSingleTopic ldclient K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTopic = case partitions of
K.KaArray Nothing -> return (totalMaxBytes_m, timeLeftMs, K.FetchableTopicResponseV0 topic (K.KaArray Nothing))
K.KaArray (Just parts) -> do
orderedParts <- S.listStreamPartitionsOrdered ldclient (transToStreamName topic)
orderedParts <- S.listStreamPartitionsOrdered ldclient (S.transToTopicStreamName topic)
-- FIXME: is it proper to use one reader for all partitions of a topic?
reader <- S.newLDReader ldclient 1 (Just 1)
(_,totalMaxBytes_m', timeLeftMs', resps) <-
Expand Down
49 changes: 49 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
@@ -1,18 +1,67 @@
module HStream.Kafka.Server.Handler.Offset
( handleOffsetCommitV0
, handleOffsetFetchV0
, handleListOffsetsV0
)
where

import Control.Concurrent (withMVar)
import qualified Data.HashMap.Strict as HM
import Data.Int (Int64)
import Data.Text (Text)
import Data.Vector (Vector)
import qualified Data.Vector as V
import HStream.Kafka.Common.OffsetManager (getLatestOffset,
getOffsetByTimestamp,
getOldestOffset)
import HStream.Kafka.Group.GroupMetadataManager (fetchOffsets,
storeOffsets)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Store as S
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Service as K

--------------------
-- 2: ListOffsets
--------------------
handleListOffsetsV0
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV0 -> IO K.ListOffsetsResponseV0
handleListOffsetsV0 sc _ K.ListOffsetsRequestV0{..} = do
case K.unKaArray topics of
Nothing -> undefined
Just topics' -> do
res <- V.forM topics' $ \K.ListOffsetsTopicV0 {..} -> do
listOffsetTopicPartitions sc name (K.unKaArray partitions)
return $ K.ListOffsetsResponseV0 {topics = K.KaArray {unKaArray = Just res}}

latestTimestamp :: Int64
latestTimestamp = -1

earliestTimestamp :: Int64
earliestTimestamp = -2

listOffsetTopicPartitions :: ServerContext -> Text -> Maybe (Vector K.ListOffsetsPartitionV0) -> IO K.ListOffsetsTopicResponseV0
listOffsetTopicPartitions _ topicName Nothing = do
return $ K.ListOffsetsTopicResponseV0 {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName)
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartitionV0{..} -> do
let logid = orderedParts V.!? (fromIntegral partitionIndex)
offset <- getOffset logid timestamp
return $ K.ListOffsetsPartitionResponseV0
{ oldStyleOffsets = K.KaArray {unKaArray = V.singleton <$> offset}
, partitionIndex = partitionIndex
, errorCode = K.NONE
}
return $ K.ListOffsetsTopicResponseV0 {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
where
getOffset Nothing _ = undefined
getOffset (Just (_, logid)) timestamp
| timestamp == latestTimestamp = getLatestOffset scOffsetManager logid
| timestamp == earliestTimestamp = getOldestOffset scOffsetManager logid
| otherwise = getOffsetByTimestamp scOffsetManager logid timestamp

--------------------
-- 8: OffsetCommit
--------------------
Expand Down
7 changes: 3 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import qualified Data.Vector as V
import qualified HStream.Base.Time as BaseTime
import qualified HStream.Common.Server.Shard as Shard
import qualified HStream.Common.Types as CommonTypes
import HStream.Kafka.Server.Types (ServerContext (..),
transToStreamName)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
Expand Down Expand Up @@ -57,7 +56,7 @@ handleCreateTopicsV0 ctx _ K.CreateTopicsRequestV0{..} =
Log.warning $ "Expect a positive numPartitions but got " <> Log.build numPartitions
return $ K.CreatableTopicResultV0 name K.INVALID_PARTITIONS
| otherwise = do
let streamId = transToStreamName name
let streamId = S.transToTopicStreamName name
timeStamp <- BaseTime.getSystemNsTimestamp
-- FIXME: Is there any other attrs to store?
-- FIXME: Should we parse any other attr from `confs` of `CreateableTopicV0`?
Expand Down Expand Up @@ -118,7 +117,7 @@ handleDeleteTopicsV0 ctx _ K.DeleteTopicsRequestV0{..} =
-- TODO: Handle topic that has subscription (i.e. cannot be deleted)
deleteTopic :: T.Text -> IO K.DeletableTopicResultV0
deleteTopic topicName = do
let streamId = transToStreamName topicName
let streamId = S.transToTopicStreamName topicName
S.doesStreamExist (scLDClient ctx) streamId >>= \case
True -> do
S.removeStream (scLDClient ctx) streamId
Expand Down
6 changes: 0 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Types.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module HStream.Kafka.Server.Types
( ServerContext (..)
, initServerContext

, transToStreamName
) where

import Data.Text (Text)
Expand All @@ -23,7 +21,6 @@ import HStream.MetaStore.Types (MetaHandle)
import HStream.Stats (newServerStatsHolder)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)

data ServerContext = ServerContext
{ serverID :: !Word32
Expand Down Expand Up @@ -71,6 +68,3 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
, scOffsetManager = offsetManager
, scGroupMetadataManagers = groupMetadataManager
}

transToStreamName :: Text -> S.StreamId
transToStreamName = S.mkStreamId S.StreamTypeStream . textToCBytes
36 changes: 31 additions & 5 deletions hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ module HStream.Store.Stream
, getStreamLogAttrs
, getStreamPartitionHeadTimestamp
, getStreamPartitionExtraAttrs
, transToStreamName
, transToTempStreamName
, transToViewStreamName
, transToTopicStreamName
-- ** Operations
, createStream
, createStreamPartition
Expand Down Expand Up @@ -162,6 +166,8 @@ import Data.IORef (IORef, atomicModifyIORef',
newIORef, readIORef)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Algorithms.Intro as V
Expand Down Expand Up @@ -208,17 +214,19 @@ data StreamSettings = StreamSettings
{ streamNameLogDir :: CBytes
, streamViewLogDir :: CBytes
, streamTempLogDir :: CBytes
, streamTopicLogDir :: CBytes
, streamDefaultKey :: CBytes
, streamArchivePrefix :: CBytes
}

gloStreamSettings :: IORef StreamSettings
gloStreamSettings = unsafePerformIO . newIORef $
-- NOTE: no trailing slash
StreamSettings { streamNameLogDir = "/hstream/stream"
, streamViewLogDir = "/hstream/view"
, streamTempLogDir = "/tmp/hstream"
, streamDefaultKey = "__default_key__"
StreamSettings { streamNameLogDir = "/hstream/stream"
, streamViewLogDir = "/hstream/view"
, streamTempLogDir = "/tmp/hstream"
, streamTopicLogDir = "/hstream/topic"
, streamDefaultKey = "__default_key__"
, streamArchivePrefix = "__archive__"
}
{-# NOINLINE gloStreamSettings #-}
Expand Down Expand Up @@ -301,7 +309,7 @@ toArchivedStreamName StreamId{..} = do

-------------------------------------------------------------------------------

data StreamType = StreamTypeStream | StreamTypeView | StreamTypeTemp
data StreamType = StreamTypeStream | StreamTypeView | StreamTypeTemp | StreamTypeTopic
deriving (Show, Eq, Generic)

instance Hashable StreamType
Expand Down Expand Up @@ -331,11 +339,28 @@ mkStreamIdFromFullLogDir streamType path = do
StreamTypeStream -> t2 P.makeRelative (streamNameLogDir s) path
StreamTypeView -> t2 P.makeRelative (streamViewLogDir s) path
StreamTypeTemp -> t2 P.makeRelative (streamTempLogDir s) path
StreamTypeTopic -> t2 P.makeRelative (streamTopicLogDir s) path
return $ StreamId streamType name

showStreamName :: StreamId -> String
showStreamName = CBytes.unpack . streamName

transToStreamName :: Text -> StreamId
transToStreamName = mkStreamId StreamTypeStream . textToCBytes

transToTempStreamName :: Text -> StreamId
transToTempStreamName = mkStreamId StreamTypeTemp . textToCBytes

transToViewStreamName :: Text -> StreamId
transToViewStreamName = mkStreamId StreamTypeView . textToCBytes

transToTopicStreamName :: Text -> StreamId
transToTopicStreamName = mkStreamId StreamTypeTopic . textToCBytes

textToCBytes :: Text -> CBytes.CBytes
textToCBytes = CBytes.pack . T.unpack
{-# INLINE textToCBytes #-}

-------------------------------------------------------------------------------

-- | Create stream
Expand Down Expand Up @@ -836,6 +861,7 @@ getStreamDirPath StreamId{..} = do
StreamTypeStream -> pure $ t2 P.combine (streamNameLogDir s) streamName
StreamTypeView -> pure $ t2 P.combine (streamViewLogDir s) streamName
StreamTypeTemp -> pure $ t2 P.combine (streamTempLogDir s) streamName
StreamTypeTopic -> pure $ t2 P.combine (streamTopicLogDir s) streamName
{-# INLINABLE getStreamDirPath #-}

getStreamLogPath :: StreamId -> Maybe CBytes -> IO (CBytes, CBytes)
Expand Down
1 change: 1 addition & 0 deletions hstream-store/hstream-store.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ library
, vector >=0.12 && <0.14
, vector-algorithms ^>=0.9
, Z-Data
, text

default-language: Haskell2010
default-extensions:
Expand Down
8 changes: 4 additions & 4 deletions hstream/src/HStream/Server/Core/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ createQueryWithNamespace'
-- FIXME: use another exception or find which resource doesn't exist
throwIO $ HE.StreamNotFound $ "At least one of the streams/views do not exist: " <> T.pack (show sources)
True -> do
createStreamWithShard scLDClient (transToStreamName sink) "query" factor
createStreamWithShard scLDClient (S.transToStreamName sink) "query" factor
let relatedStreams = (sources, sink)
-- FIXME: pass custom query name
createQueryAndRun sc sink (ins `zip` L.map fromJust roles_m) (out, RoleStream) builder createQueryRequestSql relatedStreams
Expand All @@ -193,7 +193,7 @@ createQueryWithNamespace'
Log.warning "CREATE STREAM only supports sources of stream type"
throwIO $ HE.InvalidSqlStatement "CREATE STREAM only supports sources of stream type"
-- check & prepare sink stream
S.doesStreamExist scLDClient (transToStreamName sink) >>= \case
S.doesStreamExist scLDClient (S.transToStreamName sink) >>= \case
True -> do
Log.warning $ "Sink stream already exists: " <> Log.buildString (show sink)
throwIO $ HE.StreamExists sink
Expand Down Expand Up @@ -243,7 +243,7 @@ createQueryWithNamespace'
Log.warning "Insert by Select only supports all sources of same resource type STREAM"
throw $ HE.InvalidSqlStatement "Insert by Select only supports all sources of same resource type STREAM"
-- check sink stream
foundSink <- S.doesStreamExist scLDClient (transToStreamName sink)
foundSink <- S.doesStreamExist scLDClient (S.transToStreamName sink)
when (not foundSink) $ do
Log.warning $ "Insert by Select: Stream not found: " <> Log.buildString (show streamName)
throw $ HE.StreamNotFound $ "Stream " <> streamName <> " not found"
Expand Down Expand Up @@ -297,7 +297,7 @@ deleteQuery ServerContext{..} DeleteQueryRequest{..} = do
Nothing -> do
-- do deletion
-- Note: do not forget to delete the stream for changelog
S.removeStream scLDClient (transToTempStreamName deleteQueryRequestId)
S.removeStream scLDClient (S.transToTempStreamName deleteQueryRequestId)
P.deleteQueryInfo deleteQueryRequestId metaHandle
Stats.connector_stat_erase scStatsHolder (textToCBytes deleteQueryRequestId)
Just P.QVRelation{..} -> throwIO $ HE.FoundAssociatedView qvRelationViewName
Expand Down
8 changes: 4 additions & 4 deletions hstream/src/HStream/Server/Core/QueryNew.hs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ createQueryWithNamespace'
-- FIXME: use another exception or find which resource doesn't exist
throwIO $ HE.StreamNotFound $ "At least one of the streams/views do not exist: " <> T.pack (show sources)
True -> do
createStreamWithShard scLDClient (transToStreamName sink) "query" factor
createStreamWithShard scLDClient (S.transToStreamName sink) "query" factor
let relatedStreams = (sources, sink)
-- FIXME: pass custom query name
createQueryAndRun sc sink (ins `zip` L.map fromJust roles_m) (out, RoleStream) builder createQueryRequestSql relatedStreams
Expand All @@ -192,7 +192,7 @@ createQueryWithNamespace'
Log.warning "CREATE STREAM only supports sources of stream type"
throwIO $ HE.InvalidSqlStatement "CREATE STREAM only supports sources of stream type"
-- check & prepare sink stream
S.doesStreamExist scLDClient (transToStreamName sink) >>= \case
S.doesStreamExist scLDClient (S.transToStreamName sink) >>= \case
True -> do
Log.warning $ "Sink stream already exists: " <> Log.buildString (show sink)
throwIO $ HE.StreamExists sink
Expand Down Expand Up @@ -243,7 +243,7 @@ createQueryWithNamespace'
Log.warning "Insert by Select only supports all sources of same resource type STREAM"
throw $ HE.InvalidSqlStatement "Insert by Select only supports all sources of same resource type STREAM"
-- check sink stream
foundSink <- S.doesStreamExist scLDClient (transToStreamName sink)
foundSink <- S.doesStreamExist scLDClient (S.transToStreamName sink)
when (not foundSink) $ do
Log.warning $ "Insert by Select: Stream not found: " <> Log.buildString (show streamName)
throw $ HE.StreamNotFound $ "Stream " <> streamName <> " not found"
Expand Down Expand Up @@ -297,7 +297,7 @@ deleteQuery ServerContext{..} DeleteQueryRequest{..} = do
Nothing -> do
-- do deletion
-- Note: do not forget to delete the stream for changelog
S.removeStream scLDClient (transToTempStreamName deleteQueryRequestId)
S.removeStream scLDClient (S.transToTempStreamName deleteQueryRequestId)
P.deleteQueryInfo deleteQueryRequestId metaHandle
Stats.connector_stat_erase scStatsHolder (textToCBytes deleteQueryRequestId)
Just P.QVRelation{..} -> throwIO $ HE.FoundAssociatedView qvRelationViewName
Expand Down
Loading

0 comments on commit 4999329

Please sign in to comment.