diff --git a/src/Middleware/Retry/NonBlockingRetryMiddleware.php b/src/Middleware/Retry/NonBlockingRetryMiddleware.php new file mode 100644 index 0000000..4ff3b1d --- /dev/null +++ b/src/Middleware/Retry/NonBlockingRetryMiddleware.php @@ -0,0 +1,172 @@ +exchangeRegistry = $exchangeRegistry; + $this->logger = $logger; + + // Initialise the message options resolver + $this->messageOptionsResolver = $messageOptionsResolver ?: new OptionsResolver(); + $this->configureMessageOptions($this->messageOptionsResolver); + } + + /** + * @inheritDoc + */ + public function configureOptions(OptionsResolver $resolver) + { + $resolver->setRequired(['exchange']); + } + + /** + * @inheritDoc + */ + public function setOptions(array $options) + { + $this->exchange = $this->exchangeRegistry->get($options['exchange']); + } + + /** + * @param OptionsResolver $resolver + */ + private function configureMessageOptions(OptionsResolver $resolver) + { + $resolver->setDefaults(['max_attempts' => 10]); + $resolver->setAllowedTypes('max_attempts', 'int'); + } + + /** + * @param Message $message + * @return array + */ + private function getMessageOptions(Message $message) + { + $options = $message->getHeader('retry_options', []); + return $this->messageOptionsResolver->resolve($options); + } + + /** + * Exponential backoff liberated from Sidekiq + * https://github.com/mperham/sidekiq/blob/v3.3.4/lib/sidekiq/middleware/server/retry_jobs.rb#L179 + * + * @param int $attempts + * @return int The number of seconds to wait until next attempt + */ + private function getBackoffPeriod($attempts) + { + return (pow($attempts, 4) + 15 + (rand(1, 30) * ($attempts + 1))); + } + + /** + * @param Message $message + */ + private function sendMessageToRetryQueue(Message $message) + { + // Remove the original x-death header that interferes with the retry + $message->removeHeader('x-death'); + + $message->setExpiration(60000); // tell RabbitMQ to move message back to normal queue after one minute + + // Republish the message onto the retry exchange + $flags = $message->isMandatory() ? AMQP_MANDATORY : AMQP_NOPARAM; + $this->exchange->publish($message->getBody(), $message->getRoutingKey(), $flags, $message->getAttributes()); + } + + /** + * @inheritDoc + */ + public function __invoke(Message $message, Queue $queue, callable $next) + { + $options = $this->getMessageOptions($message); + + $attempts = (int) $message->getHeader('retry_attempts', 0); + + // For BC - set the retry_at header if the message has already been on the retry queue prior to release + // if we've got here it will be because the old expiry has been reached so set retry_at to current time + if ($attempts > 0 && $message->getHeader('retry_at') === null) { + $message->setHeader('retry_at', time()); + } + + // Check to see if we're ready to process the message, will be true if either the message isn't retryable + // or if it is and the retry_at timestamp has been reached + $shouldProcess = (int)$options['max_attempts'] === 0 || (int)$message->getHeader('retry_at', 0) <= time(); + if (!$shouldProcess) { + $this->sendMessageToRetryQueue($message); + return true; + } + + try { + return $next($message, $queue); + } catch (Exception $exception) { + if ($attempts < $options['max_attempts']) { + // Increment the retry attempt counter + $message->setHeader('retry_attempts', ++$attempts); + + // Set a custom header to indicate when we want to attempt to processing next. + // If this hasn't been reached, we'll put the message back on the retry queue. + $backoffPeriod = $this->getBackoffPeriod($attempts); + $message->setHeader('retry_at', time() + $backoffPeriod); + + $this->sendMessageToRetryQueue($message); + + if ($this->logger) { + $this->logger->info( + sprintf('Retrying message in %d seconds (attempt %d)', $backoffPeriod, $attempts) + ); + } + } elseif ($options['max_attempts'] > 0 && $this->logger) { + $this->logger->critical(sprintf('Failed to process message after %d retries', $attempts)); + } + + throw $exception; + } + } +} diff --git a/tests/Middleware/Retry/NonBlockingRetryMiddlewareTest.php b/tests/Middleware/Retry/NonBlockingRetryMiddlewareTest.php new file mode 100644 index 0000000..d95f940 --- /dev/null +++ b/tests/Middleware/Retry/NonBlockingRetryMiddlewareTest.php @@ -0,0 +1,379 @@ +exchange = Mockery::mock('Radish\Broker\Exchange', [ + 'publish' => null, + ]); + $this->exchangeRegistry = Mockery::mock('Radish\Broker\ExchangeRegistry', [ + 'get' => $this->exchange, + ]); + $this->logger = Mockery::mock('Psr\Log\LoggerInterface', [ + 'info' => null, + 'critical' => null, + ]); + + $this->middleware = new NonBlockingRetryMiddleware($this->exchangeRegistry, $this->logger); + $this->middleware->setOptions([ + 'exchange' => 'test', + ]); + } + + /** + * @dataProvider returnProvider + */ + public function testWhenNoExceptions($return) + { + $message = Mockery::mock('Radish\Broker\Message'); + $message->shouldReceive('getHeader') + ->andReturnUsing(function ($name, $default) { + return $default; + }); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () use ($return) { + return $return; + }; + $middleware = $this->middleware; + + $this->assertEquals($return, $middleware($message, $queue, $next)); + } + + public function returnProvider() + { + return [ + [true], + [false] + ]; + } + + public function testSetsRetryAtHeaderForMessagesAlreadyInRetryQueue() + { + $currentTimestamp = time(); + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_attempts', 3); + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + return true; + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish')->never(); + + $middleware($message, $queue, $next); + + $this->assertArrayHasKey('retry_at', $message->getHeaders()); + $this->assertGreaterThanOrEqual($currentTimestamp, $message->getHeader('retry_at')); + } + + /** + * @expectedException RuntimeException + */ + public function testRepublishesMessageWithFixedExpiration() + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) { + return isset($attributes['expiration']) && $attributes['expiration'] === 60000; + })) + ->once(); + + $middleware($message, $queue, $next); + } + + /** + * @expectedException RuntimeException + */ + public function testRepublishesMessageWithRetryHeader() + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) { + return isset($attributes['headers']['retry_at']); + })) + ->once(); + + $middleware($message, $queue, $next); + } + + /** + * @expectedException RuntimeException + */ + public function testRepublishesMessageWithIncreasedRetryInterval() + { + $currentTimestamp = time(); + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_attempts', 3); + $message->setHeader('retry_at', $currentTimestamp); + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) use ($currentTimestamp) { + return $attributes['headers']['retry_at'] >= $currentTimestamp + 276 && //lower boundary of back-off calculation + $attributes['headers']['retry_at'] <= $currentTimestamp + 421; //upper boundary of back-off calculation + })) + ->once(); + + $middleware($message, $queue, $next); + } + + public function testRepublishesMessageWithoutIncreasingRetryIntervalOrRetryCountIfNotReadyToBeProcessed() + { + $currentTimestamp = time(); + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_attempts', 3); + $message->setHeader('retry_at', $currentTimestamp *2); // set far in the future so won't get processed + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) use ($currentTimestamp) { + return $attributes['headers']['retry_attempts'] === 3 && + $attributes['headers']['retry_at'] === $currentTimestamp * 2; + })) + ->once(); + + $middleware($message, $queue, $next); + } + + public function testRepublishesMessageWithoutLoggingIfNotReadyToBeProcessed() + { + $currentTimestamp = time(); + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_attempts', 3); + $message->setHeader('retry_at', $currentTimestamp *2); // set far in the future so won't get processed + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->logger->shouldReceive('info')->never(); + + $middleware($message, $queue, $next); + } + + /** + * @expectedException RuntimeException + */ + public function testRemovesXDeathHeaderBeforeRepublishing() + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $message->setHeader('x-death', []); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) { + return !isset($attributes['headers']['x-death']); + })) + ->once(); + + $middleware($message, $queue, $next); + } + + /** + * @expectedException RuntimeException + */ + public function testSetsRetryAttemptHeader() + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_options', [ + 'max_attempts' => 5 + ]); + $message->setHeader('x-death', []); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) { + return isset($attributes['headers']['retry_attempts']) && $attributes['headers']['retry_attempts'] === 1; + })) + ->once(); + + $middleware($message, $queue, $next); + } + + public function retryAttemptsDataProvider() + { + // headers, shouldRepublish + return [ + [['retry_attempts' => 5, 'retry_options'=> ['max_attempts' => 5]], false], + [['retry_attempts' => 3, 'retry_options'=> ['max_attempts' => 5]], true], + [['retry_attempts' => 10], false], // max_attempts defaults to 10 + [['retry_attempts' => 9], true], // max_attempts defaults to 10 + [['retry_options' => ['max_attempts' => 0]], false], + ]; + } + + /** + * @dataProvider retryAttemptsDataProvider + * @expectedException RuntimeException + */ + public function testRetryLimitsAreRespected($headers, $shouldRepublish) + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeaders($headers); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + if ($shouldRepublish) { + $this->exchange->shouldReceive('publish') + ->with('body', 'key', AMQP_NOPARAM, Mockery::on(function ($attributes) use ($headers) { + return $attributes['headers']['retry_attempts'] === $headers['retry_attempts'] + 1; + })) + ->once(); + } else { + $this->exchange->shouldReceive('publish') + ->never(); + } + + $middleware($message, $queue, $next); + } + + /** + * @expectedException RuntimeException + */ + public function testCriticalErrorIsLoggedWhenRetryLimitReached() + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_attempts', 5); + $message->setHeader('retry_options', ['max_attempts' => 5]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->logger->shouldReceive('critical') + ->once() + ->with('Failed to process message after 5 retries'); + + $middleware($message, $queue, $next); + } + + /** + * @expectedException RuntimeException + */ + public function testInfoMessageIsLoggedWhenMessageRequeuedDueToRetryFailure() + { + $message = new Message(); + $message->setBody('body'); + $message->setRoutingKey('key'); + $message->setHeader('retry_attempts', 4); + $message->setHeader('retry_options', ['max_attempts' => 5]); + $queue = Mockery::mock('Radish\Broker\Queue'); + $next = function () { + throw new \RuntimeException(); + }; + $middleware = $this->middleware; + + $this->logger->shouldReceive('info') + ->once() + ->with(Mockery::on( + function ($message) { + return preg_match('/Retrying message in \d+ seconds \(attempt 5\)/', $message) === 1; + } + + )); + + $middleware($message, $queue, $next); + } +}