Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/remove low level consumer #40

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ infection-testing:
make coverage
cp -f build/logs/phpunit/junit.xml build/logs/phpunit/coverage/junit.xml
sudo php-ext-disable pcov
${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=91 --threads=`nproc`
${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=93 --threads=`nproc`
sudo php-ext-enable pcov

pcov-enable:
Expand Down
40 changes: 0 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,46 +204,6 @@ while (true) {
}
```

#### Kafka Low Level

```php
<?php

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;

$consumer = KafkaConsumerBuilder::create()
->withAdditionalConfig(
[
'compression.codec' => 'lz4',
'auto.commit.interval.ms' => 500
]
)
->withAdditionalBroker('kafka:9092')
->withConsumerGroup('testGroup')
->withAdditionalSubscription('test-topic')
->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
->build();

$consumer->subscribe();

while (true) {
try {
$message = $consumer->consume();
// your business logic
$consumer->commit($message);
} catch (KafkaConsumerTimeoutException $e) {
//no messages were read in a given time
} catch (KafkaConsumerEndOfPartitionException $e) {
//only occurs if enable.partition.eof is true (default: false)
} catch (KafkaConsumerConsumeException $e) {
// Failed
}
}
```

#### Avro Consumer
To create an avro consumer add the avro decoder.

Expand Down
40 changes: 2 additions & 38 deletions src/Conf/KafkaConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

namespace Jobcloud\Kafka\Conf;

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Consumer\TopicSubscription;
use RdKafka\Conf as RdKafkaConf;
use RdKafka\TopicConf as RdKafkaTopicConf;

class KafkaConfiguration extends RdKafkaConf
{
Expand All @@ -22,32 +20,17 @@ class KafkaConfiguration extends RdKafkaConf
*/
protected $topicSubscriptions;

/**
* @var string
*/
private $type;

/**
* @var array<string,int>
*/
private $lowLevelTopicSettings = [
'auto.commit.interval.ms' => 1,
'auto.offset.reset' => 1,
];

/**
* @param string[] $brokers
* @param array|TopicSubscription[] $topicSubscriptions
* @param mixed[] $config
* @param string $type
*/
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '')
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [])
{
parent::__construct();

$this->brokers = $brokers;
$this->topicSubscriptions = $topicSubscriptions;
$this->type = $type;
$this->initializeConfig($config);
}

Expand Down Expand Up @@ -79,23 +62,13 @@ public function getConfiguration(): array
* @param mixed[] $config
* @return void
*/
protected function initializeConfig(array $config = []): void
private function initializeConfig(array $config = []): void
{
$topicConf = new RdKafkaTopicConf();

foreach ($config as $name => $value) {
if (false === is_scalar($value)) {
continue;
}

if (
KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL === $this->type
&& true === $this->isLowLevelTopicConfSetting($name)
) {
$topicConf->set($name, (string) $value);
$this->setDefaultTopicConf($topicConf);
}

if (true === is_bool($value)) {
$value = true === $value ? 'true' : 'false';
}
Expand All @@ -105,13 +78,4 @@ protected function initializeConfig(array $config = []): void

$this->set('metadata.broker.list', implode(',', $this->getBrokers()));
}

/**
* @param string $settingName
* @return bool
*/
private function isLowLevelTopicConfSetting(string $settingName): bool
{
return true === isset($this->lowLevelTopicSettings[$settingName]);
}
}
7 changes: 4 additions & 3 deletions src/Consumer/AbstractKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
use Jobcloud\Kafka\Conf\KafkaConfiguration;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Message\KafkaConsumerMessage;
use RdKafka\Consumer as RdKafkaLowLevelConsumer;
use RdKafka\Exception as RdKafkaException;
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;
use RdKafka\KafkaConsumerTopic as RdKafkaConsumerTopic;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;
use RdKafka\Message as RdKafkaMessage;
use RdKafka\TopicPartition as RdKafkaTopicPartition;
Expand All @@ -32,7 +32,7 @@ abstract class AbstractKafkaConsumer implements KafkaConsumerInterface
protected $subscribed = false;

/**
* @var RdKafkaLowLevelConsumer|RdKafkaHighLevelConsumer
* @var RdKafkaHighLevelConsumer
*/
protected $consumer;

Expand Down Expand Up @@ -141,6 +141,7 @@ public function decodeMessage(KafkaConsumerMessageInterface $message): KafkaCons
*/
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic
{
/** @var RdKafkaConsumerTopic $topic */
$topic = $this->consumer->newTopic($topicName);
return $this->consumer
->getMetadata(
Expand Down Expand Up @@ -222,7 +223,7 @@ protected function getAllTopicPartitions(string $topic): array
* @param RdKafkaMessage $message
* @return KafkaConsumerMessageInterface
*/
protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMessageInterface
private function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMessageInterface
{
return new KafkaConsumerMessage(
(string) $message->topic_name,
Expand Down
49 changes: 2 additions & 47 deletions src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,10 @@
use Jobcloud\Kafka\Exception\KafkaConsumerBuilderException;
use Jobcloud\Kafka\Message\Decoder\DecoderInterface;
use Jobcloud\Kafka\Message\Decoder\NullDecoder;
use RdKafka\Consumer as RdKafkaLowLevelConsumer;
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;

final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
{

public const CONSUMER_TYPE_LOW_LEVEL = 'low';
public const CONSUMER_TYPE_HIGH_LEVEL = 'high';

/**
* @var string[]
*/
Expand All @@ -42,11 +37,6 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
*/
private $consumerGroup = 'default';

/**
* @var string
*/
private $consumerType = self::CONSUMER_TYPE_HIGH_LEVEL;

/**
* @var callable
*/
Expand Down Expand Up @@ -180,20 +170,6 @@ public function withConsumerGroup(string $consumerGroup): KafkaConsumerBuilderIn
return $that;
}

/**
* Set the consumer type, can be either CONSUMER_TYPE_LOW_LEVEL or CONSUMER_TYPE_HIGH_LEVEL
*
* @param string $consumerType
* @return KafkaConsumerBuilderInterface
*/
public function withConsumerType(string $consumerType): KafkaConsumerBuilderInterface
{
$that = clone $this;
$that->consumerType = $consumerType;

return $that;
}

/**
* Set a callback to be called on errors.
* The default callback will throw an exception for every error
Expand Down Expand Up @@ -299,34 +275,13 @@ public function build(): KafkaConsumerInterface
$kafkaConfig = new KafkaConfiguration(
$this->brokers,
$this->topics,
$this->config,
$this->consumerType
$this->config
);

//set consumer callbacks
$this->registerCallbacks($kafkaConfig);

//create RdConsumer
if (self::CONSUMER_TYPE_LOW_LEVEL === $this->consumerType) {
if (null !== $this->consumeCallback) {
throw new KafkaConsumerBuilderException(
sprintf(
KafkaConsumerBuilderException::UNSUPPORTED_CALLBACK_EXCEPTION_MESSAGE,
'consumerCallback',
KafkaLowLevelConsumer::class
)
);
}

$rdKafkaConsumer = new RdKafkaLowLevelConsumer($kafkaConfig);

return new KafkaLowLevelConsumer(
$rdKafkaConsumer,
$kafkaConfig,
$this->decoder
);
}

$rdKafkaConsumer = new RdKafkaHighLevelConsumer($kafkaConfig);

return new KafkaHighLevelConsumer($rdKafkaConsumer, $kafkaConfig, $this->decoder);
Expand All @@ -349,7 +304,7 @@ private function registerCallbacks(KafkaConfiguration $conf): void
}

if (null !== $this->logCallback) {
$conf->setLogCb($this->logCallback);
//$conf->setLogCb($this->logCallback);
}

if (null !== $this->offsetCommitCallback) {
Expand Down
8 changes: 0 additions & 8 deletions src/Consumer/KafkaConsumerBuilderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ public function withAdditionalConfig(array $config): self;
*/
public function withConsumerGroup(string $consumerGroup): self;

/**
* Set the consumer type, can be either CONSUMER_TYPE_LOW_LEVEL or CONSUMER_TYPE_HIGH_LEVEL
*
* @param string $consumerType
* @return KafkaConsumerBuilderInterface
*/
public function withConsumerType(string $consumerType): self;

/**
* Set a callback to be called on errors.
* The default callback will throw an exception for every error
Expand Down
Loading