Skip to content

Commit

Permalink
调整:链路追踪End接受error参数,
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 16, 2024
1 parent 4818ddc commit 1981b3f
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p
exception.Try(func() {
consumerHandle(string(page.Body), args)
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("rabbit:Subscribe exception:%s", exp))
err = flog.Errorf("rabbit:Subscribe exception:%s", exp)
})
entryMqConsumer.End()
entryMqConsumer.End(err)
asyncLocal.Release()
}
// 通道关闭了
Expand Down Expand Up @@ -103,15 +103,15 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
}
}
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp))
err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp)
})
if !isSuccess {
// Nack
if err = page.Nack(false, true); err != nil {
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)))
err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body))
}
}
entryMqConsumer.End()
entryMqConsumer.End(err)
asyncLocal.Release()
}
// 通道关闭了
Expand Down Expand Up @@ -144,8 +144,7 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri
if chl == nil || chl.IsClosed() {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
if chl, err = receiver.manager.CreateChannel(); err != nil {
entryMqConsumer.Error(err)
entryMqConsumer.End()
entryMqConsumer.End(err)
_ = flog.Error(err)
continue
}
Expand All @@ -156,10 +155,10 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri
exception.Try(func() {
consumerHandle(lst)
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatch exception:%s", exp))
err = flog.Errorf("rabbit:SubscribeBatch exception:%s", exp)
})
// 数量大于0,才追踪
entryMqConsumer.End()
entryMqConsumer.End(err)
}
asyncLocal.Release()
}
Expand All @@ -186,8 +185,7 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
if chl == nil || chl.IsClosed() {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
if chl, err = receiver.manager.CreateChannel(); err != nil {
entryMqConsumer.Error(err)
entryMqConsumer.End()
entryMqConsumer.End(err)
_ = flog.Error(err)
continue
}
Expand All @@ -199,24 +197,24 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
exception.Try(func() {
isSuccess = consumerHandle(lst)
if isSuccess {
if err := lastPage.Ack(true); err != nil {
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck failed to Ack %s: %s", queueName, err))
if err = lastPage.Ack(true); err != nil {
err = flog.Errorf("rabbit:SubscribeBatchAck failed to Ack %s: %s", queueName, err)
}
}
}).CatchException(func(exp any) {
if file, funcName, line := trace.GetCallerInfo(); file != "" {
_ = flog.Errorf("%s:%s %s \n", file, flog.Blue(line), funcName)
}
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck exception %s:%s", queueName, exp))
err = flog.Errorf("rabbit:SubscribeBatchAck exception %s:%s", queueName, exp)
})
if !isSuccess {
// Nack
if err := lastPage.Nack(true, true); err != nil {
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck failed to Nack %s: %s", queueName, err))
if err = lastPage.Nack(true, true); err != nil {
err = flog.Errorf("rabbit:SubscribeBatchAck failed to Nack %s: %s", queueName, err)
}
}
// 数量大于0,才追踪
entryMqConsumer.End()
entryMqConsumer.End(err)
}
asyncLocal.Release()
}
Expand Down

0 comments on commit 1981b3f

Please sign in to comment.