diff --git a/src/Producer/KafkaProducer.php b/src/Producer/KafkaProducer.php index ae4a963..3b1e8bc 100644 --- a/src/Producer/KafkaProducer.php +++ b/src/Producer/KafkaProducer.php @@ -74,7 +74,38 @@ public function produce(KafkaProducerMessageInterface $message, bool $autoPoll = $topicProducer = $this->getProducerTopicForTopic($message->getTopicName()); - $topicProducer->producev($message->getPartition(), RD_KAFKA_MSG_F_BLOCK, + $topicProducer->producev( + $message->getPartition(), + RD_KAFKA_MSG_F_BLOCK, + $message->getBody(), + $message->getKey(), + $message->getHeaders() + ); + + // This is a test comment + if (true === $autoPoll) { + $this->producer->poll($pollTimeoutMs); + } + } + + /** + * Produces a message to the topic and partition defined in the message + * If a schema name was given, the message body will be avro serialized. + * + * @param KafkaProducerMessageInterface $message + * @param boolean $autoPoll + * @param integer $pollTimeoutMs + * @return void + */ + public function produceSameSame(KafkaProducerMessageInterface $message, bool $autoPoll = true, int $pollTimeoutMs = 0): void + { + $message = $this->encoder->encode($message); + + $topicProducer = $this->getProducerTopicForTopic($message->getTopicName()); + + $topicProducer->producev( + $message->getPartition(), + RD_KAFKA_MSG_F_BLOCK, $message->getBody(), $message->getKey(), $message->getHeaders() @@ -97,6 +128,7 @@ public function produce(KafkaProducerMessageInterface $message, bool $autoPoll = public function syncProduce(KafkaProducerMessageInterface $message): void { $this->produce($message, true, -1); + $this->produceSameSame($message, true, -1); } /**