Skip to content

Commit

Permalink
rpcclient: safe read and write to batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ynewmann committed Nov 28, 2024
1 parent 684d64a commit d815651
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions rpcclient/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ var (
// client having already connected to the RPC server.
ErrClientAlreadyConnected = errors.New("websocket client has already " +
"connected")

// ErrEmptyBatch is an error to describe that there is nothing to send.
ErrEmptyBatch = errors.New("batch is empty")
)

const (
Expand Down Expand Up @@ -151,6 +154,7 @@ type Client struct {

// whether or not to batch requests, false unless changed by Batch()
batch bool
batchLock sync.Mutex
batchList *list.List

// retryCount holds the number of times the client has tried to
Expand Down Expand Up @@ -214,7 +218,10 @@ func (c *Client) addRequest(jReq *jsonRequest) error {
element := c.requestList.PushBack(jReq)
c.requestMap[jReq.id] = element
} else {
c.batchLock.Lock()
element := c.batchList.PushBack(jReq)
c.batchLock.Unlock()

c.requestMap[jReq.id] = element
}
return nil
Expand All @@ -238,7 +245,9 @@ func (c *Client) removeRequest(id uint64) *jsonRequest {

var request *jsonRequest
if c.batch {
c.batchLock.Lock()
request = c.batchList.Remove(element).(*jsonRequest)
c.batchLock.Unlock()
} else {
request = c.requestList.Remove(element).(*jsonRequest)
}
Expand Down Expand Up @@ -1656,7 +1665,15 @@ func (c *Client) BackendVersion() (BackendVersion, error) {
return c.backendVersion, nil
}

func (c *Client) sendAsync() FutureGetBulkResult {
func (c *Client) sendAsync() (FutureGetBulkResult, error) {
c.batchLock.Lock()
defer c.batchLock.Unlock()

// If batchList is empty, there's nothing to send.
if c.batchList.Len() == 0 {
return nil, ErrEmptyBatch
}

// convert the array of marshalled json requests to a single request we can send
responseChan := make(chan *Response, 1)
marshalledRequest := []byte("[")
Expand All @@ -1678,25 +1695,24 @@ func (c *Client) sendAsync() FutureGetBulkResult {
responseChan: responseChan,
}
c.sendPostRequest(&request)
return responseChan
return responseChan, nil
}

// Marshall's bulk requests and sends to the server
// creates a response channel to receive the response
func (c *Client) Send() error {
// if batchlist is empty, there's nothing to send
if c.batchList.Len() == 0 {
return nil
future, err := c.sendAsync()
if err != nil {
return err
}

batchResp, err := c.sendAsync().Receive()
batchResp, err := future.Receive()
if err != nil {
// Clear batchlist in case of an error.
//
// TODO(yy): need to double check to make sure there's no
// concurrent access to this batch list, otherwise we may miss
// some batched requests.

c.batchLock.Lock()
c.batchList = list.New()
c.batchLock.Unlock()

return err
}
Expand All @@ -1706,6 +1722,10 @@ func (c *Client) Send() error {
// Perform a GC on batchList and requestMap before moving
// forward.
request := c.removeRequest(id)
if request == nil {
// Perhaps another goroutine has already processed this request.
continue
}

// If there's an error, we log it and continue to the next
// request.
Expand Down

0 comments on commit d815651

Please sign in to comment.