Skip to content

Commit

Permalink
Experimental SpareOffset 2: Produce handler
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed May 17, 2024
1 parent c20d3d7 commit 805b65d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
40 changes: 38 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#ifndef HSTREAM_SPARSE_OFFSET
module HStream.Kafka.Server.Handler.Produce
( handleProduce
, handleInitProducerId
) where
#endif

import qualified Control.Concurrent.Async as Async
import Control.Exception
Expand Down Expand Up @@ -44,12 +46,20 @@ import qualified Kafka.Protocol.Service as K
-- guarantees that the record will not be lost as long as at least one
-- in-sync replica remains alive. This is the strongest available
-- guarantee.
#ifndef HSTREAM_SPARSE_OFFSET
handleProduce
#else
handleProduceSparseOffset
#endif
:: ServerContext
-> K.RequestContext
-> K.ProduceRequest
-> IO K.ProduceResponse
#ifndef HSTREAM_SPARSE_OFFSET
handleProduce ServerContext{..} _reqCtx req = do
#else
handleProduceSparseOffset ServerContext{..} _reqCtx req = do
#endif
-- TODO: handle request args: acks, timeoutMs
let topicData = fromMaybe V.empty (K.unKaArray req.topicData)
responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
Expand Down Expand Up @@ -80,8 +90,9 @@ handleProduce ServerContext{..} _reqCtx req = do
-- Note that the Show instance of RecordBytes type will only show the
-- length of the ByteString. So here we pass the ByteString to the Log
Log.trace $ "Received recordBytes: " <> Log.buildString' (recordBytes :: ByteString)
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"
Log.debug1 $ "Try to append to "
<> Log.build topic.name <> ":" <> Log.build partition.index
<> ", log " <> Log.build logid

-- [ACL] Generate response by the authorization result of the **topic**
case isTopicAuthzed of
Expand Down Expand Up @@ -158,6 +169,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
let batchLength = batch.recordsCount
when (batchLength < 1) $ error "Invalid batch length"

#ifndef HSTREAM_SPARSE_OFFSET
-- Offset wroten into storage is the max key in the batch, but return the min
-- key to the client. This is because the usage of findKey.
--
Expand Down Expand Up @@ -206,6 +218,30 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
<> ", lsn: " <> Log.build r.appendCompLSN
<> ", start offset: " <> Log.build startOffset
pure (r.appendCompTimestamp, startOffset)
#else
-- FIXME unlikely overflow: convert batchLength from Int to Int32
let storedRecord = K.runPut $ K.RecordFormat 0{- version -}
0{- max offset -}
batchLength
(K.CompactBytes bs)
Log.debug1 $ "Append(SparseOffset) "
<> "batch length " <> Log.build batchLength
<> ", bytes " <> Log.build (BS.length bs)
<> ", stored bytes " <> Log.build (BS.length storedRecord)
r <- M.observeWithLabel M.topicWriteStoreLatency streamName $
S.appendCompressedBS ldclient logid storedRecord S.CompressionNone
Nothing
let !partLabel = (streamName, T.pack . show $ partition)
M.withLabel M.topicTotalAppendBytes partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral $ BS.length storedRecord)
M.withLabel M.topicTotalAppendMessages partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral batchLength)
let startOffset = K.composeSparseOffset r.appendCompLSN 0
Log.debug1 $ "Append(SparseOffset) done " <> Log.build r.appendCompLogID
<> ", lsn: " <> Log.build r.appendCompLSN
<> ", start offset: " <> Log.build startOffset
pure (r.appendCompTimestamp, startOffset)
#endif

-- TODO: performance improvements
--
Expand Down
10 changes: 10 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Produce.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{-# LANGUAGE PatternSynonyms #-}
{-# OPTIONS_GHC -Wno-unused-top-binds #-}

module HStream.Kafka.Server.Handler.SparseOffset.Produce
( handleProduceSparseOffset
) where

#define HSTREAM_SPARSE_OFFSET
#include "HStream/Kafka/Server/Handler/Produce.hs"
#undef HSTREAM_SPARSE_OFFSET
5 changes: 2 additions & 3 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ library
HStream.Kafka.Server.Handler.Offset
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.SparseOffset.Offset
HStream.Kafka.Server.Handler.SparseOffset.Produce
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Security.SASL

Expand All @@ -193,9 +194,7 @@ library
-- -DASIO_ENABLE_BUFFER_DEBUGGING

extra-libraries: rdkafka++
include-dirs:
. include /usr/local/include external/asio/asio/include

include-dirs: . include /usr/local/include external/asio/asio/include
extra-lib-dirs: /usr/local/lib
hs-source-dirs: .
build-tool-depends: hsc2hs:hsc2hs
Expand Down

0 comments on commit 805b65d

Please sign in to comment.