Skip to content

Commit

Permalink
feat(MPC-616): add getTopicSubscriptions method to consumers (#66)
Browse files Browse the repository at this point in the history
* feat(MPC-616): add getTopicSubscriptions method to consumers

* feat(MPC-616): adjust 'for next major release' comment

* feat(MPC-616): fix method annotation
  • Loading branch information
healerz authored Jun 3, 2021
1 parent b02c90f commit 15b92ec
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 4 deletions.
8 changes: 8 additions & 0 deletions src/Consumer/AbstractKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMes
);
}

/**
* @return array<int, TopicSubscription>
*/
public function getTopicSubscriptions(): array
{
return $this->kafkaConfiguration->getTopicSubscriptions();
}

/**
* @param integer $timeoutMs
* @return null|RdKafkaMessage
Expand Down
14 changes: 12 additions & 2 deletions src/Consumer/KafkaConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

namespace Jobcloud\Kafka\Consumer;

use Jobcloud\Kafka\Consumer\ConsumerInterface;
use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;
use RdKafka\ConsumerTopic as RdKafkaConsumerTopic;
use RdKafka\TopicPartition as RdKafkaTopicPartition;

/**
* @todo v2: subscribe(array $topicSubscriptions = [])
* @method array getTopicSubscriptions()
*/

interface KafkaConsumerInterface
{
/**
Expand Down Expand Up @@ -105,4 +108,11 @@ public function getFirstOffsetForTopicPartition(string $topic, int $partition, i
* @return integer
*/
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int;

/**
* @todo v2
*
* @return array<int, TopicSubscription>
*/
//public function getTopicSubscriptions(): array;
}
4 changes: 2 additions & 2 deletions src/Consumer/KafkaHighLevelConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function __construct(
*/
public function subscribe(array $topicSubscriptions = []): void
{
$subscriptions = $this->getTopicSubscriptions($topicSubscriptions);
$subscriptions = $this->getTopicSubscriptionNames($topicSubscriptions);
$assignments = $this->getTopicAssignments($topicSubscriptions);

if ([] !== $subscriptions && [] !== $assignments) {
Expand Down Expand Up @@ -244,7 +244,7 @@ private function getOffsetsToCommitForMessages(array $messages): array
* @param array<TopicSubscription> $topicSubscriptions
* @return array|string[]
*/
private function getTopicSubscriptions(array $topicSubscriptions = []): array
private function getTopicSubscriptionNames(array $topicSubscriptions = []): array
{
$subscriptions = [];

Expand Down
24 changes: 24 additions & 0 deletions tests/Unit/Consumer/KafkaHighLevelConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Jobcloud\Kafka\Tests\Unit\Kafka\Consumer;

use Jobcloud\Kafka\Consumer\KafkaHighLevelConsumer;
use Jobcloud\Kafka\Consumer\TopicSubscriptionInterface;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Message\Decoder\DecoderInterface;
use Jobcloud\Kafka\Consumer\TopicSubscription;
Expand Down Expand Up @@ -639,6 +640,29 @@ public function testClose(): void
$kafkaConsumer->close();
}

/**
* @return void
*/
public function testGetTopicSubscriptionsReturnsTopicSubscriptions(): void
{
$rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class);
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);

$topicSubscriptionsMock = [
$this->createMock(TopicSubscriptionInterface::class),
$this->createMock(TopicSubscriptionInterface::class)
];

$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
$kafkaConfigurationMock->expects(self::once())
->method('getTopicSubscriptions')
->willReturn($topicSubscriptionsMock);

$kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);

self::assertSame($topicSubscriptionsMock, $kafkaConsumer->getTopicSubscriptions());
}

/**
* @param int $partitionId
* @return RdKafkaMetadataPartition|MockObject
Expand Down
24 changes: 24 additions & 0 deletions tests/Unit/Consumer/KafkaLowLevelConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Jobcloud\Kafka\Tests\Unit\Kafka\Consumer;

use Jobcloud\Kafka\Consumer\KafkaLowLevelConsumer;
use Jobcloud\Kafka\Consumer\TopicSubscriptionInterface;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
use Jobcloud\Kafka\Message\Decoder\DecoderInterface;
Expand Down Expand Up @@ -555,6 +556,29 @@ function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int
$this->assertEquals(5, $lowOffset);
}

/**
* @return void
*/
public function testGetTopicSubscriptionsReturnsTopicSubscriptions(): void
{
$rdKafkaConsumerMock = $this->createMock(RdKafkaLowLevelConsumer::class);
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);

$topicSubscriptionsMock = [
$this->createMock(TopicSubscriptionInterface::class),
$this->createMock(TopicSubscriptionInterface::class)
];

$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
$kafkaConfigurationMock->expects(self::once())
->method('getTopicSubscriptions')
->willReturn($topicSubscriptionsMock);

$kafkaConsumer = new KafkaLowLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);

self::assertSame($topicSubscriptionsMock, $kafkaConsumer->getTopicSubscriptions());
}

/**
* @param int $partitionId
* @return RdKafkaMetadataPartition|MockObject
Expand Down

0 comments on commit 15b92ec

Please sign in to comment.