Skip to content

Commit

Permalink
fix(cpp kafka): make haskell callback asynchronous (#1754)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Feb 3, 2024
1 parent 3bb684f commit 75f8d1f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 14 deletions.
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ checkGroupGenerationId :: Group -> Int32 -> IO ()
checkGroupGenerationId Group{..} generationId = do
currentGenerationId <- IO.readIORef groupGenerationId
M.unless (currentGenerationId == generationId) $ do
Log.info $ "invalid generation id"
Log.warning $ "invalid generation id"
<> ", current generationId:" <> Log.buildString' currentGenerationId
<> ", expected generationId:" <> Log.buildString' generationId
<> ", group:" <> Log.build groupId
Expand Down
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ runCppServer opts sc_ mkAuthedHandlers =
processorCallback :: Cxx.ProcessorCallback ServerContext
processorCallback sptr request_ptr response_ptr =
-- nullPtr means client closed the connection
when (castStablePtrToPtr sptr /= nullPtr) $ do
when (castStablePtrToPtr sptr /= nullPtr) $ void $ forkIO $ do
(sc, conn) <- deRefStablePtr sptr
let handlers = mkAuthedHandlers sc
req <- peek request_ptr
Expand All @@ -232,6 +232,7 @@ runCppServer opts sc_ mkAuthedHandlers =
(\err -> do Log.fatal $ Log.buildString' (err :: E.SomeException)
pure Nothing)
poke response_ptr Cxx.Response{responseData = respBs}
Cxx.release_lock req.requestLock

-------------------------------------------------------------------------------
-- Server misc
Expand Down
9 changes: 9 additions & 0 deletions hstream-kafka/HStream/Kafka/Network/Cxx.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module HStream.Kafka.Network.Cxx
, new_kafka_server
, run_kafka_server
, stop_kafka_server
, release_lock
--
, getSystemEventManager'
, withFdEventNotification
Expand Down Expand Up @@ -48,8 +49,11 @@ import System.Posix.Types (Fd (..))

#include "hs_kafka_server.h"

data CppLock

data Request = Request
{ requestPayload :: ByteString
, requestLock :: Ptr CppLock
} deriving (Show)

instance Storable Request where
Expand All @@ -69,7 +73,9 @@ instance Storable Request where
--
-- BS.unsafePackCStringLen (nullPtr, 0) === ""
payload <- BS.unsafePackCStringLen (data_ptr, fromIntegral data_size)
lock <- (#peek server_request_t, lock) ptr
return $ Request{ requestPayload = payload
, requestLock = lock
}
poke _ptr _req = error "Request is not pokeable"

Expand Down Expand Up @@ -161,6 +167,9 @@ foreign import ccall safe "run_kafka_server"
foreign import ccall safe "stop_kafka_server"
stop_kafka_server :: Ptr CppKafkaServer -> IO ()

foreign import ccall unsafe "release_lock"
release_lock :: Ptr CppLock -> IO ()

-------------------------------------------------------------------------------
-- Copy from foreign: HsForeign.String
--
Expand Down
47 changes: 36 additions & 11 deletions hstream-kafka/cbits/hs_kafka_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,27 @@ using namespace wangle;
// Note: this should be the end of pipeline
class ServerHandler : public HandlerAdapter<std::unique_ptr<folly::IOBuf>> {
public:
explicit ServerHandler(HsCallback& callback, HsStablePtr& sp)
: callback_(callback), sp_(sp) {}
explicit ServerHandler(HsCallback& callback, HsStablePtr& sp,
std::shared_ptr<folly::Executor> exe)
: callback_(callback), sp_(sp), exe_(exe) {}

void read(Context* ctx, std::unique_ptr<folly::IOBuf> request) override {
folly::fbstring request_ = request->moveToFbString();
void read(Context* ctx, std::unique_ptr<folly::IOBuf> req) override {
folly::fbstring request_ = req->moveToFbString();

server_request_t hs_req{(uint8_t*)request_.data(), request_.size()};
folly::Promise<folly::Unit> lock;
folly::Future<folly::Unit> lock_fu = lock.getSemiFuture().via(exe_.get());

server_request_t hs_req{(uint8_t*)request_.data(), request_.size(), &lock};
server_response_t hs_resp;

callback_(sp_, &hs_req, &hs_resp);

std::move(lock_fu).get();

if (hs_resp.data != nullptr) {
std::unique_ptr<folly::IOBuf> response =
std::unique_ptr<folly::IOBuf> resp =
folly::IOBuf::takeOwnership(hs_resp.data, hs_resp.data_size);
ctx->fireWrite(std::move(response));
ctx->fireWrite(std::move(resp));
} else {
ctx->fireClose();
}
Expand All @@ -53,6 +59,7 @@ class ServerHandler : public HandlerAdapter<std::unique_ptr<folly::IOBuf>> {
private:
HsCallback& callback_;
HsStablePtr sp_;
std::shared_ptr<folly::Executor> exe_;

void onClientExit() {
// FIXME: lock guard here?
Expand All @@ -66,8 +73,9 @@ class ServerHandler : public HandlerAdapter<std::unique_ptr<folly::IOBuf>> {

class RpcPipelineFactory : public PipelineFactory<DefaultPipeline> {
public:
explicit RpcPipelineFactory(HsCallback callback, HsNewStablePtr newConnCtx)
: callback_(callback), newConnCtx_(newConnCtx) {}
explicit RpcPipelineFactory(HsCallback callback, HsNewStablePtr newConnCtx,
std::shared_ptr<folly::Executor> exe)
: callback_(callback), newConnCtx_(newConnCtx), exe_(exe) {}

DefaultPipeline::Ptr
newPipeline(std::shared_ptr<folly::AsyncTransport> sock) override {
Expand All @@ -82,14 +90,15 @@ class RpcPipelineFactory : public PipelineFactory<DefaultPipeline> {
pipeline->addBack(LengthFieldBasedFrameDecoder());
pipeline->addBack(LengthFieldPrepender());
// Haskell handler
pipeline->addBack(ServerHandler(callback_, hssp));
pipeline->addBack(ServerHandler(callback_, hssp, exe_));
pipeline->finalize();
return pipeline;
}

private:
HsCallback callback_;
HsNewStablePtr newConnCtx_;
std::shared_ptr<folly::Executor> exe_;
};

void hs_event_notify(int& fd) {
Expand Down Expand Up @@ -118,8 +127,20 @@ void run_kafka_server(ServerBootstrap<DefaultPipeline>* server,
auto addr = folly::SocketAddress{host, port};
free((void*)host);

auto threads = std::thread::hardware_concurrency();
if (threads <= 0) {
// Reasonable mid-point for concurrency when actual value unknown
threads = 8;
}
auto io_group = std::make_shared<folly::IOThreadPoolExecutor>(
threads, std::make_shared<folly::NamedThreadFactory>("IO Thread"));

server->childPipeline(
std::make_shared<RpcPipelineFactory>(callback, newConnCtx));
std::make_shared<RpcPipelineFactory>(callback, newConnCtx, io_group));

// Using default accept_group (a single thread)
server->group(io_group);

server->bind(addr);

hs_event_notify(fd_on_started);
Expand All @@ -133,5 +154,9 @@ void stop_kafka_server(ServerBootstrap<DefaultPipeline>* server) {
server->stop();
}

void release_lock(folly::Promise<folly::Unit>* p) {
p->setValue(folly::Unit{});
}

// ---
}
9 changes: 8 additions & 1 deletion hstream-kafka/include/hs_kafka_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
#include <HsFFI.h>
#include <cstdint>

#include <folly/Unit.h>
#include <folly/futures/Promise.h>

struct server_request_t {
uint8_t* data;
size_t data_size;
folly::Promise<folly::Unit>* lock;
};

struct server_response_t {
uint8_t* data;
// Must be initialized to nullptr. Because we'll use it to check if the
// response is empty, and the haskell land may not set it to nullptr
// (when the StablePtr == nullPtr).
uint8_t* data = nullptr;
size_t data_size;
};

Expand Down

0 comments on commit 75f8d1f

Please sign in to comment.