Skip to content

Commit

Permalink
Fix SQS Error Handling
Browse files Browse the repository at this point in the history
  • Loading branch information
otanim committed Oct 30, 2024
1 parent ddf27c6 commit d3e54d1
Showing 1 changed file with 53 additions and 12 deletions.
65 changes: 53 additions & 12 deletions src/ServiceBusSQSChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Ringierimu\ServiceBusNotificationsChannel;

use Aws\Exception\AwsException;
use Aws\Sqs\SqsClient;
use Illuminate\Notifications\Notification;
use Illuminate\Support\Arr;
Expand All @@ -11,10 +12,18 @@ class ServiceBusSQSChannel
{
protected SqsClient $sqs;

protected array $config;

protected bool $hasAttemptedRefresh = false;

public function __construct(array $config = [])
{
$this->config = $config ?: config('services.service_bus');
$this->initializeSqsClient();
}

protected function initializeSqsClient(): void
{
$this->sqs = new SqsClient([
'region' => Arr::get($this->config, 'sqs.region'),
'version' => 'latest',
Expand All @@ -25,7 +34,7 @@ public function __construct(array $config = [])
]);
}

public function send($notifiable, Notification $notification)
public function send($notifiable, Notification $notification): void
{
/** @var ServiceBusEvent $event */
$event = $notification->toServiceBus($notifiable);
Expand All @@ -50,21 +59,53 @@ public function send($notifiable, Notification $notification)
return;
}

$message = $notification
->toServiceBus($notifiable)
->getParams();
$message = $event->getParams();

$response = $this->sqs->sendMessage([
if (!isset($message['from'], $message['events'][0])) {
Log::error('Invalid payload structure', ['message' => $message]);
return;
}

$params = [
'QueueUrl' => Arr::get($this->config, 'sqs.queue_url'),
'MessageBody' => json_encode($message),
'MessageGroupId' => $message['from'],
]);
];

$event = $message['events'][0];
try {
$response = $this->sqs->sendMessage($params);

Log::info("{$event} sent to bus queue", [
'message_id' => $response->get('MessageId'),
'message' => $message,
]);
$eventName = $message['events'][0];

Log::info("{$eventName} sent to bus queue", [
'message_id' => $response->get('MessageId'),
'event' => $eventName,
]);
} catch (AwsException $exception) {
$code = $exception->getAwsErrorCode();

if (in_array($code, ['ExpiredToken', 'UnrecognizedClientException', 'InvalidClientTokenId'])) {
Log::info("$code received. Refreshing credentials and retrying.", [
'event' => $eventType,
'params' => $message,
'aws_error_code' => $code,
'aws_error_message' => $exception->getAwsErrorMessage(),
'tags' => ['service-bus'],
]);

if (!$this->hasAttemptedRefresh) {
$this->hasAttemptedRefresh = true;

$this->initializeSqsClient();

$this->send($notifiable, $notification);
} else {
$this->hasAttemptedRefresh = false;

throw new \Exception('Authentication failed after retrying.', 0, $exception);
}
} else {
throw $exception;
}
}
}
}

0 comments on commit d3e54d1

Please sign in to comment.