Skip to content

Commit

Permalink
disbale reset kafka connection timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Enwei Jiao <[email protected]>
  • Loading branch information
jiaoew1991 committed Nov 23, 2023
1 parent a04b528 commit 8b73873
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap, extraConsumerCo
func NewKafkaClientInstanceWithConfig(ctx context.Context, config *paramtable.KafkaConfig) (*kafkaClient, error) {
kafkaConfig := getBasicConfig(config.Address.GetValue())

// connection setup timeout, default as 30000ms
// connection setup timeout, default as 30000ms, available range is [1000, 2147483647]
if deadline, ok := ctx.Deadline(); ok {
if deadline.Before(time.Now()) {
return nil, errors.New("context timeout when new kafka client")
}
timeout := time.Until(deadline).Milliseconds()
kafkaConfig.SetKey("socket.connection.setup.timeout.ms", timeout)
// timeout := time.Until(deadline).Milliseconds()
// kafkaConfig.SetKey("socket.connection.setup.timeout.ms", timeout)
}

if (config.SaslUsername.GetValue() == "" && config.SaslPassword.GetValue() != "") ||
Expand Down

0 comments on commit 8b73873

Please sign in to comment.