From 6531d68e83a20e6f498ae9cd85b04c1b68d32674 Mon Sep 17 00:00:00 2001 From: steden <1470804@qq.com> Date: Sun, 3 Nov 2024 23:37:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E8=87=AA=E5=8A=A8=E9=80=80?= =?UTF-8?q?=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitConsumer.go | 49 ++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/rabbitConsumer.go b/rabbitConsumer.go index 3a51820..fd8cd6d 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -94,29 +94,38 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string continue } - // 读取通道的消息 - for page := range deliveries { - entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) - args := receiver.createEventArgs(page, queueName) - isSuccess := false - exception.Try(func() { - if isSuccess = consumerHandle(string(page.Body), args); isSuccess { - if err = page.Ack(false); err != nil { - _ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body)) + func() { + for { + select { + case <-time.NewTicker(10 * time.Second).C: + flog.Infof("队列:%s,routingKey:%s,10秒内未收到消费,退出队列。", queueName, routingKey) + return + // 读取通道的消息 + case page := <-deliveries: + entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) + args := receiver.createEventArgs(page, queueName) + isSuccess := false + exception.Try(func() { + if isSuccess = consumerHandle(string(page.Body), args); isSuccess { + if err = page.Ack(false); err != nil { + _ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body)) + } + } + }).CatchException(func(exp any) { + err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp) + }) + if !isSuccess { + // Nack + if err = page.Nack(false, true); err != nil { + err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)) + } } - } - }).CatchException(func(exp any) { - err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp) - }) - if !isSuccess { - // Nack - if err = page.Nack(false, true); err != nil { - err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)) + entryMqConsumer.End(err) + asyncLocal.Release() } } - entryMqConsumer.End(err) - asyncLocal.Release() - } + }() + // 通道关闭了 if chl != nil { _ = chl.Close()