From 261394933eb3519761f4330e0c790d5579889e59 Mon Sep 17 00:00:00 2001 From: steden <1470804@qq.com> Date: Sun, 3 Nov 2024 23:13:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=BC=82=E5=B8=B8=E6=97=B6?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitConsumer.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rabbitConsumer.go b/rabbitConsumer.go index e65cf9b..3a51820 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -2,6 +2,8 @@ package rabbit import ( "fmt" + "time" + "github.com/farseer-go/collections" "github.com/farseer-go/fs" "github.com/farseer-go/fs/asyncLocal" @@ -9,7 +11,6 @@ import ( "github.com/farseer-go/fs/flog" "github.com/farseer-go/fs/trace" amqp "github.com/rabbitmq/amqp091-go" - "time" ) type rabbitConsumer struct { @@ -87,10 +88,12 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string // 创建一个连接和通道 chl, deliveries, err := receiver.createQueueAndBindAndConsume(queueName, routingKey, prefetchCount, false) if err != nil { + flog.Errorf("rabbit.SubscribeAck 创建通道时失败,3秒后重试:%s", err.Error()) // 3秒后重试 time.Sleep(3 * time.Second) continue } + // 读取通道的消息 for page := range deliveries { entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) @@ -275,6 +278,9 @@ func (receiver *rabbitConsumer) createAndBindQueue(chl *amqp.Channel, queueName, if chl == nil || chl.IsClosed() { // 创建一个连接和通道 if chl, err = receiver.manager.CreateChannel(); err != nil { + if chl != nil { + _ = chl.Close() + } return chl, err }