From d3e54d1ec443e4cd431c3a8abcd194d580eb3fa8 Mon Sep 17 00:00:00 2001 From: Arman Yeghiazaryan Date: Wed, 30 Oct 2024 04:42:04 +0400 Subject: [PATCH] Fix SQS Error Handling --- src/ServiceBusSQSChannel.php | 65 +++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index ed51a45..c68eab3 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -2,6 +2,7 @@ namespace Ringierimu\ServiceBusNotificationsChannel; +use Aws\Exception\AwsException; use Aws\Sqs\SqsClient; use Illuminate\Notifications\Notification; use Illuminate\Support\Arr; @@ -11,10 +12,18 @@ class ServiceBusSQSChannel { protected SqsClient $sqs; + protected array $config; + + protected bool $hasAttemptedRefresh = false; + public function __construct(array $config = []) { $this->config = $config ?: config('services.service_bus'); + $this->initializeSqsClient(); + } + protected function initializeSqsClient(): void + { $this->sqs = new SqsClient([ 'region' => Arr::get($this->config, 'sqs.region'), 'version' => 'latest', @@ -25,7 +34,7 @@ public function __construct(array $config = []) ]); } - public function send($notifiable, Notification $notification) + public function send($notifiable, Notification $notification): void { /** @var ServiceBusEvent $event */ $event = $notification->toServiceBus($notifiable); @@ -50,21 +59,53 @@ public function send($notifiable, Notification $notification) return; } - $message = $notification - ->toServiceBus($notifiable) - ->getParams(); + $message = $event->getParams(); - $response = $this->sqs->sendMessage([ + if (!isset($message['from'], $message['events'][0])) { + Log::error('Invalid payload structure', ['message' => $message]); + return; + } + + $params = [ 'QueueUrl' => Arr::get($this->config, 'sqs.queue_url'), 'MessageBody' => json_encode($message), - 'MessageGroupId' => $message['from'], - ]); + ]; - $event = $message['events'][0]; + try { + $response = $this->sqs->sendMessage($params); - Log::info("{$event} sent to bus queue", [ - 'message_id' => $response->get('MessageId'), - 'message' => $message, - ]); + $eventName = $message['events'][0]; + + Log::info("{$eventName} sent to bus queue", [ + 'message_id' => $response->get('MessageId'), + 'event' => $eventName, + ]); + } catch (AwsException $exception) { + $code = $exception->getAwsErrorCode(); + + if (in_array($code, ['ExpiredToken', 'UnrecognizedClientException', 'InvalidClientTokenId'])) { + Log::info("$code received. Refreshing credentials and retrying.", [ + 'event' => $eventType, + 'params' => $message, + 'aws_error_code' => $code, + 'aws_error_message' => $exception->getAwsErrorMessage(), + 'tags' => ['service-bus'], + ]); + + if (!$this->hasAttemptedRefresh) { + $this->hasAttemptedRefresh = true; + + $this->initializeSqsClient(); + + $this->send($notifiable, $notification); + } else { + $this->hasAttemptedRefresh = false; + + throw new \Exception('Authentication failed after retrying.', 0, $exception); + } + } else { + throw $exception; + } + } } }