diff --git a/rabbitProduct.go b/rabbitProduct.go index 66f8a9d..07105f5 100644 --- a/rabbitProduct.go +++ b/rabbitProduct.go @@ -42,6 +42,9 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel { receiver.lock.Lock() defer receiver.lock.Unlock() + // 工作队列 + 1 + defer atomic.AddInt32(&receiver.workChannelCount, 1) + if receiver.chlQueue == nil { receiver.init() } @@ -54,14 +57,12 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel { if receiver.workChannelCount >= receiver.manager.server.MaxChannelCount { continue } - receiver.workChannelCount++ return receiver.createChannelAndConfirm() case rabbitChl := <-receiver.chlQueue: // 如果通道是关闭状态,则重新走取出逻辑 if rabbitChl.chl.IsClosed() { continue } - receiver.workChannelCount++ return rabbitChl } } @@ -124,13 +125,6 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId receiver.pushChannel(rabbitChl) }(rabbitChl) - // 如果关闭了,则重新init - if receiver.manager.conn.IsClosed() { - receiver.lock.Lock() - defer receiver.lock.Unlock() - receiver.init() - } - // 发布消息 err := rabbitChl.chl.PublishWithContext(context.Background(), receiver.manager.exchange.ExchangeName, // exchange