I used the code available at https://github.com/CloudKarafka/java-kafka-example
and replaced the brokers, username, and password values with my CloudKarafka details.
The program follows:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
public class KafkaExample {
    private final String topic;
    private final Properties props;
    public KafkaExample(String brokers, String username, String password) {
        this.topic = username + "-default";
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);
        String serializer = StringSerializer.class.getName();
        String deserializer = StringDeserializer.class.getName();
        props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", username + "-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasCfg);
    }
    public void consume() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s [%d] offset=%d, key=%s, value=\"%s\"\n",
                        record.topic(), record.partition(),
                        record.offset(), record.key(), record.value());
            }
        }
    }
    public void produce() {
        Thread one = new Thread() {
            public void run() {
                try {
                    Producer<String, String> producer = new KafkaProducer<>(props);
                    int i = 0;
                    while (true) {
                        Date d = new Date();
                        producer.send(new ProducerRecord<>(topic, Integer.toString(i), d.toString()));
                        Thread.sleep(1000);
                        i++;
                    }
                } catch (InterruptedException v) {
                    System.out.println(v);
                }
            }
        };
        one.start();
    }
    public static void main(String[] args) {
        String brokers = "velomobile-01.srvs.cloudkafka.com:9094,velomobile-02.srvs.cloudkafka.com:9094,velomobile-03.srvs.cloudkafka.com:9094";
        String username = "username";
        String password = "password";
        KafkaExample c = new KafkaExample(brokers, username, password);
        c.produce();
        //System.out.println("---------- produced ----------");
        //c.consume();
    }
}
When I ran it as a Java application in Eclipse, the console showed this:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "Thread-0" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
at practice.KafkaExample$1.run(KafkaExample.java:62)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:73)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:80)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:271)
... 2 more
Caused by: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:289)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 5 more
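The exception asks for the `java.security.auth.login.config` system property, which points the JVM at a JAAS configuration file. The sketch below shows one way to supply it, on the assumption that the kafka-clients jar on the build path reads JAAS settings from such a file rather than from the `sasl.jaas.config` property. The class name `JaasFileSetup`, its method names, and the temp-file location are hypothetical and not part of the CloudKarafka example. This only helps if the client jar actually contains `ScramLoginModule`; as far as I know, both that module and `sasl.jaas.config` first shipped in kafka-clients 0.10.2, so an older jar would be one possible explanation for the error, and the version in use is not stated here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the CloudKarafka example.
public class JaasFileSetup {

    // Builds a JAAS file with a KafkaClient section for the SCRAM login module.
    // Same module and options as the jaasTemplate string in KafkaExample.
    static String jaasFileContent(String username, String password) {
        return String.format(
                "KafkaClient {%n"
                + "  org.apache.kafka.common.security.scram.ScramLoginModule required%n"
                + "  username=\"%s\"%n"
                + "  password=\"%s\";%n"
                + "};%n",
                username, password);
    }

    // Writes the JAAS file to a temp location (an assumption made for this
    // sketch) and sets the system property named in the exception message.
    static Path installJaasFile(String username, String password) throws IOException {
        Path file = Files.createTempFile("kafka-client-jaas", ".conf");
        file.toFile().deleteOnExit();
        Files.write(file, jaasFileContent(username, password).getBytes(StandardCharsets.UTF_8));
        System.setProperty("java.security.auth.login.config", file.toString());
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path file = installJaasFile("username", "password");
        System.out.println("java.security.auth.login.config=" + file);
    }
}
```

If this approach applies, `installJaasFile(username, password)` would need to run before `new KafkaProducer<>(props)` is called. Passing `-Djava.security.auth.login.config=/path/to/file` as a VM argument in the Eclipse run configuration is an equivalent alternative.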