Skip to content

Commit

Permalink
enhance: Remove pool from producer and consumer in kafka client (#38264)
Browse files Browse the repository at this point in the history
issue: #38263

Revert "fix: Move init kafka pool into once (#37786)"

Revert "enhance: Use pool to limit kafka cgo thread number (#37744)"

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Dec 6, 2024
1 parent 18bef5e commit edabfa8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 115 deletions.
84 changes: 32 additions & 52 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ func newKafkaConsumer(config *kafka.ConfigMap, bufSize int64, topic string, grou

func (kc *Consumer) createKafkaConsumer() error {
var err error
getPool().Submit(func() (any, error) {
kc.c, err = kafka.NewConsumer(kc.config)
return nil, err
}).Await()
kc.c, err = kafka.NewConsumer(kc.config)
if err != nil {
log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
return err
Expand Down Expand Up @@ -136,12 +133,7 @@ func (kc *Consumer) Chan() <-chan common.Message {
return
default:
readTimeout := paramtable.Get().KafkaCfg.ReadTimeout.GetAsDuration(time.Second)
var e *kafka.Message
var err error
getPool().Submit(func() (any, error) {
e, err = kc.c.ReadMessage(readTimeout)
return nil, err
}).Await()
e, err := kc.c.ReadMessage(readTimeout)
if err != nil {
// if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt
log.Warn("consume msg failed", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
Expand Down Expand Up @@ -174,40 +166,38 @@ func (kc *Consumer) Seek(id common.MessageID, inclusive bool) error {
}

func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
_, err := getPool().Submit(func() (any, error) {
log.Info("kafka consumer seek start", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
start := time.Now()
err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
if err != nil {
log.Warn("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
return nil, err
}
log.Info("kafka consumer seek start", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))

cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
}
start := time.Now()
err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
if err != nil {
log.Warn("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
return err
}

// If seek timeout is not 0 the call twice will return error isStarted RD_KAFKA_RESP_ERR__STATE.
// if the timeout is 0 it will initiate the seek but return immediately without any error reporting
kc.skipMsg = !inclusive
if err := kc.c.Seek(kafka.TopicPartition{
Topic: &kc.topic,
Partition: mqwrapper.DefaultPartitionIdx,
Offset: offset,
}, timeout); err != nil {
return nil, err
}
cost = time.Since(start).Milliseconds()
log.Info("kafka consumer seek finished", zap.String("topic name", kc.topic),
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
}

kc.hasAssign = true
return nil, nil
}).Await()
return err
// If seek timeout is not 0 the call twice will return error isStarted RD_KAFKA_RESP_ERR__STATE.
// if the timeout is 0 it will initiate the seek but return immediately without any error reporting
kc.skipMsg = !inclusive
if err := kc.c.Seek(kafka.TopicPartition{
Topic: &kc.topic,
Partition: mqwrapper.DefaultPartitionIdx,
Offset: offset,
}, timeout); err != nil {
return err
}
cost = time.Since(start).Milliseconds()
log.Info("kafka consumer seek finished", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))

kc.hasAssign = true
return nil
}

func (kc *Consumer) Ack(message common.Message) {
Expand All @@ -217,13 +207,7 @@ func (kc *Consumer) Ack(message common.Message) {
}

func (kc *Consumer) GetLatestMsgID() (common.MessageID, error) {
var low, high int64
var err error

getPool().Submit(func() (any, error) {
low, high, err = kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, timeout)
return nil, err
}).Await()
low, high, err := kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, timeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,11 +244,7 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
func (kc *Consumer) closeInternal() {
log.Info("close consumer ", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
start := time.Now()
var err error
getPool().Submit(func() (any, error) {
err = kc.c.Close()
return nil, err
}).Await()
err := kc.c.Close()
if err != nil {
log.Warn("failed to close ", zap.String("topic", kc.topic), zap.Error(err))
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,11 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes
header := kafka.Header{Key: key, Value: []byte(value)}
headers = append(headers, header)
}
var err error
getPool().Submit(func() (any, error) {
err = kp.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
Value: message.Payload,
Headers: headers,
}, kp.deliveryChan)
return nil, err
})
err := kp.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
Value: message.Payload,
Headers: headers,
}, kp.deliveryChan)
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
return nil, err
Expand Down
54 changes: 0 additions & 54 deletions pkg/mq/msgstream/mqwrapper/kafka/pool.go

This file was deleted.

0 comments on commit edabfa8

Please sign in to comment.