
confusion with input kafka and output kafka #227

Open
jmreymond opened this issue Jun 19, 2019 · 2 comments
@jmreymond

  • Version: 6.5.4
  • Operating System: RedHat 7.2
  • Config File (if you have sensitive info, please remove it):
  • Sample Data:
  • Steps to Reproduce:

I want to read from one Kafka topic and write to another Kafka topic.
If I read from a Kafka topic and write the output to the console, it works:

```
input {
  kafka {
    bootstrap_servers => "KAFKA-INT-1.dom101.intres:9093,KAFKA-INT-2.dom101.intres:9093,KAFKA-INT-3.dom101.intres:9093"
    group_id => "JMR_ACTIVSYNC_tst"
    topics => "JMR_ACTIVSYNC"
    client_id => "logstash_JMR_ACTIVSYNC_tst"
    consumer_threads => 3
    codec => json { charset => "UTF-8" }
    decorate_events => true
    max_partition_fetch_bytes => "500000"
    session_timeout_ms => "290000"
    request_timeout_ms => "300000"
    auto_offset_reset => "earliest"
    jaas_path => "/etc/logstash/kafka-jaas.conf"
    sasl_mechanism => "SCRAM-SHA-512"
    security_protocol => "SASL_PLAINTEXT"
  }
}

filter {

}

output {
  if [type_log] == "JMR_ACTIVSYNC" {
    stdout { codec => rubydebug { metadata => true } }
  }
}
```
If I read from a generator and write to a Kafka topic, it also works:

```
input {
  generator {
    lines => [
      "line 1",
      "line 2",
      "line 3"
    ]
    # Emit all lines 3 times.
    count => 3
  }
}

filter {

}

output {
  stdout { codec => rubydebug { metadata => true } }
  kafka {
    bootstrap_servers => "KAFKA-INT-1.dom101.intres:9093,KAFKA-INT-2.dom101.intres:9093,KAFKA-INT-3.dom101.intres:9093"
    topic_id => "JMR_AJNA_TEST"
    codec => json
    jaas_path => "/tmp/logstash/kafka-jaas.conf"
    sasl_mechanism => "SCRAM-SHA-512"
    security_protocol => "SASL_PLAINTEXT"
  }
}
```
But it does not work with both a Kafka input and a Kafka output:

```
input {
  kafka {
    bootstrap_servers => "KAFKA-INT-1.dom101.intres:9093,KAFKA-INT-2.dom101.intres:9093,KAFKA-INT-3.dom101.intres:9093"
    group_id => "JMR_ACTIVSYNC_tst"
    topics => "JMR_ACTIVSYNC"
    client_id => "logstash_JMR_ACTIVSYNC_tst"
    consumer_threads => 3
    codec => json { charset => "UTF-8" }
    decorate_events => true
    max_partition_fetch_bytes => "500000"
    session_timeout_ms => "290000"
    request_timeout_ms => "300000"
    auto_offset_reset => "earliest"
    jaas_path => "/etc/logstash/kafka-jaas.conf"
    sasl_mechanism => "SCRAM-SHA-512"
    security_protocol => "SASL_PLAINTEXT"
  }
}

filter {

}

output {
  stdout { codec => rubydebug { metadata => true } }
  kafka {
    bootstrap_servers => "KAFKA-INT-1.dom101.intres:9093,KAFKA-INT-2.dom101.intres:9093,KAFKA-INT-3.dom101.intres:9093"
    topic_id => "JMR_AJNA_TEST"
    codec => json
    jaas_path => "/tmp/logstash/kafka-jaas.conf"
    sasl_mechanism => "SCRAM-SHA-512"
    security_protocol => "SASL_PLAINTEXT"
  }
}
```

The result is a topic authorization failure. Logstash appears to confuse the input Kafka topic's credentials with the output Kafka topic's:

```
[2019-06-19T13:59:58,119][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.0.1
[2019-06-19T13:59:58,119][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fa14705e51bd2ce5
[2019-06-19T13:59:58,120][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [KAFKA-INT-1.dom101.intres:9093, KAFKA-INT-2.dom101.intres:9093, KAFKA-INT-3.dom101.intres:9093]
check.crcs = true
client.id = logstash_JMR_ACTIVSYNC_tst-2
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = JMR_ACTIVSYNC_tst
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 500000
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 300000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 290000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2019-06-19T13:59:58,128][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.0.1
[2019-06-19T13:59:58,128][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fa14705e51bd2ce5
[2019-06-19T13:59:59,133][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash_JMR_ACTIVSYNC_tst-1, groupId=JMR_ACTIVSYNC_tst] Error while fetching metadata with correlation id 5 : {JMR_ACTIVSYNC=TOPIC_AUTHORIZATION_FAILED}
[2019-06-19T13:59:59,133][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash_JMR_ACTIVSYNC_tst-2, groupId=JMR_ACTIVSYNC_tst] Error while fetching metadata with correlation id 5 : {JMR_ACTIVSYNC=TOPIC_AUTHORIZATION_FAILED}
[2019-06-19T13:59:59,135][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash_JMR_ACTIVSYNC_tst-0, groupId=JMR_ACTIVSYNC_tst] Error while fetching metadata with correlation id 6 : {JMR_ACTIVSYNC=TOPIC_AUTHORIZATION_FAILED}
Exception in thread "Ruby-0-Thread-16: :1" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [JMR_ACTIVSYNC]
Exception in thread "Ruby-0-Thread-15: :1" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [JMR_ACTIVSYNC]
Exception in thread "Ruby-0-Thread-17: :1" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [JMR_ACTIVSYNC]
```
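A plausible explanation, for what it's worth: the Logstash Kafka plugin documentation notes that `jaas_path` is added to the global JVM system properties, so every Kafka plugin in the process ends up sharing a single JAAS file. Here the input points at `/etc/logstash/kafka-jaas.conf` and the output at `/tmp/logstash/kafka-jaas.conf`; whichever is applied last wins for both, which would explain why the consumer suddenly gets `TOPIC_AUTHORIZATION_FAILED`. One workaround sketch is a single shared JAAS file referenced by the same `jaas_path` in both plugins, using one account that holds both ACLs (the `KafkaClient` section name is the standard one; the credentials below are placeholders):

```
// Shared JAAS file, e.g. /etc/logstash/kafka-jaas.conf, referenced by the
// SAME jaas_path in both the kafka input and the kafka output.
// Placeholder credentials: use an account authorized to read JMR_ACTIVSYNC
// and write JMR_AJNA_TEST.
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="changeme"
  password="changeme";
};
```

With one file and one account, the JVM-wide property no longer matters, at the cost of not being able to use separate consumer and producer identities.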

@GuillaumeDuf

Hello,
I get the same error with Logstash when trying to read from one topic and write to another topic on a secured Kafka cluster.

@cjlyons81

I can confirm I am running into this same issue on Logstash 7.11. When JAAS is used on both the input and the output, something is shared between the two, causing issues. Perhaps it's specific to the new integration plugin, in which the input and output are bundled together. I will post more detail as I have time to investigate.
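If the shared JVM-wide JAAS property is indeed the culprit, a per-plugin alternative may be the `sasl_jaas_config` setting, which recent versions of the Kafka input and output plugins support as an inline, plugin-scoped replacement for `jaas_path`. A minimal sketch, assuming a plugin version that has this option (broker addresses and topics taken from this issue; the usernames and passwords are placeholders):

```
input {
  kafka {
    bootstrap_servers => "KAFKA-INT-1.dom101.intres:9093"
    topics => ["JMR_ACTIVSYNC"]
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "SCRAM-SHA-512"
    # Inline JAAS, scoped to this consumer only (placeholder credentials)
    sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='reader' password='changeme';"
  }
}

output {
  kafka {
    bootstrap_servers => "KAFKA-INT-1.dom101.intres:9093"
    topic_id => "JMR_AJNA_TEST"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "SCRAM-SHA-512"
    # Separate inline JAAS for the producer (placeholder credentials)
    sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='writer' password='changeme';"
  }
}
```

Because `sasl_jaas_config` is passed to each Kafka client directly rather than through a JVM system property, the input and output no longer stomp on each other's credentials.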
