Skip to content

Commit

Permalink
Merge branch 'release/4.11.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
matiux committed Apr 24, 2024
2 parents 07f1fe9 + c16d90e commit e66bcc4
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,64 @@ public function __construct(
}

public function consume(null|string $queue = null): null|Message
{
$messages = $this->doConsume($queue, 1);

return !empty($messages) ? current($messages) : null;
}

/** @return AWSMessage[] */
private function doConsume(null|string $queue = null, int $maxNumberOfMessages): array
{
$queue ??= $this->getQueueUrlFromConfig();

if (10 < $maxNumberOfMessages) {
throw new \InvalidArgumentException('Exceed max batch size of 10 messages');
}

$args = [
'QueueUrl' => $queue,
'AttributeNames' => $this->attributeNames,
'MessageAttributeNames' => $this->messageAttributeNames,
'MaxNumberOfMessages' => $maxNumberOfMessages,
];

$response = $this->getClient()->receiveMessage($args);

return $this->extractSqsMessageFromResponse($response);
return $this->extractSqsMessagesFromResponse($response);
}

private function extractSqsMessageFromResponse(Result $response): null|AWSMessage
/** @return AWSMessage[] */
private function extractSqsMessagesFromResponse(Result $response): array
{
if (empty($response['Messages'])) {
return null;
if (null === $response['Messages']) {
return [];
}

Assert::isArray($response['Messages']);
$message = (array) $response['Messages'][0];

[$body, $messageAttributes, $attributes] = $this->parseMessage($message);

$type = $this->extractType($messageAttributes);
$occurredAt = $this->extractOccurredAt($messageAttributes);

return $this->AWSMessageFactory->build(
body: $body,
occurredAt: $occurredAt,
type: $type,
id: (string) $message['ReceiptHandle'],
extra: [
'MessageAttributes' => $messageAttributes,
'Attributes' => $attributes,
],
);

$sqsMessages = [];

/** @var array $message */
foreach ($response['Messages'] as $message) {
[$body, $messageAttributes, $attributes] = $this->parseMessage($message);

$type = $this->extractType($messageAttributes);
$occurredAt = $this->extractOccurredAt($messageAttributes);

$sqsMessages[] = $this->AWSMessageFactory->build(
body: $body,
occurredAt: $occurredAt,
type: $type,
id: (string) $message['ReceiptHandle'],
extra: [
'MessageAttributes' => $messageAttributes,
'Attributes' => $attributes,
],
);
}

return $sqsMessages;
}

/**
Expand Down Expand Up @@ -163,9 +183,9 @@ private function extractOccurredAt(array $messageAttributes): null|\DateTimeImmu
/**
* {@inheritDoc}
*/
public function consumeBatch(): array
public function consumeBatch(null|string $queue = null, int $maxNumberOfMessages = 1): array
{
throw new \BadMethodCallException();
return $this->doConsume($queue, $maxNumberOfMessages);
}

public function delete(string $messageId, null|string $queue = null): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function consume(null|string $queue = null): null|Message
*
* @codeCoverageIgnore
*/
public function consumeBatch(): array
public function consumeBatch(null|string $queue = null, int $maxNumberOfMessages = 1): array
{
return [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,5 @@ public function delete(string $messageId, null|string $queue = null): void

public function deleteBatch(\ArrayObject $messagesId): void {}

public function consumeBatch(): array {}
public function consumeBatch(null|string $queue = null, int $maxNumberOfMessages = 1): array {}
}
2 changes: 1 addition & 1 deletion src/Matiux/DDDStarterPack/Message/MessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public function consume(null|string $queue = null): null|Message;
/**
* @return Message[]
*/
public function consumeBatch(): array;
public function consumeBatch(null|string $queue = null, int $maxNumberOfMessages = 1): array;

public function delete(string $messageId, null|string $queue = null): void;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class SQSMessageConsumerTest extends TestCase
use SnsRawClient;

private AWSMessage $message;
private AWSMessage $message2;
private MessageConsumer $messageConsumer;
private MessageProducer $messageProducer;
private \DateTimeImmutable $occurredAt;
Expand All @@ -47,6 +48,21 @@ public function setUp(): void
]),
occurredAt: $this->occurredAt,
type: 'MyType',
id: Uuid::uuid4()->toString(),
extra: [
'MessageGroupId' => Uuid::uuid4()->toString(),
'MessageDeduplicationId' => Uuid::uuid4()->toString(),
],
);

$this->message2 = new AWSMessage(
body: json_encode([
'Foo' => 'Bar',
'occurredAt' => $this->occurredAt->format(\DateTimeInterface::RFC3339_EXTENDED),
]),
occurredAt: null,
type: null,
id: Uuid::uuid4()->toString(),
extra: [
'MessageGroupId' => Uuid::uuid4()->toString(),
'MessageDeduplicationId' => Uuid::uuid4()->toString(),
Expand All @@ -63,7 +79,7 @@ public function tearDown(): void
* @test
*
* @group sqs
* @group producer
* @group consumer
*/
public function message_consumer_can_receive_message(): void
{
Expand Down Expand Up @@ -98,24 +114,11 @@ public function message_consumer_can_receive_message(): void
* @test
*
* @group sqs
* @group producer
* @group consumer
*/
public function message_consumer_can_receive_message_without_message_attributes(): void
{
$awsMessage = new AWSMessage(
body: json_encode([
'Foo' => 'Bar',
'occurredAt' => $this->occurredAt->format(\DateTimeInterface::RFC3339_EXTENDED),
]),
occurredAt: null,
type: null,
extra: [
'MessageGroupId' => Uuid::uuid4()->toString(),
'MessageDeduplicationId' => Uuid::uuid4()->toString(),
],
);

$this->messageProducer->send($awsMessage);
$this->messageProducer->send($this->message2);

$message = $this->messageConsumer->consume();

Expand All @@ -141,7 +144,7 @@ public function message_consumer_can_receive_message_without_message_attributes(
* @test
*
* @group sqs
* @group producer
* @group consumer
*/
public function message_consumer_can_delete_message(): void
{
Expand All @@ -161,4 +164,38 @@ public function message_consumer_can_delete_message(): void

self::assertNull($message);
}

/**
* @test
*
* @group sqs
* @group consumer
*/
public function message_consumer_can_receive_multiple_messages(): void
{
$this->messageProducer->sendBatch([$this->message, $this->message2]);

$messages = $this->messageConsumer->consumeBatch(maxNumberOfMessages: 10);

self::assertCount(2, $messages);
self::assertContainsOnlyInstancesOf(AWSMessage::class, $messages);

foreach ($messages as $message) {
$this->deleteMessage((string) $message->id());
}
}

/**
* @test
*
* @group sqs
* @group consumer
*/
public function message_consumer_cannot_receive_more_than_10_messages_in_a_single_batch(): void
{
self::expectException(\InvalidArgumentException::class);
self::expectExceptionMessage('Exceed max batch size of 10 messages');

$this->messageConsumer->consumeBatch(maxNumberOfMessages: 11);
}
}

0 comments on commit e66bcc4

Please sign in to comment.