diff --git a/pubsub/producer.go b/pubsub/producer.go index 4569316b4d..99c4c33438 100644 --- a/pubsub/producer.go +++ b/pubsub/producer.go @@ -177,6 +177,12 @@ func (p *Producer[Request, Response]) Start(ctx context.Context) { p.StopWaiter.Start(ctx, p) } +func (p *Producer[Request, Response]) promisesLen() int { + p.promisesLock.Lock() + defer p.promisesLock.Unlock() + return len(p.promises) +} + // reproduce is used when Producer claims ownership on the pending // message that was sent to inactive consumer and reinserts it into the stream, // so that seamlessly return the answer in the same promise. diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 5b83923692..f872f8abfb 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -311,7 +311,7 @@ func TestRedisClaimingOwnership(t *testing.T) { if diff := cmp.Diff(wantResp, gotResponses); diff != "" { t.Errorf("Unexpected diff in responses:\n%s\n", diff) } - if cnt := len(producer.promises); cnt != 0 { + if cnt := producer.promisesLen(); cnt != 0 { t.Errorf("Producer still has %d unfullfilled promises", cnt) } }