From 62d983608ebb2d483f2d51069ebf1a5a87d29052 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Tue, 24 Oct 2023 12:47:23 +0800 Subject: [PATCH] Add grpc channel arguments about keepalive --- hs-grpc-server/HsGrpc/Server.hs | 13 ++- hs-grpc-server/HsGrpc/Server/FFI.hs | 2 + hs-grpc-server/HsGrpc/Server/Types.hsc | 110 +++++++++++++++++++++--- hs-grpc-server/cbits/hs_grpc_server.cpp | 22 +++++ hs-grpc-server/hs-grpc-server.cabal | 6 +- hs-grpc-server/include/hs_grpc_server.h | 15 +++- 6 files changed, 150 insertions(+), 18 deletions(-) diff --git a/hs-grpc-server/HsGrpc/Server.hs b/hs-grpc-server/HsGrpc/Server.hs index 0591661..fd39595 100644 --- a/hs-grpc-server/HsGrpc/Server.hs +++ b/hs-grpc-server/HsGrpc/Server.hs @@ -96,7 +96,9 @@ runServer :: ServerOptions -> [ServiceHandler] -> IO () runServer ServerOptions{..} handlers = do server <- newAsioServer serverHost serverPort serverParallelism - serverSslOptions serverInterceptors + serverSslOptions + serverChannelArgs + serverInterceptors runAsioGrpc server handlers serverOnStarted serverInternalChannelSize ------------------------------------------------------------------------------- @@ -108,14 +110,19 @@ newAsioServer -> Int -- ^ port -> Int -- ^ parallelism -> Maybe SslServerCredentialsOptions + -> [ChannelArg] -> [ServerInterceptor] -> IO AsioServer -newAsioServer host port parallelism m_sslOpts interceptors = do +newAsioServer host port parallelism m_sslOpts chanArgs interceptors = do ptr <- HF.withShortByteString host $ \host' host_len -> HF.withMaybePtr m_sslOpts withSslServerCredentialsOptions $ \sslOpts' -> + withChannelArgs chanArgs $ \chanArgs' chanArgs_size -> HF.withPrimList (map toCItcptFact interceptors) $ \intcept' intcept_size -> - new_asio_server host' host_len port parallelism sslOpts' intcept' intcept_size + new_asio_server host' host_len port parallelism + sslOpts' + chanArgs' chanArgs_size + intcept' intcept_size if ptr == nullPtr then Ex.throwIO $ ServerException "newGrpcServer failed!" else newForeignPtr delete_asio_server_fun ptr where diff --git a/hs-grpc-server/HsGrpc/Server/FFI.hs b/hs-grpc-server/HsGrpc/Server/FFI.hs index 6f6288c..5e87f8d 100644 --- a/hs-grpc-server/HsGrpc/Server/FFI.hs +++ b/hs-grpc-server/HsGrpc/Server/FFI.hs @@ -24,6 +24,8 @@ foreign import ccall unsafe "new_asio_server" -- ^ parallelism -> Ptr SslServerCredentialsOptions -- ^ tls options + -> Ptr ChannelArg -> Int + -- ^ Grpc Channel arguments -> Ptr (Ptr CServerInterceptorFactory) -> Int -- ^ Interceptors -> IO (Ptr CppAsioServer) diff --git a/hs-grpc-server/HsGrpc/Server/Types.hsc b/hs-grpc-server/HsGrpc/Server/Types.hsc index 3e95a28..f350ed6 100644 --- a/hs-grpc-server/HsGrpc/Server/Types.hsc +++ b/hs-grpc-server/HsGrpc/Server/Types.hsc @@ -1,4 +1,6 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} +{-# LANGUAGE MagicHash #-} {-# LANGUAGE PatternSynonyms #-} module HsGrpc.Server.Types @@ -54,6 +56,17 @@ module HsGrpc.Server.Types , CServerInterceptorFactory , ServerInterceptor (..) + -- * Channel arguments + , ChannelArg + , ChanArgValue (..) + , mk_GRPC_ARG_KEEPALIVE_TIME_MS + , mk_GRPC_ARG_KEEPALIVE_TIMEOUT_MS + , mk_GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS + , mk_GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS + , mk_GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA + , mk_GRPC_ARG_HTTP2_MAX_PING_STRIKES + , withChannelArgs -- XXX: this function should be in a Internal module + -- * Internal Types , Request (..) , Response (..) @@ -75,6 +88,7 @@ module HsGrpc.Server.Types ) where import Control.Exception (Exception, bracket, throwIO) +import Control.Monad (forM_) import Data.ByteString (ByteString) import Data.ByteString.Short (ShortByteString) import qualified Data.ByteString.Unsafe as BS @@ -82,9 +96,10 @@ import Data.Maybe (isJust) import Data.ProtoLens.Service.Types (StreamingType (..)) import Data.Text (Text) import Data.Word (Word64, Word8) -import Foreign.Marshal.Alloc (allocaBytesAligned) -import Foreign.Ptr (FunPtr, Ptr, freeHaskellFunPtr, - nullPtr) +import Foreign.C.Types +import Foreign.Marshal.Alloc +import Foreign.Marshal.Array +import Foreign.Ptr import Foreign.StablePtr (StablePtr) import Foreign.Storable (Storable (..)) import GHC.Conc (PrimMVar) @@ -93,6 +108,7 @@ import qualified HsForeign as HF import HsGrpc.Common.Foreign.Channel import HsGrpc.Server.Internal.Types +#include #include "hs_grpc_server.h" ------------------------------------------------------------------------------- @@ -104,6 +120,7 @@ data ServerOptions = ServerOptions , serverSslOptions :: !(Maybe SslServerCredentialsOptions) , serverOnStarted :: !(Maybe (IO ())) , serverInterceptors :: ![ServerInterceptor] + , serverChannelArgs :: ![ChannelArg] -- The following options are considering as internal , serverInternalChannelSize :: !Word } @@ -116,6 +133,7 @@ defaultServerOpts = ServerOptions , serverSslOptions = Nothing , serverOnStarted = Nothing , serverInterceptors = [] + , serverChannelArgs = [] , serverInternalChannelSize = 2 } @@ -126,6 +144,7 @@ instance Show ServerOptions where <> "port: " <> show serverPort <> ", " <> "parallelism: " <> show serverParallelism <> ", " <> "sslOptions: " <> show serverSslOptions <> ", " + <> "channelArgs: " <> show serverChannelArgs <> ", " <> "onStartedEvent: " <> notifyFn serverOnStarted <> "}" @@ -240,16 +259,11 @@ instance Storable Response where (#poke hsgrpc::server_response_t, data) ptr data_ptr (#poke hsgrpc::server_response_t, data_size) ptr data_size (#poke hsgrpc::server_response_t, status_code) ptr (unStatusCode responseStatusCode) - errmsg_ptr <- maybeNewStdString responseErrorMsg + errmsg_ptr <- HF.maybeNewStdString responseErrorMsg (#poke hsgrpc::server_response_t, error_msg) ptr errmsg_ptr - errdetails_ptr <- maybeNewStdString responseErrorDetails + errdetails_ptr <- HF.maybeNewStdString responseErrorDetails (#poke hsgrpc::server_response_t, error_details) ptr errdetails_ptr --- TODO: upgrade foreign package to use HS.maybeNewStdString -maybeNewStdString :: Maybe ByteString -> IO (Ptr HF.StdString) -maybeNewStdString Nothing = pure nullPtr -maybeNewStdString (Just bs) = HF.withByteString bs $ HF.hs_new_std_string - ------------------------------------------------------------------------------- pattern C_StreamingType_NonStreaming :: Word8 @@ -468,6 +482,82 @@ newtype StatusCode = StatusCode { unStatusCode :: Int } , StatusDoNotUse #-} +------------------------------------------------------------------------------- +-- Grpc channel arguments +-- +-- https://grpc.github.io/grpc/core/group__grpc__arg__keys.html + +data ChanArgValue + = ChanArgValueInt CInt + | ChanArgValueString ShortByteString + deriving (Show, Eq) + +newtype ChannelArg = ChannelArg + { unChannelArg :: (ShortByteString, ChanArgValue) } + deriving (Show, Eq) + +instance Storable ChannelArg where + sizeOf _ = (#size hsgrpc::hs_grpc_channel_arg_t) + alignment _ = (#alignment hsgrpc::hs_grpc_channel_arg_t) + peek _ptr = error "Unimplemented" + poke ptr (ChannelArg (key, val)) = do + key_ptr <- HF.newStdStringFromShort key -- should be deleted on cpp side + (#poke hsgrpc::hs_grpc_channel_arg_t, key) ptr key_ptr + case val of + ChanArgValueInt i -> do + (#poke hsgrpc::hs_grpc_channel_arg_t, type) + ptr + ((#const static_cast(hsgrpc::GrpcChannelArgValType::Int)) :: Word8) + (#poke hsgrpc::hs_grpc_channel_arg_t, value.int_value) + ptr i + ChanArgValueString s -> do + (#poke hsgrpc::hs_grpc_channel_arg_t, type) + ptr + ((#const static_cast(hsgrpc::GrpcChannelArgValType::String)) :: Word8) + value_ptr <- HF.newStdStringFromShort s -- should be deleted on cpp side + (#poke hsgrpc::hs_grpc_channel_arg_t, value.string_value) ptr value_ptr + +withChannelArgs :: [ChannelArg] -> (Ptr ChannelArg -> Int -> IO a) -> IO a +withChannelArgs args f = do + let !len = length args + allocaArray @ChannelArg len $ \ptr -> do + forM_ (zip [0..len-1] args) $ \(i, arg) -> pokeElemOff ptr i arg + f ptr len + +#define hsc_mk_chan_args(c, vt, vw) \ + hsc_printf("pattern %s :: ShortByteString\n", #c, #c); \ + hsc_printf("pattern %s = ", #c); hsc_const_str(c); \ + hsc_printf("\n"); \ + hsc_printf("mk_%s :: %s -> ChannelArg\n", #c, #vt); \ + hsc_printf("mk_%s v = ChannelArg (%s, %s v)\n", #c, #c, #vw); + +-- | After a duration of this time the client/server pings its peer to see if +-- the transport is still alive. Int valued, milliseconds. +#mk_chan_args GRPC_ARG_KEEPALIVE_TIME_MS, CInt, ChanArgValueInt + +-- | After waiting for a duration of this time, if the keepalive ping sender +-- does not receive the ping ack, it will close the transport. Int valued, +-- milliseconds. +#mk_chan_args GRPC_ARG_KEEPALIVE_TIMEOUT_MS, CInt, ChanArgValueInt + +-- | Is it permissible to send keepalive pings from the client without any +-- outstanding streams. Int valued, 0(false)/1(true). +#mk_chan_args GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, CInt, ChanArgValueInt + +-- | Minimum allowed time between a server receiving successive ping frames +-- without sending any data/header frame. Int valued, milliseconds +#mk_chan_args GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, CInt, ChanArgValueInt + +-- | How many pings can the client send before needing to send a +-- data/header frame? (0 indicates that an infinite number of +-- pings can be sent without sending a data frame or header frame) +#mk_chan_args GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, CInt, ChanArgValueInt + +-- | How many misbehaving pings the server can bear before sending goaway and +-- closing the transport? (0 indicates that the server can bear an infinite +-- number of misbehaving pings) +#mk_chan_args GRPC_ARG_HTTP2_MAX_PING_STRIKES, CInt, ChanArgValueInt + ------------------------------------------------------------------------------- -- Interceptors diff --git a/hs-grpc-server/cbits/hs_grpc_server.cpp b/hs-grpc-server/cbits/hs_grpc_server.cpp index f87df72..208e4a6 100644 --- a/hs-grpc-server/cbits/hs_grpc_server.cpp +++ b/hs-grpc-server/cbits/hs_grpc_server.cpp @@ -483,6 +483,8 @@ CppAsioServer* new_asio_server( const char* host, HsInt host_len, HsInt port, HsInt parallelism, // ssl options hsgrpc::hs_ssl_server_credentials_options_t* ssl_server_opts, + // grpc channel args + hsgrpc::hs_grpc_channel_arg_t* grpc_chan_args, HsInt grpc_chan_args_size, // interceptors grpc::experimental::ServerInterceptorFactoryInterface** interceptor_facts, HsInt interceptors_size) { @@ -528,6 +530,26 @@ CppAsioServer* new_asio_server( builder.RegisterAsyncGenericService(&server_data->service_); + // Set grpc channel args + for (auto i = 0; i < grpc_chan_args_size; ++i) { + auto& arg = grpc_chan_args[i]; + switch (arg.type) { + case hsgrpc::GrpcChannelArgValType::Int: + gpr_log(GPR_DEBUG, "AddChannelArgument: (%s, %d)", arg.key->c_str(), + arg.value.int_value); + builder.AddChannelArgument(*arg.key, arg.value.int_value); + delete arg.key; + break; + case hsgrpc::GrpcChannelArgValType::String: + gpr_log(GPR_DEBUG, "AddChannelArgument: (%s, %s)", arg.key->c_str(), + arg.value.string_value->c_str()); + builder.AddChannelArgument(*arg.key, *arg.value.string_value); + delete arg.key; + delete arg.value.string_value; + break; + } + } + if (interceptors_size > 0) { std::vector< std::unique_ptr> diff --git a/hs-grpc-server/hs-grpc-server.cabal b/hs-grpc-server/hs-grpc-server.cabal index f88a4f9..89f78e5 100644 --- a/hs-grpc-server/hs-grpc-server.cabal +++ b/hs-grpc-server/hs-grpc-server.cabal @@ -83,9 +83,9 @@ library build-depends: , async ^>=2.2 - , base >=4.14 && <5 - , bytestring >=0.10 && <0.12 - , foreign ^>=0.2 + , base >=4.14 && <5 + , bytestring >=0.10 && <0.12 + , foreign ^>=0.2.1 , ghc-prim , microlens , primitive diff --git a/hs-grpc-server/include/hs_grpc_server.h b/hs-grpc-server/include/hs_grpc_server.h index 6b4e3a6..8d0e68f 100644 --- a/hs-grpc-server/include/hs_grpc_server.h +++ b/hs-grpc-server/include/hs_grpc_server.h @@ -17,8 +17,8 @@ using ChannelOut = asio::experimental::concurrent_channel