Skip to content

Commit

Permalink
Merge pull request #46 from lesismal/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
lesismal authored Jan 30, 2023
2 parents 5d0e0e9 + d789eb2 commit b2338d7
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[4]: https://github.com/avelino/awesome-go#distributed-systems
[5]: https://img.shields.io/badge/license-MIT-blue.svg
[6]: LICENSE
[7]: https://img.shields.io/github/workflow/status/lesismal/arpc/build-linux?style=flat-square&logo=github-actions
[7]: https://img.shields.io/github/actions/workflow/status/lesismal/arpc/build_linux.yml?branch=master&style=flat-square&logo=github-actions
[8]: https://github.com/lesismal/arpc/actions?query=workflow%3build-linux
[9]: https://goreportcard.com/badge/github.com/lesismal/arpc
[10]: https://goreportcard.com/report/github.com/lesismal/arpc
Expand Down
24 changes: 23 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ func (c *Client) Delete(key interface{}) {
}
}

// Ping .
func (c *Client) Ping() {
c.Conn.Write(PingMessage.Buffer)
}

// Pong .
func (c *Client) Pong() {
c.Conn.Write(PongMessage.Buffer)
}

// Ping .
func (c *Client) Keepalive(interval time.Duration) {
if c.running {
if interval <= 0 {
interval = time.Second * 30
}
time.AfterFunc(interval, func() {
c.Ping()
c.Keepalive(interval)
})
}
}

// NewMessage creates a Message by client's seq, handler and codec.
func (c *Client) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message {
if len(args) == 0 {
Expand Down Expand Up @@ -850,7 +873,6 @@ func (c *Client) batchSendLoop() {
}
}
}

if !c.reconnecting {
chLen = len(c.chSend)
coders = c.Handler.Coders()
Expand Down
5 changes: 5 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func init() {
HandleDisconnected(func(*Client) {})
HandleDisconnected(func(*Client) {})
UseCoder(new(CoderTest))
ReadTimeout()
SetReadTimeout(time.Second * 60)
WriteTimeout()
SetWriteTimeout(time.Second * 5)
Use(nil)
Use(func(ctx *Context) {})
Handle("-", func(ctx *Context) {})
Expand Down Expand Up @@ -251,6 +255,7 @@ func testClientCallMethodString(c *Client, t *testing.T) {
// } else if rsp != req {
// t.Fatalf("Client.Call() error, returns '%v', want '%v'", rsp, req)
// }
c.Keepalive(time.Second)
if err = c.Call(methodCallString, &req, &rsp, time.Second, map[interface{}]interface{}{}); err != nil {
t.Fatalf("Client.Call() error = %v", err)
}
Expand Down
68 changes: 67 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/lesismal/arpc/codec"
"github.com/lesismal/arpc/log"
Expand Down Expand Up @@ -126,6 +127,16 @@ type Handler interface {
// SetSendBufferSize sets client's send buffer size.
SetSendBufferSize(size int)

// ReadTimeout returns client's read timeout.
ReadTimeout() time.Duration
// SetReadTimeout sets client's read timeout.
SetReadTimeout(timeout time.Duration)

// WriteTimeout returns client's write timeout.
WriteTimeout() time.Duration
// SetWriteTimeout sets client's write timeout.
SetWriteTimeout(timeout time.Duration)

// SendQueueSize returns client's send queue channel capacity.
SendQueueSize() int
// SetSendQueueSize sets client's send queue channel capacity.
Expand Down Expand Up @@ -204,6 +215,8 @@ type handler struct {
asyncResponse bool
recvBufferSize int
sendBufferSize int
readTimeout time.Duration
writeTimeout time.Duration
sendQueueSize int
maxBodyLen int
maxReconnectTimes int
Expand Down Expand Up @@ -438,6 +451,22 @@ func (h *handler) SetSendBufferSize(size int) {
h.sendBufferSize = size
}

func (h *handler) ReadTimeout() time.Duration {
return h.readTimeout
}

func (h *handler) SetReadTimeout(timeout time.Duration) {
h.readTimeout = timeout
}

func (h *handler) WriteTimeout() time.Duration {
return h.writeTimeout
}

func (h *handler) SetWriteTimeout(timeout time.Duration) {
h.writeTimeout = timeout
}

func (h *handler) SendQueueSize() int {
return h.sendQueueSize
}
Expand Down Expand Up @@ -549,6 +578,9 @@ func (h *handler) Recv(c *Client) (*Message, error) {
return nil, err
}
}
if h.readTimeout > 0 {
c.Conn.SetReadDeadline(time.Now().Add(h.readTimeout))
}

_, err = io.ReadFull(c.Reader, c.Head[:])
if err != nil {
Expand All @@ -560,7 +592,7 @@ func (h *handler) Recv(c *Client) (*Message, error) {
return nil, err
}

if message.Len() > HeadLen {
if message.Len() >= HeadLen {
_, err = io.ReadFull(c.Reader, message.Buffer[HeaderIndexBodyLenEnd:])
}

Expand All @@ -573,6 +605,9 @@ func (h *handler) Send(conn net.Conn, buffer []byte) (int, error) {
return -1, err
}
}
if h.writeTimeout > 0 {
conn.SetWriteDeadline(time.Now().Add(h.writeTimeout))
}

n, err := conn.Write(buffer)
return n, err
Expand All @@ -584,6 +619,9 @@ func (h *handler) SendN(conn net.Conn, buffers net.Buffers) (int, error) {
return -1, err
}
}
if h.writeTimeout > 0 {
conn.SetWriteDeadline(time.Now().Add(h.writeTimeout))
}

n64, err := buffers.WriteTo(conn)
return int(n64), err
Expand All @@ -592,6 +630,14 @@ func (h *handler) SendN(conn net.Conn, buffers net.Buffers) (int, error) {
func (h *handler) OnMessage(c *Client, msg *Message) {
defer util.Recover()

switch msg.Cmd() {
case CmdPing:
c.Pong()
return
case CmdPong:
return
}

for i := len(h.msgCoders) - 1; i >= 0; i-- {
msg = h.msgCoders[i].Decode(c, msg)
}
Expand Down Expand Up @@ -874,6 +920,26 @@ func SetSendBufferSize(size int) {
DefaultHandler.SetSendBufferSize(size)
}

// ReadTimeout returns client's read timeout.
func ReadTimeout() time.Duration {
return DefaultHandler.ReadTimeout()
}

// SetReadTimeout sets client's read timeout.
func SetReadTimeout(timeout time.Duration) {
DefaultHandler.SetReadTimeout(timeout)
}

// WriteTimeout returns client's write timeout.
func WriteTimeout() time.Duration {
return DefaultHandler.WriteTimeout()
}

// SetWriteTimeout sets client's write timeout.
func SetWriteTimeout(timeout time.Duration) {
DefaultHandler.SetWriteTimeout(timeout)
}

// SendQueueSize returns default client's send queue channel capacity.
func SendQueueSize() int {
return DefaultHandler.SendQueueSize()
Expand Down
14 changes: 14 additions & 0 deletions proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ const (

// CmdNotify the other side should not response to a request message
CmdNotify byte = 3

// CmdPing .
CmdPing byte = 4

// CmdPong .
CmdPong byte = 5
)

const (
Expand Down Expand Up @@ -63,6 +69,14 @@ const (
DefaultMaxBodyLen int = 1024*1024*64 - 16
)

var (
// PingMessage .
PingMessage = newMessage(CmdPing, "", nil, false, false, 0, nil, nil, nil)

// PongMessage .
PongMessage = newMessage(CmdPong, "", nil, false, false, 0, nil, nil, nil)
)

// Header defines Message head
type Header []byte

Expand Down

0 comments on commit b2338d7

Please sign in to comment.