Skip to content

Commit

Permalink
- support both client and server sync write
Browse files Browse the repository at this point in the history
  • Loading branch information
lesismal committed Aug 3, 2021
1 parent 9c6a620 commit fbe0af1
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 66 deletions.
146 changes: 83 additions & 63 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ type Client struct {
Dialer DialerFunc
Head Header

IsAsyncWrite bool

running bool
reconnecting bool

Expand Down Expand Up @@ -131,7 +129,7 @@ func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout t
c.deleteSession(seq)
}()

if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
select {
case c.chSend <- msg:
case <-timer.C:
Expand All @@ -142,13 +140,18 @@ func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout t
return ErrClientStopped
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
if !c.reconnecting {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
}
} else {
c.dropMessage(msg)
return ErrClientReconnecting
}
}

Expand Down Expand Up @@ -176,7 +179,7 @@ func (c *Client) CallWith(ctx context.Context, method string, req interface{}, r
c.addSession(seq, sess)
defer c.deleteSession(seq)

if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
select {
case c.chSend <- msg:
case <-ctx.Done():
Expand All @@ -187,13 +190,18 @@ func (c *Client) CallWith(ctx context.Context, method string, req interface{}, r
return ErrClientStopped
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
if !c.reconnecting {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
}
} else {
c.dropMessage(msg)
return ErrClientReconnecting
}
}

Expand Down Expand Up @@ -230,21 +238,26 @@ func (c *Client) CallAsync(method string, req interface{}, handler HandlerFunc,
defer timer.Stop()
}

if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
switch timeout {
case TimeZero:
err = c.pushMessage(msg, nil)
default:
err = c.pushMessage(msg, timer)
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err = c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
if !c.reconnecting {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err = c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
}
} else {
c.dropMessage(msg)
err = ErrClientReconnecting
}
}

Expand All @@ -265,7 +278,7 @@ func (c *Client) Notify(method string, data interface{}, timeout time.Duration,

msg := c.newRequestMessage(CmdNotify, method, data, false, true, args...)

if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
switch timeout {
case TimeZero:
err = c.pushMessage(msg, nil)
Expand All @@ -275,13 +288,18 @@ func (c *Client) Notify(method string, data interface{}, timeout time.Duration,
err = c.pushMessage(msg, timer)
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err = c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
if !c.reconnecting {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err = c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
}
} else {
c.dropMessage(msg)
err = ErrClientReconnecting
}
}

Expand All @@ -297,7 +315,7 @@ func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}

msg := c.newRequestMessage(CmdNotify, method, data, false, true, args...)

if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
select {
case c.chSend <- msg:
case <-ctx.Done():
Expand All @@ -308,13 +326,18 @@ func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}
return ErrClientStopped
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
if !c.reconnecting {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
}
} else {
c.dropMessage(msg)
return ErrClientReconnecting
}
}

Expand All @@ -328,16 +351,21 @@ func (c *Client) PushMsg(msg *Message, timeout time.Duration) error {
return err
}

if !c.IsAsyncWrite {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err := c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
if !c.Handler.AsyncWrite() {
if !c.reconnecting {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err := c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
}
return err
} else {
c.dropMessage(msg)
return ErrClientReconnecting
}
return err
}

if timeout < 0 {
Expand Down Expand Up @@ -390,7 +418,7 @@ func (c *Client) Restart() error {
c.values = map[interface{}]interface{}{}

c.initReader()
if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
go util.Safe(c.sendLoop)
}
go util.Safe(c.recvLoop)
Expand Down Expand Up @@ -622,7 +650,7 @@ func (c *Client) run() {
if !c.running {
c.running = true
c.initReader()
if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
go util.Safe(c.sendLoop)
}
go util.Safe(c.recvLoop)
Expand All @@ -635,7 +663,7 @@ func (c *Client) runWebsocket() {
if !c.running {
c.running = true
c.initReader()
if c.IsAsyncWrite {
if c.Handler.AsyncWrite() {
go util.Safe(c.sendLoop)
}
c.Conn.(WebsocketConn).HandleWebsocket(c.recvLoop)
Expand Down Expand Up @@ -822,19 +850,12 @@ func newClientWithConn(conn net.Conn, codec codec.Codec, handler Handler, onStop
}

// NewClient creates a Client.
func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error) {
func NewClient(dialer DialerFunc) (*Client, error) {
conn, err := dialer()
if err != nil {
return nil, err
}

isAsyncWrite := true
if len(args) > 0 {
if asyncWrite, ok := args[0].(bool); ok {
isAsyncWrite = asyncWrite
}
}

c := &Client{}
c.Conn = conn
c.Codec = codec.DefaultCodec
Expand All @@ -845,7 +866,6 @@ func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error) {
c.chClose = make(chan util.Empty)
c.sessionMap = make(map[uint64]*rpcSession)
c.asyncHandlerMap = make(map[uint64]HandlerFunc)
c.IsAsyncWrite = isAsyncWrite

c.run()

Expand Down
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (ctx *Context) Value(key interface{}) interface{} {

func (ctx *Context) write(v interface{}, isError bool, timeout time.Duration) error {
cli := ctx.Client
if cli.IsAsyncWrite {
if !cli.Handler.AsyncWrite() {
return ctx.writeDirectly(v, isError)
}
req := ctx.Message
Expand Down
4 changes: 3 additions & 1 deletion examples/bench_nbio/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Session struct {
}

func onOpen(c *nbio.Conn) {
client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, IsAsync: true}
client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, Handler: handler}
session := &Session{
Client: client,
Buffer: nil,
Expand Down Expand Up @@ -68,6 +68,8 @@ func onData(c *nbio.Conn, data []byte) {
func main() {
nlog.SetLogger(log.DefaultLogger)

handler.SetAsyncWrite(false)

// register router
handler.Handle("Hello", func(ctx *arpc.Context) {
req := &HelloReq{}
Expand Down
4 changes: 3 additions & 1 deletion examples/nbio/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Session struct {
}

func onOpen(c *nbio.Conn) {
client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, IsAsync: true}
client := &arpc.Client{Conn: c, Codec: codec.DefaultCodec, Handler: handler}
session := &Session{
Client: client,
Buffer: nil,
Expand Down Expand Up @@ -58,6 +58,8 @@ func onData(c *nbio.Conn, data []byte) {
func main() {
nlog.SetLogger(log.DefaultLogger)

handler.SetAsyncWrite(false)

// register router
handler.Handle("/echo", func(ctx *arpc.Context) {
str := ""
Expand Down
14 changes: 14 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ type Handler interface {
// SetBatchSend sets BatchSend flag.
SetBatchSend(batch bool)

// AsyncWrite returns AsyncWrite flag.
AsyncWrite() bool
// SetAsyncWrite sets AsyncWrite flag.
SetAsyncWrite(async bool)

// AsyncResponse returns AsyncResponse flag.
AsyncResponse() bool
// SetAsyncResponse sets AsyncResponse flag.
Expand Down Expand Up @@ -140,6 +145,7 @@ type handler struct {
logtag string
batchRecv bool
batchSend bool
asyncWrite bool
asyncResponse bool
recvBufferSize int
sendQueueSize int
Expand Down Expand Up @@ -283,6 +289,14 @@ func (h *handler) SetBatchSend(batch bool) {
h.batchSend = batch
}

func (h *handler) AsyncWrite() bool {
return h.asyncWrite
}

func (h *handler) SetAsyncWrite(async bool) {
h.asyncWrite = async
}

func (h *handler) AsyncResponse() bool {
return h.asyncResponse
}
Expand Down

0 comments on commit fbe0af1

Please sign in to comment.