From d815651f27c8f88b0a962ce80aa92aad6e1e3c69 Mon Sep 17 00:00:00 2001 From: Yevhen Date: Thu, 28 Nov 2024 12:13:23 +0200 Subject: [PATCH] rpcclient: safe read and write to batch --- rpcclient/infrastructure.go | 40 +++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 4fe1d894df..03104c1318 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -74,6 +74,9 @@ var ( // client having already connected to the RPC server. ErrClientAlreadyConnected = errors.New("websocket client has already " + "connected") + + // ErrEmptyBatch is an error to describe that there is nothing to send. + ErrEmptyBatch = errors.New("batch is empty") ) const ( @@ -151,6 +154,7 @@ type Client struct { // whether or not to batch requests, false unless changed by Batch() batch bool + batchLock sync.Mutex batchList *list.List // retryCount holds the number of times the client has tried to @@ -214,7 +218,10 @@ func (c *Client) addRequest(jReq *jsonRequest) error { element := c.requestList.PushBack(jReq) c.requestMap[jReq.id] = element } else { + c.batchLock.Lock() element := c.batchList.PushBack(jReq) + c.batchLock.Unlock() + c.requestMap[jReq.id] = element } return nil @@ -238,7 +245,9 @@ func (c *Client) removeRequest(id uint64) *jsonRequest { var request *jsonRequest if c.batch { + c.batchLock.Lock() request = c.batchList.Remove(element).(*jsonRequest) + c.batchLock.Unlock() } else { request = c.requestList.Remove(element).(*jsonRequest) } @@ -1656,7 +1665,15 @@ func (c *Client) BackendVersion() (BackendVersion, error) { return c.backendVersion, nil } -func (c *Client) sendAsync() FutureGetBulkResult { +func (c *Client) sendAsync() (FutureGetBulkResult, error) { + c.batchLock.Lock() + defer c.batchLock.Unlock() + + // If batchList is empty, there's nothing to send. + if c.batchList.Len() == 0 { + return nil, ErrEmptyBatch + } + // convert the array of marshalled json requests to a single request we can send responseChan := make(chan *Response, 1) marshalledRequest := []byte("[") @@ -1678,25 +1695,24 @@ func (c *Client) sendAsync() FutureGetBulkResult { responseChan: responseChan, } c.sendPostRequest(&request) - return responseChan + return responseChan, nil } // Marshall's bulk requests and sends to the server // creates a response channel to receive the response func (c *Client) Send() error { - // if batchlist is empty, there's nothing to send - if c.batchList.Len() == 0 { - return nil + future, err := c.sendAsync() + if err != nil { + return err } - batchResp, err := c.sendAsync().Receive() + batchResp, err := future.Receive() if err != nil { // Clear batchlist in case of an error. - // - // TODO(yy): need to double check to make sure there's no - // concurrent access to this batch list, otherwise we may miss - // some batched requests. + + c.batchLock.Lock() c.batchList = list.New() + c.batchLock.Unlock() return err } @@ -1706,6 +1722,10 @@ func (c *Client) Send() error { // Perform a GC on batchList and requestMap before moving // forward. request := c.removeRequest(id) + if request == nil { + // Perhaps another goroutine has already processed this request. + continue + } // If there's an error, we log it and continue to the next // request.