Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka producer timeout after waiting for 5000ms #23

Open
AlejandroITOP opened this issue Nov 25, 2019 · 2 comments
Open

Kafka producer timeout after waiting for 5000ms #23

AlejandroITOP opened this issue Nov 25, 2019 · 2 comments

Comments

@AlejandroITOP
Copy link

Hello, I am using a kafka producer to pass data to my kafka, the problem is that pointing to localhost I always create the topic that I specify in the "Topic" field but then after a while I skip the following error and I don't know how to fix it.

The error is :
019/11/25 13:16:58 - Kafka producer.0 - ERROR (version 8.3.0.0-371, build 8.3.0.0-371 from 2019-06-11 11.09.08 by buildguy) : Error desconocido
2019/11/25 13:16:58 - Kafka producer.0 - ERROR (version 8.3.0.0-371, build 8.3.0.0-371 from 2019-06-11 11.09.08 by buildguy) : java.lang.IllegalStateException: java.util.concurrent.TimeoutException: Timeout after waiting for 5000 ms.
2019/11/25 13:16:58 - Kafka producer.0 - at org.pentaho.big.data.kettle.plugins.kafka.KafkaProducerOutput.processRow(KafkaProducerOutput.java:123)
2019/11/25 13:16:58 - Kafka producer.0 - at org.pentaho.di.trans.step.RunThread.run(RunThread.java:62)
2019/11/25 13:16:58 - Kafka producer.0 - at java.lang.Thread.run(Thread.java:748)
2019/11/25 13:16:58 - Kafka producer.0 - Caused by: java.util.concurrent.TimeoutException: Timeout after waiting for 5000 ms.
2019/11/25 13:16:58 - Kafka producer.0 - at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
2019/11/25 13:16:58 - Kafka producer.0 - at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
2019/11/25 13:16:58 - Kafka producer.0 - at org.pentaho.big.data.kettle.plugins.kafka.KafkaProducerOutput.processRow(KafkaProducerOutput.java:119)
2019/11/25 13:16:58 - Kafka producer.0 - ... 2 more
2019/11/25 13:17:23 - Kafka producer.0 - Procesamiento finalizado (I=0, O=0, R=1, W=0, U=0, E=1)
2019/11/25 13:17:23 - Kafka - ERROR (version 8.3.0.0-371, build 8.3.0.0-371 from 2019-06-11 11.09.08 by buildguy) : Errores detectados!
2019/11/25 13:17:23 - Spoon - La transformaci�n ha finalizado!!
2019/11/25 13:17:23 - Kafka - ERROR (version 8.3.0.0-371, build 8.3.0.0-371 from 2019-06-11 11.09.08 by buildguy) : Errores detectados!
2019/11/25 13:17:23 - Kafka - ERROR (version 8.3.0.0-371, build 8.3.0.0-371 from 2019-06-11 11.09.08 by buildguy) : Errores detectados!
2019/11/25 13:17:23 - Kafka - Transformaci�n detectada
2019/11/25 13:17:23 - Kafka - Transformaci�n est� matando los otros pasos!

@spektom
Copy link
Member

spektom commented Nov 25, 2019

Looks like an issue connecting to Kafka broker. Are you able to connect using simple client?

@AlejandroITOP
Copy link
Author

AlejandroITOP commented Nov 26, 2019

I explain to you, the connection I am making is direct.

image

So far for testing I have used a program in python to insert data into a specific topic. With this code everything works correctly (pointing to the same boostrap server that I am targeting with pentaho / kafka-producer).
Both the python and the pentaho create the topic if it does not exist when executing the kafka producer but only in python is the data insertion being executed while in pentaho the error I attached in my first comment is skipped.

-Attached python code:

import sys
import json
import math
from datetime import datetime
from time import sleep
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

list_category = ['10', '20', '30']
for i in range(0, 100000):
for cur_cate in list_category:
cur_result = {};
cur_result['time'] = datetime.strftime(datetime.utcnow(), "%Y-%m-%dT%H:%M:%SZ");
cur_result['category'] = cur_cate
cur_result['value_01'] = int(math.sin(i/float(cur_cate))*100)*1;
cur_result['value_02'] = int(math.sin(i/float(cur_cate))*100)*2;
cur_result['value_03'] = int(math.sin(i/float(cur_cate))*100)*3;
jcur_result= json.dumps(cur_result)
print(jcur_result)
producer.send('employee', cur_result)
sys.stdout.flush()
producer.flush()
sleep(1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants