diff --git a/src/ServiceBusSQSChannel.php b/src/ServiceBusSQSChannel.php index ed51a45..544013c 100644 --- a/src/ServiceBusSQSChannel.php +++ b/src/ServiceBusSQSChannel.php @@ -2,6 +2,7 @@ namespace Ringierimu\ServiceBusNotificationsChannel; +use Aws\Exception\AwsException; use Aws\Sqs\SqsClient; use Illuminate\Notifications\Notification; use Illuminate\Support\Arr; @@ -11,12 +12,20 @@ class ServiceBusSQSChannel { protected SqsClient $sqs; + protected array $config; + + protected bool $hasAttemptedRefresh = false; + public function __construct(array $config = []) { $this->config = $config ?: config('services.service_bus'); + $this->initializeSqsClient(); + } + protected function initializeSqsClient(): void + { $this->sqs = new SqsClient([ - 'region' => Arr::get($this->config, 'sqs.region'), + 'region' => Arr::get($this->config, 'sqs.region', 'eu-west-1'), 'version' => 'latest', 'credentials' => [ 'key' => Arr::get($this->config, 'sqs.key'), @@ -25,7 +34,7 @@ public function __construct(array $config = []) ]); } - public function send($notifiable, Notification $notification) + public function send($notifiable, Notification $notification): void { /** @var ServiceBusEvent $event */ $event = $notification->toServiceBus($notifiable); @@ -50,21 +59,68 @@ public function send($notifiable, Notification $notification) return; } - $message = $notification - ->toServiceBus($notifiable) - ->getParams(); + if (!isset($params['from'], $params['events'][0])) { + Log::error('Invalid message structure', ['params' => $params]); + return; + } - $response = $this->sqs->sendMessage([ - 'QueueUrl' => Arr::get($this->config, 'sqs.queue_url'), - 'MessageBody' => json_encode($message), - 'MessageGroupId' => $message['from'], - ]); + $queueUrl = Arr::get($this->config, 'sqs.queue_url'); + $isFifoQueue = strpos($queueUrl, '.fifo') !== false; - $event = $message['events'][0]; + $payloadSqs = [ + 'QueueUrl' => $queueUrl, + 'MessageBody' => json_encode($params), + ]; - Log::info("{$event} sent to bus queue", [ - 'message_id' => $response->get('MessageId'), - 'message' => $message, - ]); + if ($isFifoQueue) { + $payloadSqs['MessageGroupId'] = $params['from']; + $payloadSqs['MessageDeduplicationId'] = md5(json_encode($params)); + } + + $this->sendMessageToSqs($payloadSqs, $eventType, $params, $dontReport); + } + + protected function sendMessageToSqs(array $payloadSqs, string $eventType, array $params, array $dontReport): void + { + try { + $response = $this->sqs->sendMessage($payloadSqs); + + $eventName = $params['events'][0]; + + if (!in_array($eventType, $dontReport)) { + Log::info("{$eventName} sent to bus queue", [ + 'message_id' => $response->get('MessageId'), + 'params' => $params, + ]); + } + + $this->hasAttemptedRefresh = false; + } catch (AwsException $exception) { + $code = $exception->getAwsErrorCode(); + + if (in_array($code, ['ExpiredToken', 'UnrecognizedClientException', 'InvalidClientTokenId'])) { + Log::info("$code received. Refreshing credentials and retrying.", [ + 'event' => $eventType, + 'params' => $params, + 'aws_error_code' => $code, + 'aws_error_message' => $exception->getAwsErrorMessage(), + 'tags' => ['service-bus'], + ]); + + if (!$this->hasAttemptedRefresh) { + $this->hasAttemptedRefresh = true; + + $this->initializeSqsClient(); + + $this->sendMessageToSqs($payloadSqs, $eventType, $params, $dontReport); + } else { + $this->hasAttemptedRefresh = false; + + throw new \Exception('Authentication failed after retrying.', 0, $exception); + } + } else { + throw $exception; + } + } } }