From 5c1a8638e92a0b55784e9ab9282efc4ac70a93d3 Mon Sep 17 00:00:00 2001 From: Jack Chistyakov Date: Fri, 13 Dec 2024 16:49:58 -0800 Subject: [PATCH] Fix race condition in rocket block Summary: The race condition was between `valChan` and `completeChan`. When a response comes back from the server, the following two callbacks are invoked back-to-back: https://www.internalfb.com/code/fbsource/[954f75c300e6]/fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go?lines=177-178 They both signal on channels (`valChan` and `completeChan` respectively). However, when both channels are ready at the same time, Go's `select` chooses one of them at random, so there is no guarantee which channel will be received from first. As a result, it was possible for `completeChan` to be received from first, in which case `Block` returned immediately and `valChan` was never received from: https://www.internalfb.com/code/fbsource/[954f75c300e6]/fbcode/thrift/lib/go/thrift/rocket_rsocket_client_block.go?lines=85-87 Because of that, occasionally (very rarely, as with most race conditions) we would get a `nil` payload back, which would manifest as an `EOF` error for Rocket clients. The changes below ensure that by the time `completeChan` is received from, the payload has already been stored. 
Reviewed By: podtserkovskiy Differential Revision: D67217712 fbshipit-source-id: 1d1726344a1a2eb50d8167030d3bb77d8cad37f0 --- .../go/thrift/rocket_rsocket_client_block.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/third-party/thrift/src/thrift/lib/go/thrift/rocket_rsocket_client_block.go b/third-party/thrift/src/thrift/lib/go/thrift/rocket_rsocket_client_block.go index 8f89f2c99a6f8..675f967a9c446 100644 --- a/third-party/thrift/src/thrift/lib/go/thrift/rocket_rsocket_client_block.go +++ b/third-party/thrift/src/thrift/lib/go/thrift/rocket_rsocket_client_block.go @@ -19,6 +19,7 @@ package thrift import ( "context" "fmt" + "sync" rsocket "github.com/rsocket/rsocket-go" "github.com/rsocket/rsocket-go/payload" @@ -34,22 +35,25 @@ func rsocketBlock(ctx context.Context, client rsocket.Client, request payload.Pa } type subsriber struct { + // Error and Complete are two possible terminal states. errChan chan error - valChan chan payload.Payload completeChan chan struct{} + payload payload.Payload + payloadMutex sync.Mutex cancelSubscription func() } func newSubscriber() *subsriber { return &subsriber{ errChan: make(chan error, 1), - valChan: make(chan payload.Payload, 1), completeChan: make(chan struct{}, 1), } } func (s *subsriber) OnNext(msg payload.Payload) { - s.valChan <- payload.Clone(msg) + s.payloadMutex.Lock() + defer s.payloadMutex.Unlock() + s.payload = payload.Clone(msg) } // OnError represents failed terminal state. @@ -75,16 +79,16 @@ func (s *subsriber) OnSubscribe(ctx context.Context, subscription rx.Subscriptio } func (s *subsriber) Block(ctx context.Context) (payload.Payload, error) { - var val payload.Payload for { select { case err := <-s.errChan: - return val, err - case val = <-s.valChan: + return nil, err case <-s.completeChan: - return val, nil + s.payloadMutex.Lock() + defer s.payloadMutex.Unlock() + return s.payload, nil case <-ctx.Done(): - return val, ctx.Err() + return nil, ctx.Err() } } }