From af8c1501df30d24fec206db042327055f4d08293 Mon Sep 17 00:00:00 2001 From: lesismal Date: Mon, 16 Aug 2021 11:35:36 +0800 Subject: [PATCH] + Handler create message funcs - update nbio examples --- examples/bench_nbio/server/server.go | 36 +++++++++++++++------------- examples/graceful/server/server.go | 3 ++- examples/micro/server/server.go | 2 +- examples/nbio/server/server.go | 35 ++++++++++++++------------- examples/webchat/server.go | 2 +- handler.go | 19 +++++++++++++++ 6 files changed, 61 insertions(+), 36 deletions(-) diff --git a/examples/bench_nbio/server/server.go b/examples/bench_nbio/server/server.go index ba112df..8212d6b 100644 --- a/examples/bench_nbio/server/server.go +++ b/examples/bench_nbio/server/server.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "os" "os/signal" "syscall" @@ -10,6 +11,7 @@ import ( "github.com/lesismal/arpc/log" "github.com/lesismal/nbio" nlog "github.com/lesismal/nbio/logging" + "github.com/lesismal/nbio/mempool" ) var ( @@ -29,15 +31,14 @@ type HelloRsp struct { // Session . type Session struct { - Client *arpc.Client - Buffer []byte + *arpc.Client + bytes.Buffer } func onOpen(c *nbio.Conn) { client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, Handler: handler} session := &Session{ Client: client, - Buffer: nil, } c.SetSession(session) } @@ -49,20 +50,21 @@ func onData(c *nbio.Conn, data []byte) { return } session := iSession.(*Session) - session.Buffer = append(session.Buffer, data...) - if len(session.Buffer) < arpc.HeadLen { - return + session.Write(data) + for session.Len() >= arpc.HeadLen { + headBuf := session.Bytes()[:4] + header := arpc.Header(headBuf) + total := arpc.HeadLen + header.BodyLen() + if session.Len() < total { + return + } + + buffer := mempool.Malloc(total) + session.Read(buffer) + + msg := handler.NewMessageWithBuffer(buffer) + handler.OnMessage(session.Client, msg) } - - headBuf := session.Buffer[:4] - header := arpc.Header(headBuf) - if len(session.Buffer) < arpc.HeadLen+header.BodyLen() { - return - } - - msg := &arpc.Message{Buffer: session.Buffer[:arpc.HeadLen+header.BodyLen()]} - session.Buffer = session.Buffer[arpc.HeadLen+header.BodyLen():] - handler.OnMessage(session.Client, msg) } func main() { @@ -91,7 +93,7 @@ func main() { } defer g.Stop() - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit } diff --git a/examples/graceful/server/server.go b/examples/graceful/server/server.go index 57f871a..ff60ff8 100644 --- a/examples/graceful/server/server.go +++ b/examples/graceful/server/server.go @@ -5,6 +5,7 @@ import ( "log" "os" "os/signal" + // "sync" "syscall" "time" @@ -32,7 +33,7 @@ func main() { go server.Run("localhost:8888") - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit diff --git a/examples/micro/server/server.go b/examples/micro/server/server.go index beafed0..8b246ce 100644 --- a/examples/micro/server/server.go +++ b/examples/micro/server/server.go @@ -48,7 +48,7 @@ func main() { } defer register.Stop() - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit svr.Stop() diff --git a/examples/nbio/server/server.go b/examples/nbio/server/server.go index 73a6908..960d545 100644 --- a/examples/nbio/server/server.go +++ b/examples/nbio/server/server.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "os" "os/signal" "syscall" @@ -10,6 +11,7 @@ import ( "github.com/lesismal/arpc/log" "github.com/lesismal/nbio" nlog "github.com/lesismal/nbio/logging" + "github.com/lesismal/nbio/mempool" ) var ( @@ -19,15 +21,14 @@ var ( // Session . type Session struct { - Client *arpc.Client - Buffer []byte + *arpc.Client + bytes.Buffer } func onOpen(c *nbio.Conn) { client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, Handler: handler} session := &Session{ Client: client, - Buffer: nil, } c.SetSession(session) } @@ -39,25 +40,27 @@ func onData(c *nbio.Conn, data []byte) { return } session := iSession.(*Session) - session.Buffer = append(session.Buffer, data...) - if len(session.Buffer) < arpc.HeadLen { - return - } + session.Write(data) + for session.Len() >= arpc.HeadLen { + headBuf := session.Bytes()[:4] + header := arpc.Header(headBuf) + total := arpc.HeadLen + header.BodyLen() + if session.Len() < total { + return + } - headBuf := session.Buffer[:4] - header := arpc.Header(headBuf) - if len(session.Buffer) < arpc.HeadLen+header.BodyLen() { - return - } + buffer := mempool.Malloc(total) + session.Read(buffer) - msg := &arpc.Message{Buffer: session.Buffer[:arpc.HeadLen+header.BodyLen()]} - session.Buffer = session.Buffer[arpc.HeadLen+header.BodyLen():] - handler.OnMessage(session.Client, msg) + msg := handler.NewMessageWithBuffer(buffer) + handler.OnMessage(session.Client, msg) + } } func main() { nlog.SetLogger(log.DefaultLogger) + handler.EnablePool(true) handler.SetAsyncWrite(false) // register router @@ -87,7 +90,7 @@ func main() { } defer g.Stop() - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit } diff --git a/examples/webchat/server.go b/examples/webchat/server.go index 22365e1..a22ec92 100644 --- a/examples/webchat/server.go +++ b/examples/webchat/server.go @@ -163,7 +163,7 @@ func main() { room := NewRoom().Run() server := NewServer(room) - quit := make(chan os.Signal) + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit diff --git a/handler.go b/handler.go index 7e92152..78ae934 100644 --- a/handler.go +++ b/handler.go @@ -11,6 +11,7 @@ import ( "io" "net" + "github.com/lesismal/arpc/codec" "github.com/lesismal/arpc/log" "github.com/lesismal/arpc/util" ) @@ -165,6 +166,13 @@ type Handler interface { Context() (context.Context, context.CancelFunc) SetContext(ctx context.Context, cancel context.CancelFunc) Cancel() + + // NewMessage creates a Message. + NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message + + // NewMessageWithBuffer creates a message with the buffer and manage the message by the pool. + // The buffer arg should be managed by a pool if EnablePool(true) . + NewMessageWithBuffer(buffer []byte) *Message } // handler represents a default Handler implementation. @@ -672,6 +680,17 @@ func (h *handler) Cancel() { } } +func (h *handler) NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message { + return newMessage(cmd, method, v, false, false, seq, h, codec, nil) +} + +func (h *handler) NewMessageWithBuffer(buffer []byte) *Message { + msg := messagePool.Get().(*Message) + msg.Buffer = buffer + msg.handler = h + return msg +} + // NewHandler returns a default Handler implementation. func NewHandler() Handler { h := &handler{