From 0db4e65380b0637399298fc0d4a8b2d896b33509 Mon Sep 17 00:00:00 2001 From: steden <1470804@qq.com> Date: Sun, 14 Jul 2024 20:39:41 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=EF=BC=9A=E5=A2=9E=E5=8A=A0ch?= =?UTF-8?q?annel=E6=9C=80=E5=A4=A7=E3=80=81=E6=9C=80=E5=B0=8F=E5=80=BC?= =?UTF-8?q?=E7=9A=84=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitConfig.go | 2 +- rabbitConsumer.go | 2 +- rabbitProduct.go | 5 +++-- register.go | 3 +++ 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rabbitConfig.go b/rabbitConfig.go index 6086dac..285af3d 100644 --- a/rabbitConfig.go +++ b/rabbitConfig.go @@ -4,7 +4,7 @@ type rabbitConfig struct { Server string // 服务端地址 UserName string // 用户名 Password string // 密码 - MinChannel int // 最低保持多少个频道 + MinChannel int32 // 最低保持多少个频道 MaxChannel int32 // 最多可以创建多少个频道 Exchange string // 交换器名称 RoutingKey string // 路由KEY diff --git a/rabbitConsumer.go b/rabbitConsumer.go index 2c19c60..d7e17d7 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -187,8 +187,8 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s } lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl) - entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) if lst.Count() > 0 { + entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) isSuccess := false exception.Try(func() { isSuccess = consumerHandle(lst) diff --git a/rabbitProduct.go b/rabbitProduct.go index 8439ad4..7fba4a9 100644 --- a/rabbitProduct.go +++ b/rabbitProduct.go @@ -75,10 +75,10 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel { func (receiver *rabbitProduct) init() { // 首次使用 receiver.workChannelCount = 0 - receiver.chlQueue = make(chan rabbitChannel, 2048) + receiver.chlQueue = make(chan rabbitChannel, receiver.manager.config.MaxChannel) // 按最低channel要求,创建指定数量的channel go func() { - for (len(receiver.chlQueue) + parse.ToInt(receiver.workChannelCount)) < receiver.manager.config.MinChannel { + for (len(receiver.chlQueue) + parse.ToInt(receiver.workChannelCount)) < parse.ToInt(receiver.manager.config.MinChannel) { if channel := receiver.createChannelAndConfirm(); channel.chl != nil && !channel.chl.IsClosed() { receiver.chlQueue <- channel } @@ -149,6 +149,7 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId defer func(rabbitChl rabbitChannel) { receiver.pushChannel(rabbitChl) }(rabbitChl) + // 发布消息 err := rabbitChl.chl.PublishWithContext(context.Background(), receiver.manager.config.Exchange, // exchange diff --git a/register.go b/register.go index 470c511..3ea60a0 100644 --- a/register.go +++ b/register.go @@ -20,6 +20,9 @@ func Register(key string, configString string) { config.MinChannel = 10 } + if config.MinChannel > config.MaxChannel { + config.MaxChannel = config.MinChannel + } if config.Exchange == "" { _ = flog.Errorf("Rabbit配置:%s 缺少ExchangeName", config.Server) return