Skip to content

Commit

Permalink
测试自动退出
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Nov 3, 2024
1 parent 2613949 commit 6531d68
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,38 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
continue
}

// 读取通道的消息
for page := range deliveries {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
args := receiver.createEventArgs(page, queueName)
isSuccess := false
exception.Try(func() {
if isSuccess = consumerHandle(string(page.Body), args); isSuccess {
if err = page.Ack(false); err != nil {
_ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body))
func() {
for {
select {
case <-time.NewTicker(10 * time.Second).C:
flog.Infof("队列:%s,routingKey:%s,10秒内未收到消费,退出队列。", queueName, routingKey)
return
// 读取通道的消息
case page := <-deliveries:
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
args := receiver.createEventArgs(page, queueName)
isSuccess := false
exception.Try(func() {
if isSuccess = consumerHandle(string(page.Body), args); isSuccess {
if err = page.Ack(false); err != nil {
_ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body))
}
}
}).CatchException(func(exp any) {
err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp)
})
if !isSuccess {
// Nack
if err = page.Nack(false, true); err != nil {
err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body))
}
}
}
}).CatchException(func(exp any) {
err = flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp)
})
if !isSuccess {
// Nack
if err = page.Nack(false, true); err != nil {
err = flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body))
entryMqConsumer.End(err)
asyncLocal.Release()
}
}
entryMqConsumer.End(err)
asyncLocal.Release()
}
}()

// 通道关闭了
if chl != nil {
_ = chl.Close()
Expand Down

0 comments on commit 6531d68

Please sign in to comment.