Skip to content

Commit

Permalink
Merge pull request gagliardetto#124 from JumpCrypto/pires/fix_ws_clie…
Browse files Browse the repository at this point in the history
…nt_goroutine_leak

Fix ws.ConnectWithOptions leaking goroutines
  • Loading branch information
gagliardetto authored Jan 6, 2023
2 parents b5aeabc + ee46ead commit 84acdbc
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions rpc/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type result interface{}
type Client struct {
rpcURL string
conn *websocket.Conn
connCtx context.Context
connCtxCancel context.CancelFunc
lock sync.RWMutex
subscriptionByRequestID map[uint64]*Subscription
subscriptionByWSSubID map[uint64]*Subscription
Expand Down Expand Up @@ -83,12 +85,15 @@ func ConnectWithOptions(ctx context.Context, rpcEndpoint string, opt *Options) (
return nil, fmt.Errorf("new ws client: dial: %w", err)
}

c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
go func() {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
ticker := time.NewTicker(pingPeriod)
for {
select {
case <-c.connCtx.Done():
return
case <-ticker.C:
c.sendPing()
}
Expand All @@ -111,17 +116,23 @@ func (c *Client) sendPing() {
func (c *Client) Close() {
c.lock.Lock()
defer c.lock.Unlock()
c.connCtxCancel()
c.conn.Close()
}

func (c *Client) receiveMessages() {
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
c.closeAllSubscription(err)
select {
case <-c.connCtx.Done():
return
default:
_, message, err := c.conn.ReadMessage()
if err != nil {
c.closeAllSubscription(err)
return
}
c.handleMessage(message)
}
c.handleMessage(message)
}
}

Expand Down

0 comments on commit 84acdbc

Please sign in to comment.