Skip to content

Commit

Permalink
Fix data race
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 4, 2024
1 parent 0db255f commit 9d450af
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (p *Producer[Request, Response]) Start(ctx context.Context) {
p.StopWaiter.Start(ctx, p)
}

func (p *Producer[Request, Response]) promisesLen() int {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
return len(p.promises)
}

// reproduce is used when Producer claims ownership on the pending
// message that was sent to inactive consumer and reinserts it into the stream,
// so that seamlessly return the answer in the same promise.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func TestRedisClaimingOwnership(t *testing.T) {
if diff := cmp.Diff(wantResp, gotResponses); diff != "" {
t.Errorf("Unexpected diff in responses:\n%s\n", diff)
}
if cnt := len(producer.promises); cnt != 0 {
if cnt := producer.promisesLen(); cnt != 0 {
t.Errorf("Producer still has %d unfullfilled promises", cnt)
}
}
Expand Down

0 comments on commit 9d450af

Please sign in to comment.