Skip to content

Commit

Permalink
添加异常时错误日志
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Nov 3, 2024
1 parent 79d6480 commit 2613949
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package rabbit

import (
"fmt"
"time"

"github.com/farseer-go/collections"
"github.com/farseer-go/fs"
"github.com/farseer-go/fs/asyncLocal"
"github.com/farseer-go/fs/exception"
"github.com/farseer-go/fs/flog"
"github.com/farseer-go/fs/trace"
amqp "github.com/rabbitmq/amqp091-go"
"time"
)

type rabbitConsumer struct {
Expand Down Expand Up @@ -87,10 +88,12 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
// 创建一个连接和通道
chl, deliveries, err := receiver.createQueueAndBindAndConsume(queueName, routingKey, prefetchCount, false)
if err != nil {
flog.Errorf("rabbit.SubscribeAck 创建通道时失败,3秒后重试:%s", err.Error())
// 3秒后重试
time.Sleep(3 * time.Second)
continue
}

// 读取通道的消息
for page := range deliveries {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
Expand Down Expand Up @@ -275,6 +278,9 @@ func (receiver *rabbitConsumer) createAndBindQueue(chl *amqp.Channel, queueName,
if chl == nil || chl.IsClosed() {
// 创建一个连接和通道
if chl, err = receiver.manager.CreateChannel(); err != nil {
if chl != nil {
_ = chl.Close()
}
return chl, err
}

Expand Down

0 comments on commit 2613949

Please sign in to comment.