Skip to content

Commit

Permalink
新增:自动重连机制
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 22, 2023
1 parent b1a53ec commit b92054b
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions rabbitProduct.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel {
receiver.lock.Lock()
defer receiver.lock.Unlock()

// 工作队列 + 1
defer atomic.AddInt32(&receiver.workChannelCount, 1)

if receiver.chlQueue == nil {
receiver.init()
}
Expand All @@ -54,14 +57,12 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel {
if receiver.workChannelCount >= receiver.manager.server.MaxChannelCount {
continue
}
receiver.workChannelCount++
return receiver.createChannelAndConfirm()
case rabbitChl := <-receiver.chlQueue:
// 如果通道是关闭状态,则重新走取出逻辑
if rabbitChl.chl.IsClosed() {
continue
}
receiver.workChannelCount++
return rabbitChl
}
}
Expand Down Expand Up @@ -124,13 +125,6 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId
receiver.pushChannel(rabbitChl)
}(rabbitChl)

// 如果关闭了,则重新init
if receiver.manager.conn.IsClosed() {
receiver.lock.Lock()
defer receiver.lock.Unlock()
receiver.init()
}

// 发布消息
err := rabbitChl.chl.PublishWithContext(context.Background(),
receiver.manager.exchange.ExchangeName, // exchange
Expand Down

0 comments on commit b92054b

Please sign in to comment.