Skip to content

Commit

Permalink
Address data race
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 2, 2024
1 parent 247a985 commit 5b5f709
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pubsub/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ func (p *Producer[Request, Response]) isConsumerAlive(ctx context.Context, consu
return time.Now().UnixMilli()-val < int64(p.cfg.KeepAliveTimeout.Milliseconds())
}

func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
_, found := p.promises[messageID]
return found
}

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.cfg.RedisStream,
Expand All @@ -213,7 +220,7 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
active := make(map[string]bool)
for _, msg := range pendingMessages {
// Ignore messages not produced by this producer.
if _, found := p.promises[msg.ID]; !found {
if p.havePromiseFor(msg.ID) {
continue
}
alive, found := active[msg.Consumer]
Expand Down

0 comments on commit 5b5f709

Please sign in to comment.