diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index 247f47a73..08d0a4c79 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -1,7 +1,9 @@ +#ifndef HSTREAM_SPARSE_OFFSET module HStream.Kafka.Server.Handler.Produce ( handleProduce , handleInitProducerId ) where +#endif import qualified Control.Concurrent.Async as Async import Control.Exception @@ -44,12 +46,20 @@ import qualified Kafka.Protocol.Service as K -- guarantees that the record will not be lost as long as at least one -- in-sync replica remains alive. This is the strongest available -- guarantee. +#ifndef HSTREAM_SPARSE_OFFSET handleProduce +#else +handleProduceSparseOffset +#endif :: ServerContext -> K.RequestContext -> K.ProduceRequest -> IO K.ProduceResponse +#ifndef HSTREAM_SPARSE_OFFSET handleProduce ServerContext{..} _reqCtx req = do +#else +handleProduceSparseOffset ServerContext{..} _reqCtx req = do +#endif -- TODO: handle request args: acks, timeoutMs let topicData = fromMaybe V.empty (K.unKaArray req.topicData) responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do @@ -80,8 +90,9 @@ handleProduce ServerContext{..} _reqCtx req = do -- Note that the Show instance of RecordBytes type will only show the -- length of the ByteString. So here we pass the ByteString to the Log Log.trace $ "Received recordBytes: " <> Log.buildString' (recordBytes :: ByteString) - Log.debug1 $ "Try to append to logid " <> Log.build logid - <> "(" <> Log.build partition.index <> ")" + Log.debug1 $ "Try to append to " + <> Log.build topic.name <> ":" <> Log.build partition.index + <> ", log " <> Log.build logid -- [ACL] Generate response by the authorization result of the **topic** case isTopicAuthzed of @@ -158,6 +169,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d let batchLength = batch.recordsCount when (batchLength < 1) $ error "Invalid batch length" +#ifndef HSTREAM_SPARSE_OFFSET -- Offset wroten into storage is the max key in the batch, but return the min -- key to the client. This is because the usage of findKey. -- @@ -206,6 +218,30 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d <> ", lsn: " <> Log.build r.appendCompLSN <> ", start offset: " <> Log.build startOffset pure (r.appendCompTimestamp, startOffset) +#else + -- FIXME unlikely overflow: convert batchLength from Int to Int32 + let storedRecord = K.runPut $ K.RecordFormat 0{- version -} + 0{- max offset -} + batchLength + (K.CompactBytes bs) + Log.debug1 $ "Append(SparseOffset) " + <> "batch length " <> Log.build batchLength + <> ", bytes " <> Log.build (BS.length bs) + <> ", stored bytes " <> Log.build (BS.length storedRecord) + r <- M.observeWithLabel M.topicWriteStoreLatency streamName $ + S.appendCompressedBS ldclient logid storedRecord S.CompressionNone + Nothing + let !partLabel = (streamName, T.pack . show $ partition) + M.withLabel M.topicTotalAppendBytes partLabel $ \counter -> + void $ M.addCounter counter (fromIntegral $ BS.length storedRecord) + M.withLabel M.topicTotalAppendMessages partLabel $ \counter -> + void $ M.addCounter counter (fromIntegral batchLength) + let startOffset = K.composeSparseOffset r.appendCompLSN 0 + Log.debug1 $ "Append(SparseOffset) done " <> Log.build r.appendCompLogID + <> ", lsn: " <> Log.build r.appendCompLSN + <> ", start offset: " <> Log.build startOffset + pure (r.appendCompTimestamp, startOffset) +#endif -- TODO: performance improvements -- diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Produce.hs new file mode 100644 index 000000000..9d4dd39d1 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Produce.hs @@ -0,0 +1,10 @@ +{-# LANGUAGE PatternSynonyms #-} +{-# OPTIONS_GHC -Wno-unused-top-binds #-} + +module HStream.Kafka.Server.Handler.SparseOffset.Produce + ( handleProduceSparseOffset + ) where + +#define HSTREAM_SPARSE_OFFSET +#include "HStream/Kafka/Server/Handler/Produce.hs" +#undef HSTREAM_SPARSE_OFFSET diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 43882b375..57188b8bd 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -174,6 +174,7 @@ library HStream.Kafka.Server.Handler.Offset HStream.Kafka.Server.Handler.Produce HStream.Kafka.Server.Handler.SparseOffset.Offset + HStream.Kafka.Server.Handler.SparseOffset.Produce HStream.Kafka.Server.Handler.Topic HStream.Kafka.Server.Security.SASL @@ -193,9 +194,7 @@ library -- -DASIO_ENABLE_BUFFER_DEBUGGING extra-libraries: rdkafka++ - include-dirs: - . include /usr/local/include external/asio/asio/include - + include-dirs: . include /usr/local/include external/asio/asio/include extra-lib-dirs: /usr/local/lib hs-source-dirs: . build-tool-depends: hsc2hs:hsc2hs