From 1981b3f0df740e60ff0d6c8fd4d6337b851d370d Mon Sep 17 00:00:00 2001 From: steden <1470804@qq.com> Date: Mon, 16 Sep 2024 15:32:23 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=EF=BC=9A=E9=93=BE=E8=B7=AF?= =?UTF-8?q?=E8=BF=BD=E8=B8=AAEnd=E6=8E=A5=E5=8F=97error=E5=8F=82=E6=95=B0?= =?UTF-8?q?=EF=BC=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitConsumer.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/rabbitConsumer.go b/rabbitConsumer.go index 45146b9..e65cf9b 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -64,9 +64,9 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p exception.Try(func() { consumerHandle(string(page.Body), args) }).CatchException(func(exp any) { - entryMqConsumer.Error(flog.Errorf("rabbit:Subscribe exception:%s", exp)) + err = flog.Errorf("rabbit:Subscribe exception:%s", exp) }) - entryMqConsumer.End() + entryMqConsumer.End(err) asyncLocal.Release() } // 通道关闭了 @@ -103,15 +103,15 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string } } }).CatchException(func(exp any) { - entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp)) + err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp) }) if !isSuccess { // Nack if err = page.Nack(false, true); err != nil { - entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body))) + err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)) } } - entryMqConsumer.End() + entryMqConsumer.End(err) asyncLocal.Release() } // 通道关闭了 @@ -144,8 +144,7 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri if chl == nil || chl.IsClosed() { entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) if chl, err = receiver.manager.CreateChannel(); err != nil { - entryMqConsumer.Error(err) - entryMqConsumer.End() + entryMqConsumer.End(err) _ = flog.Error(err) continue } @@ -156,10 +155,10 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri exception.Try(func() { consumerHandle(lst) }).CatchException(func(exp any) { - entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatch exception:%s", exp)) + err = flog.Errorf("rabbit:SubscribeBatch exception:%s", exp) }) // 数量大于0,才追踪 - entryMqConsumer.End() + entryMqConsumer.End(err) } asyncLocal.Release() } @@ -186,8 +185,7 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s if chl == nil || chl.IsClosed() { entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) if chl, err = receiver.manager.CreateChannel(); err != nil { - entryMqConsumer.Error(err) - entryMqConsumer.End() + entryMqConsumer.End(err) _ = flog.Error(err) continue } @@ -199,24 +197,24 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s exception.Try(func() { isSuccess = consumerHandle(lst) if isSuccess { - if err := lastPage.Ack(true); err != nil { - entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck failed to Ack %s: %s", queueName, err)) + if err = lastPage.Ack(true); err != nil { + err = flog.Errorf("rabbit:SubscribeBatchAck failed to Ack %s: %s", queueName, err) } } }).CatchException(func(exp any) { if file, funcName, line := trace.GetCallerInfo(); file != "" { _ = flog.Errorf("%s:%s %s \n", file, flog.Blue(line), funcName) } - entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck exception %s:%s", queueName, exp)) + err = flog.Errorf("rabbit:SubscribeBatchAck exception %s:%s", queueName, exp) }) if !isSuccess { // Nack - if err := lastPage.Nack(true, true); err != nil { - entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck failed to Nack %s: %s", queueName, err)) + if err = lastPage.Nack(true, true); err != nil { + err = flog.Errorf("rabbit:SubscribeBatchAck failed to Nack %s: %s", queueName, err) } } // 数量大于0,才追踪 - entryMqConsumer.End() + entryMqConsumer.End(err) } asyncLocal.Release() }