diff --git a/rabbitConsumer.go b/rabbitConsumer.go index bd8e755..7cea1ec 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -2,6 +2,7 @@ package rabbit import ( "github.com/farseer-go/collections" + "github.com/farseer-go/fs/exception" "github.com/farseer-go/fs/flog" amqp "github.com/rabbitmq/amqp091-go" "time" @@ -45,7 +46,9 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p // 读取通道的消息 for page := range deliveries { args := receiver.createEventArgs(page) - consumerHandle(string(page.Body), args) + exception.Try(func() { + consumerHandle(string(page.Body), args) + }) } _ = chl.Close() // 关闭后,重新调用自己 @@ -61,12 +64,17 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string // 读取通道的消息 for page := range deliveries { args := receiver.createEventArgs(page) - // Ack - if consumerHandle(string(page.Body), args) { - if err := page.Ack(false); err != nil { - _ = flog.Errorf("Failed to Ack %s: %s %s", queueName, err, string(page.Body)) + isSuccess := false + exception.Try(func() { + isSuccess = consumerHandle(string(page.Body), args) + if isSuccess { + if err := page.Ack(false); err != nil { + _ = flog.Errorf("Failed to Ack %s: %s %s", queueName, err, string(page.Body)) + } } - } else { // Nack + }) + if !isSuccess { + // Nack if err := page.Nack(false, true); err != nil { _ = flog.Errorf("Failed to Nack %s: %s %s", queueName, err, string(page.Body)) } @@ -95,7 +103,9 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri lst, _ := receiver.pullBatch(queueName, true, pullCount, chl) if lst.Count() > 0 { - consumerHandle(lst) + exception.Try(func() { + consumerHandle(lst) + }) } time.Sleep(500 * time.Millisecond) } @@ -119,11 +129,17 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl) if lst.Count() > 0 { - if consumerHandle(lst) { - if err := lastPage.Ack(true); err != nil { - _ = flog.Errorf("Failed to Ack %s: %s", queueName, err) + isSuccess := false + exception.Try(func() { + isSuccess = consumerHandle(lst) + if isSuccess { + if err := lastPage.Ack(true); err != nil { + _ = flog.Errorf("Failed to Ack %s: %s", queueName, err) + } } - } else { + }) + if !isSuccess { + // Nack if err := lastPage.Nack(true, true); err != nil { _ = flog.Errorf("Failed to Nack %s: %s", queueName, err) } @@ -168,7 +184,7 @@ func (receiver *rabbitConsumer) pullBatch(queueName string, autoAck bool, pullCo for lst.Count() < pullCount { msg, ok, err := chl.Get(queueName, autoAck) if err != nil { - flog.Panicf("Failed to Get %s: %s", queueName, err) + _ = flog.Errorf("Failed to Get %s: %s", queueName, err) } if !ok { break