From 5b5f709970dcaf2e10532deac6284a0ad0827003 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Tue, 2 Apr 2024 18:04:52 +0200 Subject: [PATCH] Address data race --- pubsub/producer.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pubsub/producer.go b/pubsub/producer.go index f467d87260..a183cdbd7b 100644 --- a/pubsub/producer.go +++ b/pubsub/producer.go @@ -193,6 +193,13 @@ func (p *Producer[Request, Response]) isConsumerAlive(ctx context.Context, consu return time.Now().UnixMilli()-val < int64(p.cfg.KeepAliveTimeout.Milliseconds()) } +func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool { + p.promisesLock.Lock() + defer p.promisesLock.Unlock() + _, found := p.promises[messageID] + return found +} + func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) { pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: p.cfg.RedisStream, @@ -213,7 +220,7 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess active := make(map[string]bool) for _, msg := range pendingMessages { // Ignore messages not produced by this producer. - if _, found := p.promises[msg.ID]; !found { + if p.havePromiseFor(msg.ID) { continue } alive, found := active[msg.Consumer]