diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index d8e45bdcbb0f3..8878f0ee453ed 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -59,13 +59,13 @@ func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap, extraConsumerCo func NewKafkaClientInstanceWithConfig(ctx context.Context, config *paramtable.KafkaConfig) (*kafkaClient, error) { kafkaConfig := getBasicConfig(config.Address.GetValue()) - // connection setup timeout, default as 30000ms + // connection setup timeout, default as 30000ms, available range is [1000, 2147483647] if deadline, ok := ctx.Deadline(); ok { if deadline.Before(time.Now()) { return nil, errors.New("context timeout when new kafka client") } - timeout := time.Until(deadline).Milliseconds() - kafkaConfig.SetKey("socket.connection.setup.timeout.ms", timeout) + // timeout := time.Until(deadline).Milliseconds() + // kafkaConfig.SetKey("socket.connection.setup.timeout.ms", timeout) } if (config.SaslUsername.GetValue() == "" && config.SaslPassword.GetValue() != "") ||