diff --git a/pubsub/producer.go b/pubsub/producer.go index f467d87260..a183cdbd7b 100644 --- a/pubsub/producer.go +++ b/pubsub/producer.go @@ -193,6 +193,13 @@ func (p *Producer[Request, Response]) isConsumerAlive(ctx context.Context, consu return time.Now().UnixMilli()-val < int64(p.cfg.KeepAliveTimeout.Milliseconds()) } +func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool { + p.promisesLock.Lock() + defer p.promisesLock.Unlock() + _, found := p.promises[messageID] + return found +} + func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) { pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: p.cfg.RedisStream, @@ -213,7 +220,7 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess active := make(map[string]bool) for _, msg := range pendingMessages { // Ignore messages not produced by this producer. - if _, found := p.promises[msg.ID]; !found { + if p.havePromiseFor(msg.ID) { continue } alive, found := active[msg.Consumer]