Skip to content

Commit

Permalink
新增:自动重连机制
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 22, 2023
1 parent 4bf5b2d commit b1a53ec
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 56 deletions.
108 changes: 61 additions & 47 deletions rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,117 +11,131 @@ type rabbitConsumer struct {
manager *rabbitManager
}

func newConsumer(server serverConfig, exchange exchangeConfig) rabbitConsumer {
return rabbitConsumer{
func newConsumer(server serverConfig, exchange exchangeConfig) *rabbitConsumer {
return &rabbitConsumer{
manager: newManager(server, exchange),
}
}

func (receiver rabbitConsumer) Subscribe(queueName string, routingKey string, prefetchCount int, consumerHandle func(message string, ea EventArgs)) {
func (receiver *rabbitConsumer) createQueueAndBindAndConsume(queueName, routingKey string, prefetchCount int, autoAck bool) (*amqp.Channel, <-chan amqp.Delivery) {
// 创建一个连接和通道
chl := receiver.manager.CreateChannel()
receiver.manager.CreateQueue(chl, queueName, receiver.manager.exchange.IsDurable, receiver.manager.exchange.AutoDelete, nil)
receiver.manager.BindQueue(chl, queueName, routingKey, receiver.manager.exchange.ExchangeName, nil)

// 设置预读数量
err := chl.Qos(prefetchCount, 0, false)
if err != nil {
flog.Panicf("Failed to Qos %s: %s", queueName, err)
}
deliveries, err := chl.Consume(queueName, "", true, false, false, false, nil)

// 订阅消息
deliveries, err := chl.Consume(queueName, "", autoAck, false, false, false, nil)
if err != nil {
flog.Panicf("Failed to Subscribe %s: %s", queueName, err)
}
return chl, deliveries
}

func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, prefetchCount int, consumerHandle func(message string, ea EventArgs)) {
// 创建一个连接和通道
chl, deliveries := receiver.createQueueAndBindAndConsume(queueName, routingKey, prefetchCount, true)

go func(chl *amqp.Channel) {
// 读取通道的消息
for page := range deliveries {
args := receiver.createEventArgs(page)
consumerHandle(string(page.Body), args)
}
_ = chl.Close()
// 关闭后,重新调用自己
receiver.Subscribe(queueName, routingKey, prefetchCount, consumerHandle)
}(chl)
}

func (receiver rabbitConsumer) SubscribeAck(queueName string, routingKey string, prefetchCount int, consumerHandle func(message string, ea EventArgs) bool) {
chl := receiver.manager.CreateChannel()
receiver.manager.CreateQueue(chl, queueName, receiver.manager.exchange.IsDurable, receiver.manager.exchange.AutoDelete, nil)
receiver.manager.BindQueue(chl, queueName, routingKey, receiver.manager.exchange.ExchangeName, nil)

err := chl.Qos(prefetchCount, 0, false)
if err != nil {
flog.Panicf("Failed to Qos %s: %s", queueName, err)
}

deliveries, err := chl.Consume(queueName, "", false, false, false, false, nil)
if err != nil {
flog.Panicf("Failed to Subscribe %s: %s", queueName, err)
}
func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string, prefetchCount int, consumerHandle func(message string, ea EventArgs) bool) {
// 创建一个连接和通道
chl, deliveries := receiver.createQueueAndBindAndConsume(queueName, routingKey, prefetchCount, false)

go func(chl *amqp.Channel) {
// 读取通道的消息
for page := range deliveries {
args := receiver.createEventArgs(page)
// Ack
if consumerHandle(string(page.Body), args) {
if err = page.Ack(false); err != nil {
if err := page.Ack(false); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s %s", queueName, err, string(page.Body))
}
} else {
if err = page.Nack(false, true); err != nil {
} else { // Nack
if err := page.Nack(false, true); err != nil {
_ = flog.Errorf("Failed to Nack %s: %s %s", queueName, err, string(page.Body))
}
}
}
_ = chl.Close()
// 关闭后,重新调用自己
receiver.SubscribeAck(queueName, routingKey, prefetchCount, consumerHandle)
}(chl)
}

func (receiver rabbitConsumer) SubscribeBatchAck(queueName string, routingKey string, pullCount int, consumerHandle func(messages collections.List[EventArgs]) bool) {
func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey string, pullCount int, consumerHandle func(messages collections.List[EventArgs])) {
if pullCount < 1 {
flog.Panicf("The parameter pullCount must be greater than 0, %s: %d", queueName, pullCount)
}

chl := receiver.manager.CreateChannel()
receiver.manager.CreateQueue(chl, queueName, receiver.manager.exchange.IsDurable, receiver.manager.exchange.AutoDelete, nil)
receiver.manager.BindQueue(chl, queueName, routingKey, receiver.manager.exchange.ExchangeName, nil)

go func() {
var chl *amqp.Channel
for {
lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl)
if chl == nil || chl.IsClosed() {
// 创建一个连接和通道
chl = receiver.manager.CreateChannel()
receiver.manager.CreateQueue(chl, queueName, receiver.manager.exchange.IsDurable, receiver.manager.exchange.AutoDelete, nil)
receiver.manager.BindQueue(chl, queueName, routingKey, receiver.manager.exchange.ExchangeName, nil)
}

lst, _ := receiver.pullBatch(queueName, true, pullCount, chl)
if lst.Count() > 0 {
if consumerHandle(lst) {
if err := lastPage.Ack(true); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s", queueName, err)
}
} else {
if err := lastPage.Nack(true, true); err != nil {
_ = flog.Errorf("Failed to Nack %s: %s", queueName, err)
}
}
consumerHandle(lst)
}
time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
}
}()
}

func (receiver rabbitConsumer) SubscribeBatch(queueName string, routingKey string, pullCount int, consumerHandle func(messages collections.List[EventArgs])) {
func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey string, pullCount int, consumerHandle func(messages collections.List[EventArgs]) bool) {
if pullCount < 1 {
flog.Panicf("The parameter pullCount must be greater than 0, %s: %d", queueName, pullCount)
}

chl := receiver.manager.CreateChannel()
receiver.manager.CreateQueue(chl, queueName, receiver.manager.exchange.IsDurable, receiver.manager.exchange.AutoDelete, nil)
receiver.manager.BindQueue(chl, queueName, routingKey, receiver.manager.exchange.ExchangeName, nil)

go func() {
var chl *amqp.Channel
for {
lst, _ := receiver.pullBatch(queueName, true, pullCount, chl)
if chl == nil || chl.IsClosed() {
// 创建一个连接和通道
chl = receiver.manager.CreateChannel()
receiver.manager.CreateQueue(chl, queueName, receiver.manager.exchange.IsDurable, receiver.manager.exchange.AutoDelete, nil)
receiver.manager.BindQueue(chl, queueName, routingKey, receiver.manager.exchange.ExchangeName, nil)
}

lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl)
if lst.Count() > 0 {
consumerHandle(lst)
if consumerHandle(lst) {
if err := lastPage.Ack(true); err != nil {
_ = flog.Errorf("Failed to Ack %s: %s", queueName, err)
}
} else {
if err := lastPage.Nack(true, true); err != nil {
_ = flog.Errorf("Failed to Nack %s: %s", queueName, err)
}
}
}
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
}()
}

// 生成事件参数
func (receiver rabbitConsumer) createEventArgs(page amqp.Delivery) EventArgs {
func (receiver *rabbitConsumer) createEventArgs(page amqp.Delivery) EventArgs {
return EventArgs{
ConsumerTag: page.ConsumerTag,
DeliveryTag: page.DeliveryTag,
Expand All @@ -148,7 +162,7 @@ func (receiver rabbitConsumer) createEventArgs(page amqp.Delivery) EventArgs {
}

// 手动拉取数据
func (receiver rabbitConsumer) pullBatch(queueName string, autoAck bool, pullCount int, chl *amqp.Channel) (collections.List[EventArgs], amqp.Delivery) {
func (receiver *rabbitConsumer) pullBatch(queueName string, autoAck bool, pullCount int, chl *amqp.Channel) (collections.List[EventArgs], amqp.Delivery) {
lst := collections.NewList[EventArgs]()
var lastPage amqp.Delivery
for lst.Count() < pullCount {
Expand Down
1 change: 0 additions & 1 deletion rabbitManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (receiver *rabbitManager) Open() {
if receiver.conn == nil || receiver.conn.IsClosed() {
var err error
receiver.conn, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s/", receiver.server.UserName, receiver.server.Password, receiver.server.Server))

if err != nil {
flog.Panicf("Failed to connect to RabbitMQ %s: %s", receiver.server.Server, err)
}
Expand Down
33 changes: 25 additions & 8 deletions rabbitProduct.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type rabbitProduct struct {
manager *rabbitManager
deliveryMode uint8
chlQueue chan rabbitChannel // 通道队列,打开通道后,不关系,放回此队列
chlQueue chan rabbitChannel // 通道队列,使用完后放回此队列
workChannelCount int32 // 正在使用的通道数量
lock *sync.Mutex
}
Expand Down Expand Up @@ -57,6 +57,10 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel {
receiver.workChannelCount++
return receiver.createChannelAndConfirm()
case rabbitChl := <-receiver.chlQueue:
// 如果通道是关闭状态,则重新走取出逻辑
if rabbitChl.chl.IsClosed() {
continue
}
receiver.workChannelCount++
return rabbitChl
}
Expand All @@ -65,14 +69,14 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel {

func (receiver *rabbitProduct) init() {
// 首次使用
if receiver.chlQueue == nil {
receiver.chlQueue = make(chan rabbitChannel, 2048)
// 按最低channel要求,创建指定数量的channel
for len(receiver.chlQueue) < receiver.manager.server.MinChannelCount {
receiver.chlQueue <- receiver.createChannelAndConfirm()
}
receiver.workChannelCount = 0
receiver.chlQueue = make(chan rabbitChannel, 2048)
// 按最低channel要求,创建指定数量的channel
for len(receiver.chlQueue) < receiver.manager.server.MinChannelCount {
receiver.chlQueue <- receiver.createChannelAndConfirm()
}
}

func (receiver *rabbitProduct) createChannelAndConfirm() rabbitChannel {
chl := receiver.manager.CreateChannel()
return rabbitChannel{
Expand All @@ -81,7 +85,7 @@ func (receiver *rabbitProduct) createChannelAndConfirm() rabbitChannel {
}
}

// 创建通道
// 通道使用完后,放回队列中
func (receiver *rabbitProduct) pushChannel(rabbitChl rabbitChannel) {
defer atomic.AddInt32(&receiver.workChannelCount, -1)
receiver.chlQueue <- rabbitChl
Expand Down Expand Up @@ -120,6 +124,13 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId
receiver.pushChannel(rabbitChl)
}(rabbitChl)

// 如果关闭了,则重新init
if receiver.manager.conn.IsClosed() {
receiver.lock.Lock()
defer receiver.lock.Unlock()
receiver.init()
}

// 发布消息
err := rabbitChl.chl.PublishWithContext(context.Background(),
receiver.manager.exchange.ExchangeName, // exchange
Expand All @@ -134,7 +145,13 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId
AppId: fs.AppName,
Body: message,
})

if err != nil {
//if rabbitError, ok := err.(*amqp.Error); ok {
// if rabbitError.Code == 504 {
//
// }
//}
return flog.Errorf("Failed to Publish %s: %s", receiver.manager.server.Server, err)
}

Expand Down

0 comments on commit b1a53ec

Please sign in to comment.