Skip to content

Commit

Permalink
Fix race condition in rocket block
Browse files Browse the repository at this point in the history
Summary:
Race condition was between `valChan` and `completeChan`.

When a response comes back from the server - the following two callbacks are invoked back-to-back:
https://www.internalfb.com/code/fbsource/[954f75c300e6]/fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go?lines=177-178

They both signal on channels (`valChan` and `completeChan` respectively). However, there is no guarantee which channel will be received on first.

As a result - it was possible for the `completeChan` to be received on first and `valChan` to never be received on:
https://www.internalfb.com/code/fbsource/[954f75c300e6]/fbcode/thrift/lib/go/thrift/rocket_rsocket_client_block.go?lines=85-87

....As a result of that - occasionally (very rarely - as with most race conditions) - we would get a `nil` payload back, which would manifest as a `EOF` error for Rocket clients.

The changes below ensure that by the time `completeChan` is received on - the payload has already been stored.

Reviewed By: podtserkovskiy

Differential Revision: D67217712

fbshipit-source-id: 1d1726344a1a2eb50d8167030d3bb77d8cad37f0
  • Loading branch information
echistyakov authored and facebook-github-bot committed Dec 14, 2024
1 parent 645e763 commit 5c1a863
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package thrift
import (
"context"
"fmt"
"sync"

rsocket "github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
Expand All @@ -34,22 +35,25 @@ func rsocketBlock(ctx context.Context, client rsocket.Client, request payload.Pa
}

type subsriber struct {
// Error and Complete are two possible terminal states.
errChan chan error
valChan chan payload.Payload
completeChan chan struct{}
payload payload.Payload
payloadMutex sync.Mutex
cancelSubscription func()
}

func newSubscriber() *subsriber {
return &subsriber{
errChan: make(chan error, 1),
valChan: make(chan payload.Payload, 1),
completeChan: make(chan struct{}, 1),
}
}

func (s *subsriber) OnNext(msg payload.Payload) {
s.valChan <- payload.Clone(msg)
s.payloadMutex.Lock()
defer s.payloadMutex.Unlock()
s.payload = payload.Clone(msg)
}

// OnError represents failed terminal state.
Expand All @@ -75,16 +79,16 @@ func (s *subsriber) OnSubscribe(ctx context.Context, subscription rx.Subscriptio
}

func (s *subsriber) Block(ctx context.Context) (payload.Payload, error) {
var val payload.Payload
for {
select {
case err := <-s.errChan:
return val, err
case val = <-s.valChan:
return nil, err
case <-s.completeChan:
return val, nil
s.payloadMutex.Lock()
defer s.payloadMutex.Unlock()
return s.payload, nil
case <-ctx.Done():
return val, ctx.Err()
return nil, ctx.Err()
}
}
}

0 comments on commit 5c1a863

Please sign in to comment.