Skip to content

Commit

Permalink
+ Handler create message funcs
Browse files Browse the repository at this point in the history
- update nbio examples
  • Loading branch information
lesismal committed Aug 16, 2021
1 parent e2ea51b commit af8c150
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 36 deletions.
36 changes: 19 additions & 17 deletions examples/bench_nbio/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"os"
"os/signal"
"syscall"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/lesismal/arpc/log"
"github.com/lesismal/nbio"
nlog "github.com/lesismal/nbio/logging"
"github.com/lesismal/nbio/mempool"
)

var (
Expand All @@ -29,15 +31,14 @@ type HelloRsp struct {

// Session .
type Session struct {
Client *arpc.Client
Buffer []byte
*arpc.Client
bytes.Buffer
}

func onOpen(c *nbio.Conn) {
client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, Handler: handler}
session := &Session{
Client: client,
Buffer: nil,
}
c.SetSession(session)
}
Expand All @@ -49,20 +50,21 @@ func onData(c *nbio.Conn, data []byte) {
return
}
session := iSession.(*Session)
session.Buffer = append(session.Buffer, data...)
if len(session.Buffer) < arpc.HeadLen {
return
session.Write(data)
for session.Len() >= arpc.HeadLen {
headBuf := session.Bytes()[:4]
header := arpc.Header(headBuf)
total := arpc.HeadLen + header.BodyLen()
if session.Len() < total {
return
}

buffer := mempool.Malloc(total)
session.Read(buffer)

msg := handler.NewMessageWithBuffer(buffer)
handler.OnMessage(session.Client, msg)
}

headBuf := session.Buffer[:4]
header := arpc.Header(headBuf)
if len(session.Buffer) < arpc.HeadLen+header.BodyLen() {
return
}

msg := &arpc.Message{Buffer: session.Buffer[:arpc.HeadLen+header.BodyLen()]}
session.Buffer = session.Buffer[arpc.HeadLen+header.BodyLen():]
handler.OnMessage(session.Client, msg)
}

func main() {
Expand Down Expand Up @@ -91,7 +93,7 @@ func main() {
}
defer g.Stop()

quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
3 changes: 2 additions & 1 deletion examples/graceful/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"

// "sync"
"syscall"
"time"
Expand Down Expand Up @@ -32,7 +33,7 @@ func main() {

go server.Run("localhost:8888")

quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

Expand Down
2 changes: 1 addition & 1 deletion examples/micro/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
}
defer register.Stop()

quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
svr.Stop()
Expand Down
35 changes: 19 additions & 16 deletions examples/nbio/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"os"
"os/signal"
"syscall"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/lesismal/arpc/log"
"github.com/lesismal/nbio"
nlog "github.com/lesismal/nbio/logging"
"github.com/lesismal/nbio/mempool"
)

var (
Expand All @@ -19,15 +21,14 @@ var (

// Session .
type Session struct {
Client *arpc.Client
Buffer []byte
*arpc.Client
bytes.Buffer
}

func onOpen(c *nbio.Conn) {
client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, Handler: handler}
session := &Session{
Client: client,
Buffer: nil,
}
c.SetSession(session)
}
Expand All @@ -39,25 +40,27 @@ func onData(c *nbio.Conn, data []byte) {
return
}
session := iSession.(*Session)
session.Buffer = append(session.Buffer, data...)
if len(session.Buffer) < arpc.HeadLen {
return
}
session.Write(data)
for session.Len() >= arpc.HeadLen {
headBuf := session.Bytes()[:4]
header := arpc.Header(headBuf)
total := arpc.HeadLen + header.BodyLen()
if session.Len() < total {
return
}

headBuf := session.Buffer[:4]
header := arpc.Header(headBuf)
if len(session.Buffer) < arpc.HeadLen+header.BodyLen() {
return
}
buffer := mempool.Malloc(total)
session.Read(buffer)

msg := &arpc.Message{Buffer: session.Buffer[:arpc.HeadLen+header.BodyLen()]}
session.Buffer = session.Buffer[arpc.HeadLen+header.BodyLen():]
handler.OnMessage(session.Client, msg)
msg := handler.NewMessageWithBuffer(buffer)
handler.OnMessage(session.Client, msg)
}
}

func main() {
nlog.SetLogger(log.DefaultLogger)

handler.EnablePool(true)
handler.SetAsyncWrite(false)

// register router
Expand Down Expand Up @@ -87,7 +90,7 @@ func main() {
}
defer g.Stop()

quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
2 changes: 1 addition & 1 deletion examples/webchat/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func main() {
room := NewRoom().Run()
server := NewServer(room)

quit := make(chan os.Signal)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

Expand Down
19 changes: 19 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net"

"github.com/lesismal/arpc/codec"
"github.com/lesismal/arpc/log"
"github.com/lesismal/arpc/util"
)
Expand Down Expand Up @@ -165,6 +166,13 @@ type Handler interface {
Context() (context.Context, context.CancelFunc)
SetContext(ctx context.Context, cancel context.CancelFunc)
Cancel()

// NewMessage creates a Message.
NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message

// NewMessageWithBuffer creates a message with the buffer and manage the message by the pool.
// The buffer arg should be managed by a pool if EnablePool(true) .
NewMessageWithBuffer(buffer []byte) *Message
}

// handler represents a default Handler implementation.
Expand Down Expand Up @@ -672,6 +680,17 @@ func (h *handler) Cancel() {
}
}

func (h *handler) NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message {
return newMessage(cmd, method, v, false, false, seq, h, codec, nil)
}

func (h *handler) NewMessageWithBuffer(buffer []byte) *Message {
msg := messagePool.Get().(*Message)
msg.Buffer = buffer
msg.handler = h
return msg
}

// NewHandler returns a default Handler implementation.
func NewHandler() Handler {
h := &handler{
Expand Down

0 comments on commit af8c150

Please sign in to comment.