From d3e54d1ec443e4cd431c3a8abcd194d580eb3fa8 Mon Sep 17 00:00:00 2001 From: Arman Yeghiazaryan Date: Wed, 30 Oct 2024 04:42:04 +0400 Subject: [PATCH 1/5] Fix SQS Error Handling --- src/ServiceBusSQSChannel.php | 65 +++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index ed51a45..c68eab3 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -2,6 +2,7 @@ namespace Ringierimu\ServiceBusNotificationsChannel; +use Aws\Exception\AwsException; use Aws\Sqs\SqsClient; use Illuminate\Notifications\Notification; use Illuminate\Support\Arr; @@ -11,10 +12,18 @@ class ServiceBusSQSChannel { protected SqsClient $sqs; + protected array $config; + + protected bool $hasAttemptedRefresh = false; + public function __construct(array $config = []) { $this->config = $config ?: config('services.service_bus'); + $this->initializeSqsClient(); + } + protected function initializeSqsClient(): void + { $this->sqs = new SqsClient([ 'region' => Arr::get($this->config, 'sqs.region'), 'version' => 'latest', @@ -25,7 +34,7 @@ public function __construct(array $config = []) ]); } - public function send($notifiable, Notification $notification) + public function send($notifiable, Notification $notification): void { /** @var ServiceBusEvent $event */ $event = $notification->toServiceBus($notifiable); @@ -50,21 +59,53 @@ public function send($notifiable, Notification $notification) return; } - $message = $notification - ->toServiceBus($notifiable) - ->getParams(); + $message = $event->getParams(); - $response = $this->sqs->sendMessage([ + if (!isset($message['from'], $message['events'][0])) { + Log::error('Invalid payload structure', ['message' => $message]); + return; + } + + $params = [ 'QueueUrl' => Arr::get($this->config, 'sqs.queue_url'), 'MessageBody' => json_encode($message), - 'MessageGroupId' => $message['from'], - ]); + ]; - $event = $message['events'][0]; + try { + $response = $this->sqs->sendMessage($params); - Log::info("{$event} sent to bus queue", [ - 'message_id' => $response->get('MessageId'), - 'message' => $message, - ]); + $eventName = $message['events'][0]; + + Log::info("{$eventName} sent to bus queue", [ + 'message_id' => $response->get('MessageId'), + 'event' => $eventName, + ]); + } catch (AwsException $exception) { + $code = $exception->getAwsErrorCode(); + + if (in_array($code, ['ExpiredToken', 'UnrecognizedClientException', 'InvalidClientTokenId'])) { + Log::info("$code received. Refreshing credentials and retrying.", [ + 'event' => $eventType, + 'params' => $message, + 'aws_error_code' => $code, + 'aws_error_message' => $exception->getAwsErrorMessage(), + 'tags' => ['service-bus'], + ]); + + if (!$this->hasAttemptedRefresh) { + $this->hasAttemptedRefresh = true; + + $this->initializeSqsClient(); + + $this->send($notifiable, $notification); + } else { + $this->hasAttemptedRefresh = false; + + throw new \Exception('Authentication failed after retrying.', 0, $exception); + } + } else { + throw $exception; + } + } } } From 8fe7bba384f39d86c645fa738d0cc1f8968ba30d Mon Sep 17 00:00:00 2001 From: Arman Yeghiazaryan Date: Wed, 30 Oct 2024 04:48:34 +0400 Subject: [PATCH 2/5] applied fixes --- src/ServiceBusSQSChannel.php | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index c68eab3..f59c356 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -39,7 +39,7 @@ public function send($notifiable, Notification $notification): void /** @var ServiceBusEvent $event */ $event = $notification->toServiceBus($notifiable); $eventType = $event->getEventType(); - $params = $event->getParams(); + $message = $event->getParams(); $dontReport = Arr::get($this->config, 'dont_report', []); if (Arr::get($this->config, 'enabled') == false) { @@ -48,7 +48,7 @@ public function send($notifiable, Notification $notification): void "$eventType service bus notification [disabled]", [ 'event' => $eventType, - 'params' => $params, + 'params' => $message, 'tags' => [ 'service-bus', ], @@ -59,18 +59,24 @@ public function send($notifiable, Notification $notification): void return; } - $message = $event->getParams(); - if (!isset($message['from'], $message['events'][0])) { - Log::error('Invalid payload structure', ['message' => $message]); + Log::error('Invalid message structure', ['message' => $message]); return; } + $queueUrl = Arr::get($this->config, 'sqs.queue_url'); + $isFifoQueue = strpos($queueUrl, '.fifo') !== false; + $params = [ - 'QueueUrl' => Arr::get($this->config, 'sqs.queue_url'), + 'QueueUrl' => $queueUrl, 'MessageBody' => json_encode($message), ]; + if ($isFifoQueue) { + $params['MessageGroupId'] = $message['from']; + $params['MessageDeduplicationId'] = md5(json_encode($message)); + } + try { $response = $this->sqs->sendMessage($params); @@ -80,6 +86,8 @@ public function send($notifiable, Notification $notification): void 'message_id' => $response->get('MessageId'), 'event' => $eventName, ]); + + $this->hasAttemptedRefresh = false; } catch (AwsException $exception) { $code = $exception->getAwsErrorCode(); From df623d28f801b8962b4c2d5addb265c0f29c9e73 Mon Sep 17 00:00:00 2001 From: Arman Yeghiazaryan Date: Wed, 30 Oct 2024 04:56:59 +0400 Subject: [PATCH 3/5] cleanup --- src/ServiceBusSQSChannel.php | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index f59c356..188e4e7 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -39,7 +39,7 @@ public function send($notifiable, Notification $notification): void /** @var ServiceBusEvent $event */ $event = $notification->toServiceBus($notifiable); $eventType = $event->getEventType(); - $message = $event->getParams(); + $params = $event->getParams(); $dontReport = Arr::get($this->config, 'dont_report', []); if (Arr::get($this->config, 'enabled') == false) { @@ -48,7 +48,7 @@ public function send($notifiable, Notification $notification): void "$eventType service bus notification [disabled]", [ 'event' => $eventType, - 'params' => $message, + 'params' => $params, 'tags' => [ 'service-bus', ], @@ -59,28 +59,28 @@ public function send($notifiable, Notification $notification): void return; } - if (!isset($message['from'], $message['events'][0])) { - Log::error('Invalid message structure', ['message' => $message]); + if (!isset($params['from'], $params['events'][0])) { + Log::error('Invalid message structure', ['params' => $params]); return; } $queueUrl = Arr::get($this->config, 'sqs.queue_url'); $isFifoQueue = strpos($queueUrl, '.fifo') !== false; - $params = [ + $payloadSqs = [ 'QueueUrl' => $queueUrl, - 'MessageBody' => json_encode($message), + 'MessageBody' => json_encode($params), ]; if ($isFifoQueue) { - $params['MessageGroupId'] = $message['from']; - $params['MessageDeduplicationId'] = md5(json_encode($message)); + $payloadSqs['MessageGroupId'] = $params['from']; + $payloadSqs['MessageDeduplicationId'] = md5(json_encode($params$)); } try { - $response = $this->sqs->sendMessage($params); + $response = $this->sqs->sendMessage($payloadSqs); - $eventName = $message['events'][0]; + $eventName = $params['events'][0]; Log::info("{$eventName} sent to bus queue", [ 'message_id' => $response->get('MessageId'), @@ -94,7 +94,7 @@ public function send($notifiable, Notification $notification): void if (in_array($code, ['ExpiredToken', 'UnrecognizedClientException', 'InvalidClientTokenId'])) { Log::info("$code received. Refreshing credentials and retrying.", [ 'event' => $eventType, - 'params' => $message, + 'params' => $params, 'aws_error_code' => $code, 'aws_error_message' => $exception->getAwsErrorMessage(), 'tags' => ['service-bus'], From e3174ae749a859606b989c7ca279d082bc39455b Mon Sep 17 00:00:00 2001 From: Arman Yeghiazaryan Date: Wed, 30 Oct 2024 04:59:40 +0400 Subject: [PATCH 4/5] cleanup --- src/ServiceBusSQSChannel.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index 188e4e7..ecf706a 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -74,7 +74,7 @@ public function send($notifiable, Notification $notification): void if ($isFifoQueue) { $payloadSqs['MessageGroupId'] = $params['from']; - $payloadSqs['MessageDeduplicationId'] = md5(json_encode($params$)); + $payloadSqs['MessageDeduplicationId'] = md5(json_encode($params)); } try { @@ -84,7 +84,7 @@ public function send($notifiable, Notification $notification): void Log::info("{$eventName} sent to bus queue", [ 'message_id' => $response->get('MessageId'), - 'event' => $eventName, + 'params' => $params, ]); $this->hasAttemptedRefresh = false; From d6a21d30e013e1a59ab72ebd6c6e30769b278edf Mon Sep 17 00:00:00 2001 From: Arman Yeghiazaryan Date: Thu, 31 Oct 2024 01:38:58 +0400 Subject: [PATCH 5/5] applied requested changes --- src/ServiceBusSQSChannel.php | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index ecf706a..544013c 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -25,7 +25,7 @@ public function __construct(array $config = []) protected function initializeSqsClient(): void { $this->sqs = new SqsClient([ - 'region' => Arr::get($this->config, 'sqs.region'), + 'region' => Arr::get($this->config, 'sqs.region', 'eu-west-1'), 'version' => 'latest', 'credentials' => [ 'key' => Arr::get($this->config, 'sqs.key'), @@ -77,15 +77,22 @@ public function send($notifiable, Notification $notification): void $payloadSqs['MessageDeduplicationId'] = md5(json_encode($params)); } + $this->sendMessageToSqs($payloadSqs, $eventType, $params, $dontReport); + } + + protected function sendMessageToSqs(array $payloadSqs, string $eventType, array $params, array $dontReport): void + { try { $response = $this->sqs->sendMessage($payloadSqs); $eventName = $params['events'][0]; - Log::info("{$eventName} sent to bus queue", [ - 'message_id' => $response->get('MessageId'), - 'params' => $params, - ]); + if (!in_array($eventType, $dontReport)) { + Log::info("{$eventName} sent to bus queue", [ + 'message_id' => $response->get('MessageId'), + 'params' => $params, + ]); + } $this->hasAttemptedRefresh = false; } catch (AwsException $exception) { @@ -105,7 +112,7 @@ public function send($notifiable, Notification $notification): void $this->initializeSqsClient(); - $this->send($notifiable, $notification); + $this->sendMessageToSqs($payloadSqs, $eventType, $params, $dontReport); } else { $this->hasAttemptedRefresh = false;