diff --git a/rabbitConsumer.go b/rabbitConsumer.go index 3a51820..fd8cd6d 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -94,29 +94,38 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string continue } - // 读取通道的消息 - for page := range deliveries { - entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) - args := receiver.createEventArgs(page, queueName) - isSuccess := false - exception.Try(func() { - if isSuccess = consumerHandle(string(page.Body), args); isSuccess { - if err = page.Ack(false); err != nil { - _ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body)) + func() { + for { + select { + case <-time.NewTicker(10 * time.Second).C: + flog.Infof("队列:%s,routingKey:%s,10秒内未收到消费,退出队列。", queueName, routingKey) + return + // 读取通道的消息 + case page := <-deliveries: + entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) + args := receiver.createEventArgs(page, queueName) + isSuccess := false + exception.Try(func() { + if isSuccess = consumerHandle(string(page.Body), args); isSuccess { + if err = page.Ack(false); err != nil { + _ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body)) + } + } + }).CatchException(func(exp any) { + err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp) + }) + if !isSuccess { + // Nack + if err = page.Nack(false, true); err != nil { + err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)) + } } - } - }).CatchException(func(exp any) { - err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp) - }) - if !isSuccess { - // Nack - if err = page.Nack(false, true); err != nil { - err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)) + entryMqConsumer.End(err) + asyncLocal.Release() } } - entryMqConsumer.End(err) - asyncLocal.Release() - } + }() + // 通道关闭了 if chl != nil { _ = chl.Close()