Skip to content

Commit

Permalink
调整:asyncLocal自动释放功能
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Jul 15, 2024
1 parent d870e14 commit d580feb
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p
}
// 读取通道的消息
for page := range deliveries {
asyncLocal.Release()
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
args := receiver.createEventArgs(page, queueName)
exception.Try(func() {
Expand All @@ -65,6 +64,7 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p
entryMqConsumer.Error(flog.Errorf("rabbit:Subscribe exception:%s", exp))
})
entryMqConsumer.End()
asyncLocal.Release()
}
// 通道关闭了
if chl != nil {
Expand All @@ -88,7 +88,6 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
}
// 读取通道的消息
for page := range deliveries {
asyncLocal.Release()
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(page.CorrelationId, page.AppId, receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
args := receiver.createEventArgs(page, queueName)
isSuccess := false
Expand All @@ -108,6 +107,7 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
}
}
entryMqConsumer.End()
asyncLocal.Release()
}
// 通道关闭了
if chl != nil {
Expand All @@ -131,7 +131,6 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri
go func() {
var chl *amqp.Channel
for {
asyncLocal.Release()
time.Sleep(500 * time.Millisecond)
// 创建一个连接和通道
var err error
Expand All @@ -145,9 +144,8 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri
}
}

lst, _ := receiver.pullBatch(queueName, true, pullCount, chl)
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
if lst.Count() > 0 {
if lst, _ := receiver.pullBatch(queueName, true, pullCount, chl); lst.Count() > 0 {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
exception.Try(func() {
consumerHandle(lst)
}).CatchException(func(exp any) {
Expand All @@ -156,6 +154,7 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri
// 数量大于0,才追踪
entryMqConsumer.End()
}
asyncLocal.Release()
}
}()
}
Expand All @@ -172,7 +171,6 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
go func() {
var chl *amqp.Channel
for {
asyncLocal.Release()
time.Sleep(100 * time.Millisecond)
// 创建一个连接和通道
var err error
Expand All @@ -186,8 +184,7 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
}
}

lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl)
if lst.Count() > 0 {
if lst, lastPage := receiver.pullBatch(queueName, false, pullCount, chl); lst.Count() > 0 {
entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer("", "", receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey)
isSuccess := false
exception.Try(func() {
Expand All @@ -212,6 +209,7 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
// 数量大于0,才追踪
entryMqConsumer.End()
}
asyncLocal.Release()
}
}()
}
Expand Down

0 comments on commit d580feb

Please sign in to comment.