diff --git a/rabbitConfig.go b/rabbitConfig.go index 6086dac..285af3d 100644 --- a/rabbitConfig.go +++ b/rabbitConfig.go @@ -4,7 +4,7 @@ type rabbitConfig struct { Server string // 服务端地址 UserName string // 用户名 Password string // 密码 - MinChannel int // 最低保持多少个频道 + MinChannel int32 // 最低保持多少个频道 MaxChannel int32 // 最多可以创建多少个频道 Exchange string // 交换器名称 RoutingKey string // 路由KEY diff --git a/rabbitConsumer.go b/rabbitConsumer.go index 2c19c60..d7e17d7 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -187,8 +187,8 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s } lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl) - entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) if lst.Count() > 0 { + entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) isSuccess := false exception.Try(func() { isSuccess = consumerHandle(lst) diff --git a/rabbitProduct.go b/rabbitProduct.go index 8439ad4..7fba4a9 100644 --- a/rabbitProduct.go +++ b/rabbitProduct.go @@ -75,10 +75,10 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel { func (receiver *rabbitProduct) init() { // 首次使用 receiver.workChannelCount = 0 - receiver.chlQueue = make(chan rabbitChannel, 2048) + receiver.chlQueue = make(chan rabbitChannel, receiver.manager.config.MaxChannel) // 按最低channel要求,创建指定数量的channel go func() { - for (len(receiver.chlQueue) + parse.ToInt(receiver.workChannelCount)) < receiver.manager.config.MinChannel { + for (len(receiver.chlQueue) + parse.ToInt(receiver.workChannelCount)) < parse.ToInt(receiver.manager.config.MinChannel) { if channel := receiver.createChannelAndConfirm(); channel.chl != nil && !channel.chl.IsClosed() { receiver.chlQueue <- channel } @@ -149,6 +149,7 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId defer func(rabbitChl rabbitChannel) { receiver.pushChannel(rabbitChl) }(rabbitChl) + // 发布消息 err := rabbitChl.chl.PublishWithContext(context.Background(), receiver.manager.config.Exchange, // exchange diff --git a/register.go b/register.go index 470c511..3ea60a0 100644 --- a/register.go +++ b/register.go @@ -20,6 +20,9 @@ func Register(key string, configString string) { config.MinChannel = 10 } + if config.MinChannel > config.MaxChannel { + config.MaxChannel = config.MinChannel + } if config.Exchange == "" { _ = flog.Errorf("Rabbit配置:%s 缺少ExchangeName", config.Server) return