Skip to content

Commit

Permalink
新增:对消费处理函数的异常捕获
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 23, 2023
1 parent b92054b commit 2f696ee
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbit

import (
"github.com/farseer-go/collections"
"github.com/farseer-go/fs/exception"
"github.com/farseer-go/fs/flog"
amqp "github.com/rabbitmq/amqp091-go"
"time"
Expand Down Expand Up @@ -45,7 +46,9 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p
// 读取通道的消息
for page := range deliveries {
args := receiver.createEventArgs(page)
consumerHandle(string(page.Body), args)
exception.Try(func() {
consumerHandle(string(page.Body), args)
})
}
_ = chl.Close()
// 关闭后,重新调用自己
Expand All @@ -61,12 +64,17 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
// 读取通道的消息
for page := range deliveries {
args := receiver.createEventArgs(page)
// Ack
if consumerHandle(string(page.Body), args) {
if err := page.Ack(false); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s %s", queueName, err, string(page.Body))
isSuccess := false
exception.Try(func() {
isSuccess = consumerHandle(string(page.Body), args)
if isSuccess {
if err := page.Ack(false); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s %s", queueName, err, string(page.Body))
}
}
} else { // Nack
})
if !isSuccess {
// Nack
if err := page.Nack(false, true); err != nil {
_ = flog.Errorf("Failed to Nack %s: %s %s", queueName, err, string(page.Body))
}
Expand Down Expand Up @@ -95,7 +103,9 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri

lst, _ := receiver.pullBatch(queueName, true, pullCount, chl)
if lst.Count() > 0 {
consumerHandle(lst)
exception.Try(func() {
consumerHandle(lst)
})
}
time.Sleep(500 * time.Millisecond)
}
Expand All @@ -119,11 +129,17 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s

lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl)
if lst.Count() > 0 {
if consumerHandle(lst) {
if err := lastPage.Ack(true); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s", queueName, err)
isSuccess := false
exception.Try(func() {
isSuccess = consumerHandle(lst)
if isSuccess {
if err := lastPage.Ack(true); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s", queueName, err)
}
}
} else {
})
if !isSuccess {
// Nack
if err := lastPage.Nack(true, true); err != nil {
_ = flog.Errorf("Failed to Nack %s: %s", queueName, err)
}
Expand Down Expand Up @@ -168,7 +184,7 @@ func (receiver *rabbitConsumer) pullBatch(queueName string, autoAck bool, pullCo
for lst.Count() < pullCount {
msg, ok, err := chl.Get(queueName, autoAck)
if err != nil {
flog.Panicf("Failed to Get %s: %s", queueName, err)
_ = flog.Errorf("Failed to Get %s: %s", queueName, err)
}
if !ok {
break
Expand Down

0 comments on commit 2f696ee

Please sign in to comment.