From 7688903c179e897e9cef83ea1e835e8e2d0e4fef Mon Sep 17 00:00:00 2001 From: steden <1470804@qq.com> Date: Wed, 10 Apr 2024 12:01:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9AasyncLocal.GC?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E3=80=82=E7=94=A8=E4=BA=8E=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E6=B8=85=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rabbitConsumer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rabbitConsumer.go b/rabbitConsumer.go index 5505113..b0cc16c 100644 --- a/rabbitConsumer.go +++ b/rabbitConsumer.go @@ -2,6 +2,7 @@ package rabbit import ( "github.com/farseer-go/collections" + "github.com/farseer-go/fs/asyncLocal" "github.com/farseer-go/fs/exception" "github.com/farseer-go/fs/flog" amqp "github.com/rabbitmq/amqp091-go" @@ -54,6 +55,7 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p } // 读取通道的消息 for page := range deliveries { + asyncLocal.GC() entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) args := receiver.createEventArgs(page, queueName) exception.Try(func() { @@ -85,6 +87,7 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string } // 读取通道的消息 for page := range deliveries { + asyncLocal.GC() entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) args := receiver.createEventArgs(page, queueName) isSuccess := false @@ -127,6 +130,7 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri go func() { var chl *amqp.Channel for { + asyncLocal.GC() time.Sleep(500 * time.Millisecond) entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) // 创建一个连接和通道 @@ -166,6 +170,7 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s go func() { var chl *amqp.Channel for { + asyncLocal.GC() time.Sleep(100 * time.Millisecond) entryMqConsumer := receiver.manager.traceManager.EntryMqConsumer(receiver.manager.config.Server, queueName, receiver.manager.config.RoutingKey) // 创建一个连接和通道