Skip to content

Commit

Permalink
调整:增加channel最大、最小值的检测
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Jul 14, 2024
1 parent 55a79a6 commit 0db4e65
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion rabbitConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type rabbitConfig struct {
Server string // 服务端地址
UserName string // 用户名
Password string // 密码
MinChannel int // 最低保持多少个频道
MinChannel int32 // 最低保持多少个频道
MaxChannel int32 // 最多可以创建多少个频道
Exchange string // 交换器名称
RoutingKey string // 路由KEY
Expand Down
2 changes: 1 addition & 1 deletion rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
}

lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl)
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
if lst.Count() > 0 {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
isSuccess := false
exception.Try(func() {
isSuccess = consumerHandle(lst)
Expand Down
5 changes: 3 additions & 2 deletions rabbitProduct.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ func (receiver *rabbitProduct) popChannel() rabbitChannel {
func (receiver *rabbitProduct) init() {
// 首次使用
receiver.workChannelCount = 0
receiver.chlQueue = make(chan rabbitChannel, 2048)
receiver.chlQueue = make(chan rabbitChannel, receiver.manager.config.MaxChannel)
// 按最低channel要求,创建指定数量的channel
go func() {
for (len(receiver.chlQueue) + parse.ToInt(receiver.workChannelCount)) < receiver.manager.config.MinChannel {
for (len(receiver.chlQueue) + parse.ToInt(receiver.workChannelCount)) < parse.ToInt(receiver.manager.config.MinChannel) {
if channel := receiver.createChannelAndConfirm(); channel.chl != nil && !channel.chl.IsClosed() {
receiver.chlQueue <- channel
}
Expand Down Expand Up @@ -149,6 +149,7 @@ func (receiver *rabbitProduct) SendMessage(message []byte, routingKey, messageId
defer func(rabbitChl rabbitChannel) {
receiver.pushChannel(rabbitChl)
}(rabbitChl)

// 发布消息
err := rabbitChl.chl.PublishWithContext(context.Background(),
receiver.manager.config.Exchange, // exchange
Expand Down
3 changes: 3 additions & 0 deletions register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func Register(key string, configString string) {
config.MinChannel = 10
}

if config.MinChannel > config.MaxChannel {
config.MaxChannel = config.MinChannel
}
if config.Exchange == "" {
_ = flog.Errorf("Rabbit配置:%s 缺少ExchangeName", config.Server)
return
Expand Down

0 comments on commit 0db4e65

Please sign in to comment.