Skip to content

Commit

Permalink
修改日志内容
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Jan 17, 2024
1 parent 950e950 commit df20633
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions rabbitConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func (receiver *rabbitConsumer) createQueueAndBindAndConsume(queueName, routingK

// 设置预读数量
if err = chl.Qos(prefetchCount, 0, false); err != nil {
_ = flog.Errorf("failed to Qos %s: %s", queueName, err)
_ = flog.Errorf("rabbit:failed to Qos %s: %s", queueName, err)
return nil, nil, err
}

// 订阅消息
deliveries, err := chl.Consume(queueName, "", autoAck, false, false, false, nil)
if err != nil {
_ = flog.Errorf("failed to Subscribe %s: %s", queueName, err)
_ = flog.Errorf("rabbit:failed to Subscribe %s: %s", queueName, err)
return nil, nil, err
}
return chl, deliveries, nil
Expand All @@ -59,7 +59,7 @@ func (receiver *rabbitConsumer) Subscribe(queueName string, routingKey string, p
exception.Try(func() {
consumerHandle(string(page.Body), args)
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("Subscribe exception:%s", exp))
entryMqConsumer.Error(flog.Errorf("rabbit:Subscribe exception:%s", exp))
})
entryMqConsumer.End()
}
Expand Down Expand Up @@ -91,16 +91,16 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string
exception.Try(func() {
if isSuccess = consumerHandle(string(page.Body), args); isSuccess {
if err = page.Ack(false); err != nil {
_ = flog.Errorf("subscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body))
_ = flog.Errorf("rabbit:SubscribeAck failed to Ack q=%s: %s %s", queueName, err, string(page.Body))
}
}
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("subscribeAck exception: q=%s err:%s", queueName, exp))
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeAck exception: q=%s err:%s", queueName, exp))
})
if !isSuccess {
// Nack
if err = page.Nack(false, true); err != nil {
entryMqConsumer.Error(flog.Errorf("subscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)))
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeAck failed to Nack %s: q=%s %s", queueName, err, string(page.Body)))
}
}
entryMqConsumer.End()
Expand All @@ -117,7 +117,7 @@ func (receiver *rabbitConsumer) SubscribeAck(queueName string, routingKey string

func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey string, pullCount int, consumerHandle func(messages collections.List[EventArgs])) {
if pullCount < 1 {
flog.Panicf("the parameter pullCount must be greater than 0, %s: %d", queueName, pullCount)
flog.Panicf("rabbit:the parameter pullCount must be greater than 0, %s: %d", queueName, pullCount)
}

if _, err := receiver.createAndBindQueue(nil, queueName, routingKey); err != nil {
Expand Down Expand Up @@ -145,7 +145,7 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri
exception.Try(func() {
consumerHandle(lst)
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("SubscribeBatch exception:%s", exp))
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatch exception:%s", exp))
})
// 数量大于0,才追踪
entryMqConsumer.End()
Expand All @@ -156,7 +156,7 @@ func (receiver *rabbitConsumer) SubscribeBatch(queueName string, routingKey stri

func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey string, pullCount int, consumerHandle func(messages collections.List[EventArgs]) bool) {
if pullCount < 1 {
flog.Panicf("the parameter pullCount must be greater than 0, %s: %d", queueName, pullCount)
flog.Panicf("rabbit:the parameter pullCount must be greater than 0, %s: %d", queueName, pullCount)
}

if _, err := receiver.createAndBindQueue(nil, queueName, routingKey); err != nil {
Expand Down Expand Up @@ -186,16 +186,16 @@ func (receiver *rabbitConsumer) SubscribeBatchAck(queueName string, routingKey s
isSuccess = consumerHandle(lst)
if isSuccess {
if err := lastPage.Ack(true); err != nil {
entryMqConsumer.Error(flog.Errorf("subscribeBatchAck failed to Ack %s: %s", queueName, err))
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck failed to Ack %s: %s", queueName, err))
}
}
}).CatchException(func(exp any) {
entryMqConsumer.Error(flog.Errorf("subscribeBatchAck exception:%s", exp))
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck exception:%s", exp))
})
if !isSuccess {
// Nack
if err := lastPage.Nack(true, true); err != nil {
entryMqConsumer.Error(flog.Errorf("subscribeBatchAck failed to Nack %s: %s", queueName, err))
entryMqConsumer.Error(flog.Errorf("rabbit:SubscribeBatchAck failed to Nack %s: %s", queueName, err))
}
}
// 数量大于0,才追踪
Expand Down Expand Up @@ -240,7 +240,7 @@ func (receiver *rabbitConsumer) pullBatch(queueName string, autoAck bool, pullCo
for lst.Count() < pullCount {
msg, ok, err := chl.Get(queueName, autoAck)
if err != nil {
_ = flog.Errorf("failed to Get %s: %s", queueName, err)
_ = flog.Errorf("rabbit:failed to Get %s: %s", queueName, err)
}
if !ok {
break
Expand Down

0 comments on commit df20633

Please sign in to comment.